#
#   load_parsers.py
#
#   Search for and load files which parse logs for particular services
#

import glob
import importlib
import importlib.machinery
import inspect
import logging
import os
import pkgutil
from pathlib import Path
from sys import path
from typing import NamedTuple

try:
    # ``imp`` was removed from the standard library in Python 3.12. Nothing in
    # this module depends on it any more; the name is only kept importable on
    # older interpreters so that an unconditional import cannot break startup.
    import imp
except ImportError:
    imp = None

# Directory searched by findall() for externally installed parsers
parser_dir = "/usr/share/logparse/"
# Name of the module that every external parser directory must contain
main_module = "__init__"
# Names of parsers resolved by search() relative to this module's package
default_parsers = ["cron_journald", "httpd", "mem", "postfix", "smbd",
                   "sshd_journald", "sudo", "sysinfo", "temperature", "zfs"]
deprecated_parsers = ["sshd", "cron"]

logger = logging.getLogger(__name__)


class Parser():
    """
    Base class that every parser should inherit
    """

    def __init__(self, name=None, path=None, info=None):
        """
        Store the parser's metadata. Falsy arguments are stored as None,
        otherwise `name` is coerced to str, `path` to pathlib.Path and `info`
        is shallow-copied into a dict.
        """
        self.name = str(name) if name else None
        self.path = Path(path) if path else None
        self.info = dict(info) if info else None
        self.logger = logging.getLogger(__name__)

    def load(self):
        """
        Import the module called `self.name` and return it.
        """
        logger.debug("Loading parser {0} from {1}".format(
            self.name,
            str(self.path) if self.path is not None else "defaults"))
        return importlib.import_module(self.name)

    def parse_log(self, **args):
        """
        Every parser should provide the parse_log method which is executed at
        runtime to analyse logs.
        """
        # str() so that an unnamed parser (name is None) still raises the
        # intended NotImplementedError rather than a TypeError from `+`
        raise NotImplementedError(
            "Failed to find an entry point for parser " + str(self.name))


class ParserLoader:
    """
    This class searches for parsers in the main logparse package and
    optionally in another external package (default /usr/share/logparse).
    """

    def __init__(self, pkg):
        """
        Initiate search for parsers
        """
        self.pkg = pkg
        self.parsers = []
        self.reload()

    def reload(self):
        """
        Reset parsers list and iterate through package modules
        """
        self.parsers = []
        self.seen_paths = []
        logger.debug("Looking for parsers in package {0}".format(str(self.pkg)))
        self.walk_package(self.pkg)

    def walk_package(self, package):
        """
        Check package and subdirectories for loadable modules

        Every non-package module directly inside `package` is imported, and an
        instance of each Parser subclass visible in it is appended to
        `self.parsers`. Each directory below the package's search paths is
        then walked recursively as `package.<dirname>`.
        """
        # import_module returns the target module itself rather than the
        # top-level parent package
        imported_package = importlib.import_module(package)

        for _, parser_name, ispkg in pkgutil.iter_modules(
                imported_package.__path__, imported_package.__name__ + '.'):
            if ispkg:
                continue
            parser_module = importlib.import_module(parser_name)
            for _, candidate in inspect.getmembers(parser_module, inspect.isclass):
                # Ignore the base Parser class
                if issubclass(candidate, Parser) and candidate is not Parser:
                    logger.debug("Found parser {0}.{1}".format(
                        candidate.__module__, candidate.__name__))
                    self.parsers.append(candidate())

        # Recurse subpackages
        if isinstance(imported_package.__path__, str):
            all_current_paths = [imported_package.__path__]
        else:
            all_current_paths = list(imported_package.__path__)

        for pkg_path in all_current_paths:
            if pkg_path in self.seen_paths:
                continue
            self.seen_paths.append(pkg_path)

            # Walk through each subdirectory of the package
            for child_pkg in os.listdir(pkg_path):
                # Bytecode caches and hidden directories are never subpackages
                if child_pkg == "__pycache__" or child_pkg.startswith('.'):
                    continue
                if os.path.isdir(os.path.join(pkg_path, child_pkg)):
                    self.walk_package(package + '.' + child_pkg)


def findall(directory=None):
    """
    Search a directory for externally installed parsers.

    `directory` defaults to the module-level `parser_dir`. Each subdirectory
    containing a `main_module` source file yields one Parser whose name is the
    subdirectory name, whose path is the subdirectory, and whose info is a
    dict holding the import spec of the main module ("spec") and its file
    location ("origin"). The directory is added to sys.path (once) so that the
    returned parsers can be imported by name.

    Returns a list of Parser objects, which is empty if the directory does
    not exist.
    """
    if directory is None:
        directory = parser_dir
    logger.debug("Searching for parsers in {0}".format(directory))

    if not os.path.isdir(directory):
        logger.warning("Parser directory {0} does not exist".format(directory))
        return []

    # Avoid growing sys.path by one entry on every call
    search_path = os.path.abspath(directory)
    if search_path not in path:
        path.append(search_path)

    parsers = []
    for parser_name in os.listdir(directory):
        location = os.path.join(directory, parser_name)
        entry_point = os.path.join(location, main_module + '.py')
        if not os.path.isdir(location) or not os.path.isfile(entry_point):
            logger.warning("Rejecting parser {0} due to invalid structure".format(location))
            continue

        # PathFinder.find_spec locates the module without opening it, unlike
        # the removed imp.find_module which returned an open file handle
        spec = importlib.machinery.PathFinder.find_spec(main_module, [location])
        if spec is None:
            logger.warning("Rejecting parser {0} due to invalid structure".format(location))
            continue

        info = {"origin": spec.origin, "spec": spec}
        parser_obj = Parser(parser_name, location, info)
        parsers.append(parser_obj)
        logger.debug("Added parser {0}".format(parser_obj.name))
    return parsers


def search(name):
    """
    Look up a bundled parser by name.

    Returns a Parser whose name is `name` qualified with this module's parent
    package if `name` is listed in `default_parsers` or `deprecated_parsers`,
    otherwise None.
    """
    logger.debug("Searching for parser {0}".format(name))
    if name in default_parsers:
        origin = "default"
    elif name in deprecated_parsers:
        origin = "deprecated"
    else:
        return None
    logger.debug("Found parser {0} in {1} modules".format(name, origin))
    # Replace the last component of this module's dotted name with `name`
    return Parser('.'.join(__name__.split('.')[:-1] + [name]))


def load(parser):
    """
    Import the module called `parser.name` and return it.
    """
    logger.debug("Loading parser {0} from {1}".format(
        parser.name, parser.path if parser.path is not None else "defaults"))
    return importlib.import_module(parser.name)