+"""
+This file sets up logging for the entire logparse package. Custom log formatter
+and handler classes are specified below and the logger is supplied to all
+subsequent modules, including parsers.
+"""
+
__version__ = '2.0'
__name__ = 'logparse'
+
+
+from copy import copy
+import logging
+import logging.handlers
+
+
+# Standard shell escape codes
+ESC = {
+ "reset": "\033[0m",
+ "color": "\033[1;%dm",
+ "bold": "\033[1m",
+ "underlined": "\033[4m"
+ }
+
+# Standard shell colour codes (30..39 are the foreground colors)
+DEFAULT = 39
+BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN = range(30, 37)
+
+# Map colours to log levels (used for level text only)
+COLORS = {
+ 10: BLUE, # debug
+ 20: DEFAULT, # info
+ 30: YELLOW, # warning
+ 40: RED, # error
+ 50: RED # critical
+ }
+
+# Template for formatting log messages
+FORMAT = ("{bold}%(name)-15s{reset} %(levelname)-18s %(message)s "
+ "({bold}%(filename)s{reset}:%(lineno)d)")
+
+
+class ColoredFormatter(logging.Formatter):
+ """
+ Custom implementation of a log formatter to apply standard shell colour
+ escape sequences depending on the log level. The original record is copied
+ so as to not interfere with subsequent handlings (i.e. the syslog handler).
+ """
+
+ def __init__(self, msg):
+ logging.Formatter.__init__(self, msg)
+
+ def format(self, record):
+ temprecord = copy(record)
+ levelname = temprecord.levelname
+ if temprecord.levelno in COLORS:
+ levelname_color = ESC["color"] % (COLORS[temprecord.levelno]) \
+ + levelname + ESC["reset"]
+ temprecord.levelname = levelname_color
+ temprecord.name = record.name.replace(__name__ + ".", "")
+ return logging.Formatter.format(self, temprecord)
+
+
+class ColoredLogger(logging.Logger):
+ """
+ Custom implementation of a logger object using the `ColoredFormatter` class
+ above. This class also includes a syslog handler to direct a minimal amount
+ of output to /dev/log.
+ """
+
+ message = FORMAT.format(**ESC)
+
+ def __init__(self, name):
+ """
+ Initialise the logger for the entire package. This is done here so that
+ the configuration is applied to all child modules. A syslog handler
+ is also initialised, with a min level of INFO so that journald doesn't
+ get spammed with debug messages..
+ """
+
+ logging.Logger.__init__(self, name)
+
+ color_formatter = ColoredFormatter(self.message)
+
+ syslog_handler = logging.handlers.SysLogHandler(address = '/dev/log')
+ syslog_handler.setLevel(logging.INFO)
+ syslog_handler.setFormatter(logging.Formatter(
+ fmt='{}[%(process)d]: (%(module)s) %(message)s'.format(__name__)))
+
+ console = logging.StreamHandler()
+ console.setFormatter(color_formatter)
+
+ self.addHandler(console)
+ self.addHandler(syslog_handler)
+ return
+
+
+# Initialise logger object
+logging.setLoggerClass(ColoredLogger)
+logger = logging.getLogger()
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
from logparse import interface, util, mail, config
import logging
+logger = None
logger = logging.getLogger(__name__)
if interface.argparser.parse_args().no_write:
css_path = os.path.relpath(css_path, ".")
elif interface.argparser.parse_args().destination:
- css_path = os.path.relpath(css_path, interface.argparser.parse_args().destination())
+ css_path = os.path.relpath(
+ css_path, interface.argparser.parse_args().destination())
elif config.prefs.get("logparse", "output"):
- css_path = os.path.relpath(css_path, config.prefs.get("logparse", "output"))
+ css_path = os.path.relpath(
+ css_path, config.prefs.get("logparse", "output"))
VARSUBST = {
"title": config.prefs.get("logparse", "title"),
"date": interface.start.strftime(DATEFMT),
"time": interface.start.strftime(TIMEFMT),
- "hostname": util.hostname(config.prefs.get("logparse", "hostname-path")),
+ "hostname": util.hostname(config.prefs.get(
+ "logparse", "hostname-path")),
"version": logparse.__version__,
"css": css_path
}
print()
if lines:
- line = PlaintextLine(linewidth=config.prefs.getint("plain", "linewidth"), double=True)
+ line = PlaintextLine(linewidth=
+ config.prefs.getint("plain", "linewidth"), double=True)
print(line.draw())
print(self.content)
if lines:
class PlaintextOutput(Output):
"""
- Processes & outputs data in a plaintext form which can be read with cat or plaintext email.
+ Processes & outputs data in a plaintext form which can be read with cat or
+ plaintext email.
"""
def __init__(self, linewidth=80):
"""
Print details with some primitive formatting
"""
- box = PlaintextBox(content=Template("$title $version on $hostname\n\n$time $date").safe_substitute(VARSUBST), vpadding=2, hpadding="\t\t", linewidth=self.linewidth)
+ box = PlaintextBox(content=
+ Template("$title $version on $hostname\n\n$time $date")
+ .safe_substitute(VARSUBST),
+ vpadding=2, hpadding="\t\t", linewidth=self.linewidth)
line = PlaintextLine(self.linewidth)
self.append(box.draw() + line.draw())
This should be run by interface.py after every instance of parse_log().
"""
- self.append(PlaintextBox(content=section.title, double=False, fullwidth=False, vpadding=0, hpadding=" ").draw())
+ self.append(PlaintextBox(
+ content=section.title, double=False,
+ fullwidth=False, vpadding=0, hpadding=" ").draw())
if section.period and section.period.unique:
- self.append("\n(since {0})".format(section.period.startdate.strftime(DATEFMT + " " + TIMEFMT)))
+ self.append("\n(since {0})".format(
+ section.period.startdate.strftime(DATEFMT + " " + TIMEFMT)))
self.append('\n'*2)
for data in section.data:
self.append(self._fmt_data(data.subtitle, data.items))
"""
if not self._embedded:
- self._embedded = mail.mailprep(re.sub(".*" + re.escape(VARSUBST['css']) + ".*\n", "", self.content), css)
+ self._embedded = mail.mailprep(re.sub(
+ ".*" + re.escape(VARSUBST['css']) + ".*\n", "", self.content),
+ css)
return self._embedded
def write_embedded(self, destination = ""):
def append_section(self, section):
"""
- Call the appropriate methods to generate HTML tags for a section (provided by a parser).
- This should be run by interface.py after every instance of parse_log().
+ Call the appropriate methods to generate HTML tags for a section
+ (provided by a parser). This should be run by interface.py after every
+ instance of parse_log().
"""
self.append(opentag('div', 1, section.title, 'section'))
logger.debug("Received data " + str(data))
subtitle += ':'
if (len(data) == 1):
- return tag('p', False, subtitle + ' ' + data[0], cl="severity-" + str(severity))
+ return tag('p', False, subtitle + ' ' + data[0],
+ cl="severity-" + str(severity))
else:
output = ""
- output += tag('p', False, subtitle, cl="severity-" + str(severity))
+ output += tag('p', False, subtitle,
+ cl="severity-" + str(severity))
output += opentag('ul', 1)
coderegex = re.compile('`(.*)`')
for datum in data:
self.embed_css(config.prefs.get("html", "css"))
print()
if lines:
- line = PlaintextLine(linewidth=config.prefs.getint("plain", "linewidth"), double=True)
+ line = PlaintextLine(linewidth=
+ config.prefs.getint("plain", "linewidth"), double=True)
print(line.draw())
print(self._embedded)
if lines:
def truncl(self, limit): # truncate list
"""
- Truncate self.items to a specified value and state how many items are hidden.
+ Truncate self.items to a specified value and state how many items are
+ hidden.
"""
if (len(self.items) > limit):
"""
unsorted = list(self.items)
- self.items = ["{0} ({1})".format(y, unsorted.count(y)) for y in sorted(set(unsorted), key = lambda x: -unsorted.count(x))]
+ self.items = ["{0} ({1})".format(y, unsorted.count(y)) for y in sorted(
+ set(unsorted), key = lambda x: -unsorted.count(x))]
return self
A wrapper for python-tabulate's Tabulate type.
"""
- def __init__(self, double=False, borders=False, hpadding=" ", maxwidth=80, headers=[]):
+ def __init__(self, double=False, borders=False, hpadding=" ",
+ maxwidth=80, headers=[]):
"""
Initialise variables. Note the keymap is used for a faster index map,
but is not currently used anywhere (may be removed in future).
self._align_cols[i] = align
for row in self.rows:
row.columns[i].align = align
- logger.debug("Column alignment is now {0}".format(str(self._align_cols)))
+ logger.debug("Column alignment is now {0}".format(self._align_cols))
def _gen_list(self):
"""
Output HTML string (wrapper for tabulate)
"""
- output = tabulate(self._gen_list(), self.headers, tablefmt="html", colalign=tuple(self._align_cols))
+ output = tabulate(self._gen_list(), self.headers, tablefmt="html",
+ colalign=tuple(self._align_cols))
return output
def draw_plain(self):
Output plain text string (wrapper for tabulate)
"""
- output = tabulate(self._gen_list(), self.headers, tablefmt="fancy_grid" if self.borders else "plain", colalign=tuple(self._align_cols))
+ output = tabulate(self._gen_list(), self.headers,
+ tablefmt="fancy_grid" if self.borders
+ else "plain", colalign=tuple(self._align_cols))
return output + "\n"*2
"""
line = (LINECHARS_DOUBLE[1] if self.double else LINECHARS_SINGLE[1])
- return "\n" * self.vpadding + self.hpadding + line * (self.linewidth - 2 * len(self.hpadding)) + self.hpadding + "\n" * self.vpadding
+ return "\n" * self.vpadding + self.hpadding \
+ + line * (self.linewidth - 2 * len(self.hpadding)) \
+ + self.hpadding + "\n" * self.vpadding
class PlaintextBox:
Draw a rectangular box around text, with customisable padding/size/style
"""
- def __init__(self, content="", double=True, fullwidth=True, linewidth=80, hpadding="\t", vpadding=1):
+ def __init__(self, content="", double=True, fullwidth=True, linewidth=80,
+ hpadding="\t", vpadding=1):
"""
Initialise variables
"""
# Calculate number of characters per line
contentlines = self.content.splitlines()
- contentwidth = int((self.linewidth if self.linewidth > 0 else 80) if self.content.splitlines() else len(max(contentlines, key=len)))
+ contentwidth = int((self.linewidth if self.linewidth > 0 else 80)
+ if self.content.splitlines()
+ else len(max(contentlines, key=len)))
logger.debug("Contentwidth is {0}".format(str(contentwidth)))
- logger.debug("Longest line is {0}".format(len(max(contentlines, key=len))))
+ logger.debug("Longest line is {0}".format(
+ len(max(contentlines, key=len))))
contentwidth += -2*(len(self.hpadding)+1)
if not self.fullwidth:
longestline = len(max(contentlines, key=len))
contentlines[i] = res
# Flatten list
- # Note list comprehension doesn't work here, so we must iterate through each item
+ # Note list comprehension doesn't work here, so we must iterate
+ # through each item
newlines = []
for line in contentlines:
if isinstance(line, list):
contentlines.append(' '*contentwidth)
# Insert horizontal padding on lines that are too short
- contentlines = [linechars[0] + self.hpadding + x + ' '*(self.linewidth-(len(x)+2*len(self.hpadding)+2) if len(x) < contentwidth else 0) + self.hpadding + linechars[0] for x in contentlines]
- contentlines.insert(0, cornerchars[3] + linechars[1] * (contentwidth + len(self.hpadding)*2) + cornerchars[2])
- contentlines.append(cornerchars[0] + linechars[1] * (contentwidth + len(self.hpadding)*2) + cornerchars[1])
+ contentlines = [linechars[0] + self.hpadding + x
+ + ' '*(self.linewidth-(len(x)+2*len(self.hpadding)+2)
+ if len(x) < contentwidth else 0)
+ + self.hpadding + linechars[0] for x in contentlines]
+ contentlines.insert(0, cornerchars[3] + linechars[1]
+ * (contentwidth + len(self.hpadding)*2) + cornerchars[2])
+ contentlines.append(cornerchars[0] + linechars[1]
+ * (contentwidth + len(self.hpadding)*2) + cornerchars[1])
return ('\n').join(contentlines)
output += " class='" + cl + "'"
if style:
output += " style='"
- output += " ".join("{0}: {1};".format(attr, value) for attr, value in style.items())
+ output += " ".join("{0}: {1};".format(attr, value)
+ for attr, value in style.items())
output += "'"
output += '>'
if block:
-#!/usr/bin/env python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+
"""
This module is the entrypoint of the `logparse` shell command and also contains
single-use functions which don't fit elsewhere. All user interaction with
- `rotate_sim()`: Simulate log rotation
"""
-import logging, logging.handlers
import argparse
+from copy import copy
+import logging
+import logging.handlers
import os
from sys import stdin, version
from subprocess import check_output
from datetime import datetime
import logparse
-from logparse import formatting, mail, config, load_parsers
+from logparse import formatting, mail, config, load_parsers, util
def main():
argparser.parse_args().time_period)
# Set up logging
-
logger = logging.getLogger(__name__)
- loghandler = logging.handlers.SysLogHandler(address = '/dev/log')
- loghandler.setFormatter(logging.Formatter(
- fmt='logparse[' + str(os.getpid()) + ']: %(message)s'))
- loghandler.setLevel(logging.INFO) # don't spam syslog with debug messages
-
if (argparser.parse_args().quiet
or config.prefs.getboolean("logparse", "quiet")):
- logging.basicConfig(level=logging.CRITICAL)
+ logparse.logger.setLevel(logging.CRITICAL)
elif (argparser.parse_args().verbose
or config.prefs.getboolean("logparse", "verbose")):
- logging.basicConfig(level=logging.DEBUG)
+ logparse.logger.setLevel(logging.DEBUG)
logger.debug("Verbose mode turned on")
else:
- logging.basicConfig(level=logging.INFO)
+ logparse.logger.setLevel(logging.INFO)
+
- logger.addHandler(loghandler)
# Time analysis
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
+
"""
A basic "plugin loader" implementation which searches for default packaged and
user-supplied parser modules and verifies them so they can be executed by
logparse.formatting.Section object.
"""
- def __init__(self, name=None, path=None, info=None, deprecated=False, successor=""):
+ def __init__(self, name=None, path=None, info=None, deprecated=False,
+ successor=""):
"""
The following variables can be set to display information about the
- parser. The object `self.logger` can be used as for outputting messages
- to whatever sink is set up in logparse.interface (no setup required in
- the parser module itself).
+ parser. The object `self.logger` can be used for outputting messages to
+ to whatever sink is set up in __init__.py (no setup required in the
+ parser module itself).
"""
self.name = str(name) if name else None
self.path = Path(path) if path else None
self.info = dict(info) if info else None
- self.logger = logging.getLogger(__name__)
+ self.logger = logging.getLogger(self.__module__)
self.deprecated = deprecated
self.successor = successor
"""
A generic loading method to import a parser, only used for debugging
"""
- logger.debug("Loading parser {0} from {1}".format(self.name, str(self.path) if self.path != None else "defaults"))
+ logger.debug("Loading parser {0} from {1}".format(
+ self.name, str(self.path) if self.path != None else "defaults"))
return importlib.import_module(self.name)
def parse_log(self, **args) -> None:
self.parsers.append(user_parser)
return user_parser
else:
- logger.warning("Couldn't find a matching parser module for search term {0}".format(pattern))
+ logger.warning("Couldn't find a matching parser module "
+ "for search term {0}".format(pattern))
return None
def _search_user(self, pattern):
logger.debug("Searching for {0} in {1}".format(pattern, self.path))
try:
- spec = importlib.machinery.PathFinder.find_spec(pattern, path=[self.path])
+ spec = importlib.machinery.PathFinder.find_spec(
+ pattern, path=[self.path])
parser_module = spec.loader.load_module(spec.name)
return self._validate_module(parser_module)
except Exception as e:
def _search_default(self, pattern):
"""
Search for a parser name `pattern` in the default parser package
+ TODO use importlib.resources.is_resources() once there is a backport
+ to Python 3.6 or below
"""
- # TODO use importlib.resources.is_resources() once there is a backport to Python 3.6 or below
logger.debug("Searching for {0} in default parsers".format(pattern))
try:
parser_module = importlib.import_module(self.pkg + "." + pattern)
6. Must not match an already-loaded class
"""
- logger.debug("Checking validity of module {0} at {1}".format(parser_module.__name__, parser_module.__file__))
+ logger.debug("Checking validity of module {0} at {1}".format(
+ parser_module.__name__, parser_module.__file__))
available_parsers = []
clsmembers = inspect.getmembers(parser_module, inspect.isclass)
if not (issubclass(c, Parser) & (c is not Parser)):
continue
if c in self.parsers:
- logger.warning("Parser class {0} has already been loaded from another source, ignoring it".format(c.__class__.__name__, c.__file__))
+ logger.warning("Parser class {0} has already been loaded "
+ "from another source, ignoring it".format(
+ c.__class__.__name__, c.__file__))
if not inspect.isroutine(c.parse_log):
- logger.warning("Parser class {0} in {1} does not contain a parse_log() method".format(c.__class__.__name__, c.__file__))
+ logger.warning("Parser class {0} in {1} does not contain a "
+ "parse_log() method".format(
+ c.__class__.__name__, c.__file__))
continue
if None in get_type_hints(c):
- logger.warning("Parser class {0} in {1} contains a null-returning parse_log() method".format(c.__class__.__name__, c.__file__))
+ logger.warning("Parser class {0} in {1} contains a "
+ "null-returning parse_log() method".format(
+ c.__class__.__name__, c.__file__))
continue
parser_obj = c()
if parser_obj.deprecated:
- logger.warning("Parser {0} is deprecated - use {1} instead".format(parser_obj.name, parser_obj.successor))
- logger.debug("Found parser {0}.{1}".format(c.__module__, c.__class__.__name__))
- available_parsers.append(c())
+ logger.warning("Parser {0} is deprecated - "
+ "use {1} instead".format(
+ parser_obj.name, parser_obj.successor))
+ logger.debug("Found parser {0}.{1}".format(
+ c.__module__, c.__class__.__name__))
+ available_parsers.append(parser_obj)
# Check module structure
if len(available_parsers) == 1:
- logger.debug("Parser module {0} at {1} passed validity checks".format(parser_module.__name__, parser_module.__file__))
+ logger.debug("Parser module {0} at {1} passed validity checks"
+ .format(parser_module.__name__, parser_module.__file__))
return available_parsers[0]
elif len(available_parsers) == 0:
- logger.warning("No valid classes in {0} at {1}".format(parser_module.__name__, parser_module.__file__))
+ logger.warning("No valid classes in {0} at {1}".
+ format(parser_module.__name__, parser_module.__file__))
return None
elif len(available_parsers) > 1:
- logger.warning("Found multiple valid parser classes in {0} at {1} - ignoring this module".format(parser_module.__name__, parser_module.__file__))
+ logger.warning("Found multiple valid parser classes in {0} at {1} "
+ "- ignoring this module"
+ .format(parser_module.__name__, parser_module.__file__))
return None
def load_pkg(self):
non-deprecated parser classes from self.pkg using importlib.
"""
- available_parsers = [name for _, name, _ in iter_modules([dirname(importlib.import_module(self.pkg).__file__)])]
+ available_parsers = [name for _, name, _ in iter_modules(
+ [dirname(importlib.import_module(self.pkg).__file__)])]
for parser_name in available_parsers:
- parser_module = importlib.import_module("logparse.parsers." + parser_name)
+ parser_module = importlib.import_module(
+ "logparse.parsers." + parser_name)
parser_class = self._validate_module(parser_module)
if parser_class == None:
continue
- parser_obj = parser_class
- if parser_obj.deprecated:
- logger.debug("Ignoring parser {0} because it is deprecated".format(parser_class.__class__.__name__))
+ if parser_class.deprecated:
+ logger.debug("Ignoring parser {0} because it is deprecated"
+ .format(parser_class.__class__.__name__))
continue
self.parsers.append(parser_class)
return self.parsers
# -*- coding: utf-8 -*-
+
#
# systemctl.py
#
"""
from datetime import datetime, timedelta
+import copy
import ipaddress
import logging
import os
domain = socket.getfqdn().split('.', 1)
if len(domain) != 2:
- logger.warning("Could not get domain of this server, only hostname. Please consider updating the hostname file at {0}".format(config.prefs.get("logparse", "hostname-path")))
+ logger.warning("Could not get domain of this server, only hostname. "
+ "Please consider updating the hostname file at {0}".format(
+ config.prefs.get("logparse", "hostname-path")))
return 'localdomain'
else:
return domain[-1]
return 1
else:
if (os.path.isfile(path) is False):
- logger.error("Log at {0} was requested but does not exist".format(path))
+ logger.error("Log at {0} was requested but does not exist"
+ .format(path))
return ''
else:
try:
return open(path, mode).read()
except IOError or OSError as e:
- logger.warning("Error reading log at {0}: {1}".format(path, e.strerror))
+ logger.warning("Error reading log at {0}: {1}"
+ .format(path, e.strerror))
return 1
class LogPeriod:
def __init__(self, section):
if config.prefs.get(section.split("_")[0], "period"):
- self.startdate = datetime.now() - timeparse(config.prefs.get(section.split("_")[0], "period"))
- logger.debug("Parsing logs for {0} since {1}".format(section, self.startdate.strftime(formatting.DATEFMT + " " + formatting.TIMEFMT)))
+ self.startdate = datetime.now() \
+ - timeparse(config.prefs.get(section.split("_")[0], "period"))
+ logger.debug("Parsing logs for {0} since {1}".format(
+ section, self.startdate.strftime(formatting.DATEFMT
+ + " " + formatting.TIMEFMT)))
self.unique = True
else:
- self.startdate = datetime.now() - timeparse(config.prefs.get("logparse", "period"))
+ self.startdate = datetime.now() \
+ - timeparse(config.prefs.get("logparse", "period"))
self.unique = False
setuptools.setup(
name='logparse', # https://packaging.python.org/specifications/core-metadata/#name
- version=logparse.__version__, # https://www.python.org/dev/peps/pep-0440/ https://packaging.python.org/en/latest/single_source_version.html
+ version=__version__, # https://www.python.org/dev/peps/pep-0440/ https://packaging.python.org/en/latest/single_source_version.html
description='Summarise server logs',
long_description_content_type='text/markdown',
url='https://git.lorimer.id.au/logparse.git',