# -*- coding: utf-8 -*- """ A basic "plugin loader" implementation which searches for default packaged and user-supplied parser modules and verifies them so they can be executed by logparse.interface. The requirements for parser modules and classes are specified in the docstring of the Parser class. Classes in this module: - `Parser`: Base class that every parser should inherit - `ParserLoader`: Class used internally by `interface.py` to load parsers """ import importlib from importlib import util from os.path import dirname from pkgutil import iter_modules import inspect from pathlib import Path import subprocess from subprocess import Popen, PIPE from typing import get_type_hints import logging logger = logging.getLogger(__name__) PARSER_DIR = "/usr/share/logparse/user-parsers" PARSER_PKG = "logparse.parsers" class Parser(): """ This is the base class that every parser should inherit. Parsers should each exist in their own module and contain a Parser class whose name is the same as the parser (e.g. `example.py` contains class `Example(Parser)`). Each parser module must contain exactly one Parser class definition, and this class cannot be a redefinition of the base Parser class (i.e. this class). This class must provide the parse_log() method which returns a logparse.formatting.Section object. """ def __init__(self, name=None, path=None, info=None, deprecated=False, successor=""): """ The following variables can be set to display information about the parser. The object `self.logger` can be used for outputting messages to to whatever sink is set up in __init__.py (no setup required in the parser module itself). """ self.name = str(name) if name else None self.path = Path(path) if path else None self.info = dict(info) if info else None self.logger = logging.getLogger(self.__module__) self.deprecated = deprecated self.successor = successor def load(self): """ A generic loading method to import a parser, only used for debugging """ logger.debug("Loading parser {0} from {1}".format( self.name, str(self.path) if self.path != None else "defaults")) return importlib.import_module(self.name) def parse_log(self, **args) -> None: """ Every parser should provide the parse_log method which is executed at runtime to analyse logs. Verification checks should prevent the below exception from ever being raised. """ raise NotImplementedError("Failed to find an entry point for parser") def check_dependencies(self) -> tuple: """ Parsers should check their own requirements here and return a boolean value reflecting whether the parser can run successfully. Typically this method should check for the program whose logs are being parsed, as well as any external dependencies like libsystemd. This method should return a tuple containing a boolean representing whether or not the dependencies are satisfied and list containing the names of any dependencies that are unsatisfied. """ return (True, None) def _check_dependency_command(self, cmdline) -> tuple: """ Runs a shell command (typically something --version) and returns the output and return code as a tuple. The command to run is passed as a string, optionally including arguments, in the `cmdline` argument. """ logger.debug("Checking output of command " + cmdline) cmd = subprocess.getstatusoutput(cmdline) if cmd[0] != 0: logger.warning("{0} is not available on this system (`{1}` " "returned code {2}: \"{3}\")".format( cmdline.split()[0], cmdline, *cmd)) return cmd else: logger.debug("Command {0} succeeded".format(cmdline)) return cmd class ParserLoader: """ This class searches for parsers in the main logparse package (logparser.parsers) and optionally in another external package (default /usr/share/logparse). """ def __init__(self, pkg=PARSER_PKG, path=PARSER_DIR): """ The pkg and path attributes shouldn't need to be set on object creation, the default values should work fine. They are hard-coded here for security so that a module can't force-load a module from another package/location, e.g. from the internet. """ self.pkg = pkg self.path = path self.parsers = [] self.has_systemd = False def search(self, pattern): """ Find a parser and determine its journald attribute. When a user requests a parser of the form .*_journald, this function will use that parser if it exists, but if not it will revert to using the base parser (without the _journald) if it has a journald attribute. If it does not have this error (and the parser as requested does not exist), then no parser is loaded.. """ # Separate into underscore words split_name = pattern.split("_") # Check if parser exists with exact name requested by user result = self._search_both(pattern) if result == None and split_name[-1] == "journald": # No match for exact name but .*_journald was requested... if self.has_systemd: # Look for base parser with journald attribute result = self._search_both("".join(split_name[:-1])) if result == None: logger.error("Couldn't find a matching parser module " "for {0}".format(pattern)) if not hasattr(result, "journald"): logger.error("Found parser {} but it does not support " "journald".format("".join(split_name[:-1]))) result = None else: result.journald = True else: logger.error("A parser that requires systemd was requested " "but the dependencies are not installed.") return None if not result.deps_ok: return None if result == None: # Still can't find a matching parser logger.error("Couldn't find a matching parser module " "for {0}".format(pattern)) else: self.parsers.append(result) return result def _search_both(self, pattern): """ Basic wrapper for the two search functions below. """ default_parser = self._search_default(pattern) if default_parser != None: return default_parser else: user_parser = self._search_user(pattern) if user_parser != None: return user_parser else: return None def _search_user(self, pattern): """ Search for a parser name `pattern` in the user-managed parser directory """ logger.debug("Searching for {0} in {1}".format(pattern, self.path)) try: spec = importlib.machinery.PathFinder.find_spec( pattern, path=[self.path]) parser_module = spec.loader.load_module(spec.name) return self._validate_module(parser_module) except Exception as e: return None def _search_default(self, pattern): """ Search for a parser name `pattern` in the default parser package TODO use importlib.resources.is_resources() once there is a backport to Python 3.6 or below """ logger.debug("Searching for {0} in default parsers".format(pattern)) try: parser_module = importlib.import_module(self.pkg + "." + pattern) return self._validate_module(parser_module) except Exception as e: return None def _validate_module(self, parser_module): """ Some basic security tests for candidate modules: 1. Must contain exactly one Parser object 3. This class cannot be a redefinition of the base Parser class 4. Must provide the parse_log() method 5. Must not return None 6. Must not match an already-loaded class 7. Dependencies must exist """ logger.debug("Checking validity of module {0} at {1}".format( parser_module.__name__, parser_module.__file__)) available_parsers = [] missing_dependencies = [] clsmembers = inspect.getmembers(parser_module, inspect.isclass) # Check individual classes for (_, c) in clsmembers: if not (issubclass(c, Parser) & (c is not Parser)): continue if c in self.parsers: logger.error("Parser class {0} has already been loaded " "from another source, ignoring it".format( c.__class__.__name__, c.__file__)) if not inspect.isroutine(c.parse_log): logger.error("Parser class {0} in {1} does not contain a " "parse_log() method".format( c.__class__.__name__, c.__file__)) continue if None in get_type_hints(c): logger.error("Parser class {0} in {1} contains a " "null-returning parse_log() method".format( c.__class__.__name__, c.__file__)) continue parser_obj = c() if parser_obj.deprecated: logger.warning("Parser {0} is deprecated - " "use {1} instead".format( parser_obj.name, parser_obj.successor)) # Check dependencies deps = parser_obj.check_dependencies() if deps[0]: parser_obj.deps_ok = True else: logger.error("The following dependencies are missing for " "parser {0}: {1}".format(parser_obj.name, ", ".join(deps[1]))) missing_dependencies.append(parser_obj) parser_obj.deps_ok = False logger.debug("Found parser {0}.{1}".format( c.__module__, c.__class__.__name__)) available_parsers.append(parser_obj) # Check module structure if len(available_parsers) > 1: logger.error("Found multiple valid parser classes in {0} at {1} " "- ignoring this module" .format(parser_module.__name__, parser_module.__file__)) return None elif len(available_parsers) == 0: if len(missing_dependencies) > 0: return None logger.error("No valid classes in {0} at {1}". format(parser_module.__name__, parser_module.__file__)) return None if len(available_parsers) == 1: logger.debug("Parser module {0} at {1} passed validity checks" .format(parser_module.__name__, parser_module.__file__)) return available_parsers[0] def check_systemd(self): """ Check if the appropriate dependencies are installed for parsing systemd logs. Output codes: 0. systemd + libsystemd + systemd-python are installed 1. systemd + libsystemd are installed 2. systemd is installed 3. systemd is not installed, no support required """ # Test if systemctl works systemctl_cmd = Popen(["systemctl", "--version"], stdout=PIPE) systemctl_cmd.communicate() if systemctl_cmd.returncode == 0: logger.debug("Passed systemctl test") # Test if libsystemd exists libsystemd_cmd = Popen(["locate", "libsystemd.so.0"], stdout=PIPE) libsystemd_cmd.communicate() if libsystemd_cmd.returncode == 0: logger.debug("Passed libsystemd test") # Test if systemd-python exists if util.find_spec("systemd") is not None: logger.debug("Passed systemd-python test") self.has_systemd = True logger.debug("Passed all systemd dependency checks") return 0 else: logger.warning("Systemd is running on this system but the " "package systemd-python is not installed. Parsers " "that use journald will not work. For more " "features, install systemd-python from " " or " "`pip install systemd-python`.") return 1 else: logger.warning("Systemd is running on this system but " "libsystemd headers are missing. This package is " "required to make use of the journald parsers. " "Libsystemd should be available with your package " "manager of choice.") return 2 else: logger.debug("Systemd not installed.. parsers that use journald " "will not work.") return 3 def load_pkg(self): """ Clear the list of currently loaded packages and load all valid and non-deprecated parser classes from self.pkg using importlib. """ available_parsers = [name for _, name, _ in iter_modules( [dirname(importlib.import_module(self.pkg).__file__)])] for parser_name in available_parsers: parser_module = importlib.import_module( "logparse.parsers." + parser_name) parser_class = self._validate_module(parser_module) if parser_class == None: continue if parser_class.deprecated: logger.debug("Ignoring parser {0} because it is deprecated" .format(parser_class.__class__.__name__)) continue self.parsers.append(parser_class) return self.parsers def ignore(self, pattern): """ Remove a parser from the list of currently loaded parsers """ for parser in self.parsers: if parser.__module__ == pattern: self.parsers.remove(parser) logger.debug("Ignoring parser {0}".format(parser.__name__)) return self.parsers