# -*- coding: utf-8 -*- """ A basic "plugin loader" implementation which searches for default packaged and user-supplied parser modules and verifies them so they can be executed by logparse.interface. The requirements for parser modules and classes are specified in the docstring of the Parser class. Classes in this module: - `Parser`: Base class that every parser should inherit - `ParserLoader`: Class used internally by `interface.py` to load parsers """ import importlib from os.path import dirname from pkgutil import iter_modules import inspect from pathlib import Path from typing import get_type_hints import logging logger = logging.getLogger(__name__) PARSER_DIR = "/usr/share/logparse/user-parsers" PARSER_PKG = "logparse.parsers" class Parser(): """ This is the base class that every parser should inherit. Parsers should each exist in their own module and contain a Parser class whose name is the same as the parser (e.g. `example.py` contains class `Example(Parser)`). Each parser module must contain exactly one Parser class definition, and this class cannot be a redefinition of the base Parser class (i.e. this class). This class must provide the parse_log() method which returns a logparse.formatting.Section object. """ def __init__(self, name=None, path=None, info=None, deprecated=False, successor=""): """ The following variables can be set to display information about the parser. The object `self.logger` can be used for outputting messages to to whatever sink is set up in __init__.py (no setup required in the parser module itself). """ self.name = str(name) if name else None self.path = Path(path) if path else None self.info = dict(info) if info else None self.logger = logging.getLogger(self.__module__) self.deprecated = deprecated self.successor = successor def load(self): """ A generic loading method to import a parser, only used for debugging """ logger.debug("Loading parser {0} from {1}".format( self.name, str(self.path) if self.path != None else "defaults")) return importlib.import_module(self.name) def parse_log(self, **args) -> None: """ Every parser should provide the parse_log method which is executed at runtime to analyse logs. Verification checks should prevent the below exception from ever being raised. """ raise NotImplementedError("Failed to find an entry point for parser") class ParserLoader: """ This class searches for parsers in the main logparse package (logparser.parsers) and optionally in another external package (default /usr/share/logparse). """ def __init__(self, pkg=PARSER_PKG, path=PARSER_DIR): """ The pkg and path attributes shouldn't need to be set on object creation, the default values should work fine. They are hard-coded here for security so that a module can't force-load a module from another package/location, e.g. from the internet. """ self.pkg = pkg self.path = path self.parsers = [] def search(self, pattern): """ Basic wrapper for the two search functions below. """ default_parser = self._search_default(pattern) if default_parser != None: self.parsers.append(default_parser) return default_parser else: user_parser = self._search_user(pattern) if user_parser != None: self.parsers.append(user_parser) return user_parser else: logger.warning("Couldn't find a matching parser module " "for search term {0}".format(pattern)) return None def _search_user(self, pattern): """ Search for a parser name `pattern` in the user-managed parser directory """ logger.debug("Searching for {0} in {1}".format(pattern, self.path)) try: spec = importlib.machinery.PathFinder.find_spec( pattern, path=[self.path]) parser_module = spec.loader.load_module(spec.name) return self._validate_module(parser_module) except Exception as e: return None def _search_default(self, pattern): """ Search for a parser name `pattern` in the default parser package TODO use importlib.resources.is_resources() once there is a backport to Python 3.6 or below """ logger.debug("Searching for {0} in default parsers".format(pattern)) try: parser_module = importlib.import_module(self.pkg + "." + pattern) return self._validate_module(parser_module) except Exception as e: return None def _validate_module(self, parser_module): """ Some basic security tests for candidate modules: 1. Must contain exactly one Parser object 3. This class cannot be a redefinition of the base Parser class 4. Must provide the parse_log() method 5. Must not return None 6. Must not match an already-loaded class """ logger.debug("Checking validity of module {0} at {1}".format( parser_module.__name__, parser_module.__file__)) available_parsers = [] clsmembers = inspect.getmembers(parser_module, inspect.isclass) # Check individual classes for (_, c) in clsmembers: if not (issubclass(c, Parser) & (c is not Parser)): continue if c in self.parsers: logger.warning("Parser class {0} has already been loaded " "from another source, ignoring it".format( c.__class__.__name__, c.__file__)) if not inspect.isroutine(c.parse_log): logger.warning("Parser class {0} in {1} does not contain a " "parse_log() method".format( c.__class__.__name__, c.__file__)) continue if None in get_type_hints(c): logger.warning("Parser class {0} in {1} contains a " "null-returning parse_log() method".format( c.__class__.__name__, c.__file__)) continue parser_obj = c() if parser_obj.deprecated: logger.warning("Parser {0} is deprecated - " "use {1} instead".format( parser_obj.name, parser_obj.successor)) logger.debug("Found parser {0}.{1}".format( c.__module__, c.__class__.__name__)) available_parsers.append(parser_obj) # Check module structure if len(available_parsers) == 1: logger.debug("Parser module {0} at {1} passed validity checks" .format(parser_module.__name__, parser_module.__file__)) return available_parsers[0] elif len(available_parsers) == 0: logger.warning("No valid classes in {0} at {1}". format(parser_module.__name__, parser_module.__file__)) return None elif len(available_parsers) > 1: logger.warning("Found multiple valid parser classes in {0} at {1} " "- ignoring this module" .format(parser_module.__name__, parser_module.__file__)) return None def load_pkg(self): """ Clear the list of currently loaded packages and load all valid and non-deprecated parser classes from self.pkg using importlib. """ available_parsers = [name for _, name, _ in iter_modules( [dirname(importlib.import_module(self.pkg).__file__)])] for parser_name in available_parsers: parser_module = importlib.import_module( "logparse.parsers." + parser_name) parser_class = self._validate_module(parser_module) if parser_class == None: continue if parser_class.deprecated: logger.debug("Ignoring parser {0} because it is deprecated" .format(parser_class.__class__.__name__)) continue self.parsers.append(parser_class) return self.parsers def ignore(self, pattern): """ Remove a parser from the list of currently loaded parsers """ for parser in self.parsers: if parser.__module__ == pattern: self.parsers.remove(parser) logger.debug("Ignoring parser {0}".format(parser.__name__)) return self.parsers