#
#   load_parsers.py
#
#   Search for and load files which parse logs for particular services
#

# NOTE: the deprecated `imp` module (removed in Python 3.12) is no longer
# imported here; parser discovery uses importlib.machinery instead.
import glob
import importlib
import importlib.machinery
import logging
import os
from pathlib import Path
from sys import path
from typing import NamedTuple

# Directory scanned by findall() for user-installed parsers.
parser_dir = "/usr/share/logparse/"
# Name of the module each parser directory must contain (as <main_module>.py).
main_module = "__init__"
# Names recognised by search() as parsers shipped alongside this module.
default_parsers = ["cron", "httpd", "mem", "postfix", "smbd", "sshd", "sudo",
                   "sysinfo", "temperature", "zfs"]

logger = logging.getLogger(__name__)


class Parser():
    """Lightweight record describing a parser that can be imported by load().

    Attributes:
        name: importable module name of the parser (always a str).
        path: pathlib.Path of the parser's directory, or None when no path
            was supplied (as for the default parsers returned by search()).
        info: dict of metadata about the parser's main module, or None.
    """

    def __init__(self, name, path=None, info=None):
        self.name = str(name)
        self.path = Path(path) if path else None
        self.info = dict(info) if info else None

    def __repr__(self):
        return "{0}(name={1!r}, path={2!r}, info={3!r})".format(
            type(self).__name__, self.name, self.path, self.info)


def findall():
    """Discover parsers installed as sub-directories of `parser_dir`.

    A directory entry is accepted when it is a directory containing
    `<main_module>.py`; everything else is rejected with a warning. The
    absolute form of `parser_dir` is added to `sys.path` (once) so that
    load() can import the discovered parsers by name.

    Returns:
        A list of Parser objects sorted by directory name. Each has `path`
        set to the parser's directory and `info` set to a dict with the
        keys "name" and "origin" taken from the main module's import spec.

    Raises:
        OSError: if `parser_dir` cannot be listed (e.g. it does not exist).
    """
    logger.debug("Searching for parsers in {0}".format(parser_dir))

    # Register the search directory only once; repeated calls must not keep
    # growing sys.path with duplicate entries.
    search_root = os.path.abspath(parser_dir)
    if search_root not in path:
        path.append(search_root)

    parsers = []
    # Sorted so that the result order does not depend on the filesystem.
    for parser_name in sorted(os.listdir(parser_dir)):
        location = os.path.join(parser_dir, parser_name)
        main_file = os.path.join(location, main_module + ".py")

        spec = None
        if os.path.isdir(location) and os.path.isfile(main_file):
            # Replacement for imp.find_module(): locates the module without
            # opening a file handle that would have to be closed afterwards.
            spec = importlib.machinery.PathFinder.find_spec(
                main_module, [location])

        if spec is None:
            logger.warning(
                "Rejecting parser {0} due to invalid structure".format(location))
            continue

        # Parser stores `info` via dict(), so hand it a real mapping.
        info = {"name": spec.name, "origin": spec.origin}
        parser_obj = Parser(parser_name, location, info)
        parsers.append(parser_obj)
        logger.debug("Added parser {0}".format(parser_obj.name))

    return parsers


def search(name):
    """Look up `name` among the default parsers.

    Returns:
        A Parser whose name is `name` qualified with the package part of this
        module's own `__name__` (and with no path or info), or None when
        `name` is not listed in `default_parsers`.
    """
    logger.debug("Searching for parser {0}".format(name))
    if name not in default_parsers:
        return None

    logger.debug("Found parser {0} in default modules".format(name))
    # Default parsers are siblings of this module: swap the last component
    # of our dotted module name for the parser's name.
    package_parts = __name__.split('.')[:-1]
    return Parser('.'.join(package_parts + [name]))


def load(parser):
    """Import and return the module named by `parser.name`.

    Parsers produced by findall() are importable because findall() put
    `parser_dir` on `sys.path`.

    Raises:
        ImportError: if the module cannot be imported.
    """
    source = parser.path if parser.path is not None else "defaults"
    logger.debug("Loading parser {0} from {1}".format(parser.name, source))
    return importlib.import_module(parser.name)