1#
2# load_parsers.py
3#
4# Search for and load files which parse logs for particular services
5#
67
import imp
8import importlib
9import os
10import glob
11from pathlib import Path
12from sys import path
13from typing import NamedTuple
1415
# Directory scanned by findall() for externally installed parser packages.
parser_dir = "/usr/share/logparse/"

# File stem (without ".py") that a directory must contain to count as a parser.
main_module = "__init__"

# Names that search() resolves to modules bundled alongside this one.
default_parsers = [
    "cron",
    "httpd",
    "postfix",
    "smbd",
    "sshd",
    "sudo",
    "temperature",
    "zfs",
]

import logging

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
2122
class Parser:
    """
    Lightweight record describing a single log parser.

    Attributes:
        name: the parser's name, always stored as a ``str``.
        path: a ``pathlib.Path`` built from the ``path`` argument, or
            ``None`` when that argument is missing or falsy.
        info: a new ``dict`` built from the ``info`` argument, or ``None``
            when that argument is missing or falsy.
    """

    def __init__(self, name, path=None, info=None):
        # NOTE: inside this method the `path` parameter shadows the `path`
        # list imported from `sys` at the top of the file.
        self.name = str(name)

        # Start from "nothing known" and only fill in what was supplied.
        self.path = None
        self.info = None
        if path:
            self.path = Path(path)
        if info:
            # dict() takes a copy, so the caller's object is not shared.
            self.info = dict(info)
2728
def findall():
    """
    Scan ``parser_dir`` for parser packages and describe each one found.

    An entry of ``parser_dir`` is accepted when it is a directory that
    contains the file ``<main_module>.py``; anything else is rejected with
    a warning. The absolute form of ``parser_dir`` is placed on ``sys.path``
    so that an accepted parser can later be imported by its bare name.

    Returns:
        A list of ``Parser`` objects, one per accepted entry, in the order
        ``os.listdir`` reports them. Each has the entry name as ``name``,
        the package directory as ``path`` and, as ``info``, a dict with the
        keys ``"module"`` (the main module's name) and ``"pathname"`` (the
        location of its source file).

    Raises:
        OSError: propagated from ``os.listdir`` when ``parser_dir`` cannot
            be listed.
    """
    logger.debug("Searching for parsers in {0}".format(parser_dir))

    # Register the search root only once; appending unconditionally would
    # grow sys.path with a duplicate entry on every call.
    search_root = os.path.abspath(parser_dir)
    if search_root not in path:
        path.append(search_root)

    main_file = main_module + '.py'
    parsers = []
    for parser_name in os.listdir(parser_dir):
        location = os.path.join(parser_dir, parser_name)
        entry_point = os.path.join(location, main_file)
        if not (os.path.isdir(location) and os.path.isfile(entry_point)):
            logger.warning("Rejecting parser {0} due to invalid structure".format(location))
            continue

        # Describe the entry point with plain data. The previous
        # imp.find_module() call returned a (file, pathname, description)
        # tuple: Parser's dict(info) cannot consume it, and the open file
        # handle it contains was never closed.
        info = {"module": main_module, "pathname": entry_point}
        parser_obj = Parser(parser_name, location, info)
        parsers.append(parser_obj)
        logger.debug("Added parser {0}".format(parser_obj.name))
    return parsers
4344
def search(name):
    """
    Look up ``name`` among the bundled default parsers.

    Returns:
        A ``Parser`` whose name is this module's dotted name with its last
        component replaced by ``name``, or ``None`` when ``name`` is not
        listed in ``default_parsers``. The returned ``Parser`` carries no
        path or info.
    """
    logger.debug("Searching for parser {0}".format(name))
    if name not in default_parsers:
        return None

    logger.debug("Found parser {0} in default modules".format(name))
    # Drop this module's own name from the dotted path and put the parser
    # name in its place, yielding a sibling module name.
    qualified = __name__.split('.')[:-1]
    qualified.append(name)
    return Parser('.'.join(qualified))
5152
def load(parser):
    """
    Import the module behind ``parser`` and return it.

    Args:
        parser: an object with a ``name`` attribute (the module name handed
            to ``importlib.import_module``) and a ``path`` attribute. The
            path is only used for the log message; ``None`` is logged as
            "defaults".

    Returns:
        The module object returned by ``importlib.import_module``.

    Raises:
        Whatever ``importlib.import_module`` raises when the module cannot
        be imported.
    """
    # Identity check: None is a singleton, so compare with `is not`
    # rather than `!=`.
    origin = parser.path if parser.path is not None else "defaults"
    logger.debug("Loading parser {0} from {1}".format(parser.name, origin))
    return importlib.import_module(parser.name)