# # sudo-journald.py # # Get number of sudo sessions for each user # import re from systemd import journal from logparse.formatting import * from logparse.util import readlog from logparse.config import prefs from logparse.load_parsers import Parser class SudoCommand(): def __init__(self, msg): self.init_user, self.pwd, self.su, self.command = re.search( r"\s*(?P\w*) : TTY=.* ; PWD=(?P\S*) ;" " USER=(?P\w*) ; COMMAND=(?P\S*)", msg)\ .groupdict().values() class Sudo(Parser): def __init__(self): super().__init__() self.name = "sudo" self.info = "Get number of sudo sessions for each user" def parse_log(self): logger.debug("Starting sudo section") section = Section("sudo") if not (config.prefs.getboolean("sudo", "summary") or config.prefs.getboolean("sudo", "list-users")): logger.warning("Both summary and list-users configuration options " "are set to false, so no output will be generated. " "Skipping this parser.") return None j = journal.Reader() j.this_machine() j.log_level(journal.LOG_INFO) j.add_match(_COMM="sudo") j.seek_realtime(section.period.startdate) j.log_level(5) logger.debug("Searching for sudo commands") messages = [entry["MESSAGE"] for entry in j if "MESSAGE" in entry] commands_objects = [] # list of command objects init_users = {} # keys are users, values are lists of commands logger.debug("Parsing sudo log messages") for msg in messages: if "COMMAND=" in msg: try: command_obj = SudoCommand(msg) except Exception as e: logger.warning("Malformed sudo log message: {0}. " "Error message: {1}".format(msg, str(e))) continue if config.prefs.getboolean("sudo", "truncate-commands"): command_obj.command = command_obj.command.split("/")[-1] commands_objects.append(command_obj) if not command_obj.init_user in init_users: init_users[command_obj.init_user] = [] init_users[command_obj.init_user].append( command_obj.su + ": " + command_obj.command) logger.debug("Generating output") if len(commands_objects) == 0: logger.warning("No sudo commands found") return if config.prefs.getboolean("sudo", "summary"): summary_data = Data() summary_data.subtitle = plural("sudo session", len(commands_objects)) if all(cmd.su == commands_objects[0].su for cmd in commands_objects): # Only one superuser if len(set(cmd.init_user for cmd in commands_objects)) > 1: # Multiple initiating users summary_data.subtitle += " for superuser " \ + commands_objects[0].su summary_data.items = ["{}: {}".format(cmd.init_user, cmd.command) for cmd in commands_objects] else: # Only one initiating user summary_data.subtitle += " opened by " \ + commands_objects[0].init_user + " for " \ + commands_objects[0].su summary_data.items = [cmd.command for cmd in commands_objects] else: # Multiple superusers if len(set(cmd.init_user for cmd in commands_objects)) > 1: # Multiple initiating users summary_data.subtitle += " for " + plural("superuser", len(set(cmd.su for cmd in commands_objects))) summary_data.items = ["{}→{}: {}".format( cmd.init_user, cmd.su, cmd.command) for cmd in commands_objects] else: # Only one initiating user summary_data.subtitle += " by " \ + commands_objects[0].init_user \ + " for " + plural("superuser", len(set(cmd.su for cmd in commands_objects))) summary_data.items = ["{}: {}".format( cmd.su, cmd.command) for cmd in commands_objects] summary_data.orderbyfreq() summary_data.truncl(config.prefs.getint("logparse", "maxcmd")) section.append_data(summary_data) if config.prefs.getboolean("sudo", "list-users") \ and len(set(cmd.init_user for cmd in commands_objects)) > 1: for user, user_commands in init_users.items(): user_data = Data() user_data.subtitle = plural("sudo session", len(user_commands)) + " for user " + user user_data.items = user_commands user_data.orderbyfreq() user_data.truncl(config.prefs.getint("logparse", "maxcmd")) section.append_data(user_data) logger.info("Finished sudo section") return section