-#
-# sshd.py
-#
-# Find number of ssh logins and authorised users (uses /var/log/auth.log)
-#
-# NOTE: This file is now deprecated in favour of the newer journald mechanism
-# used in sshd-journald.py. This parser is still functional but is slower and
-# has less features. Please switch over if possible.
-#
+"""
+Find number of ssh logins and authorised users (uses /var/log/auth.log)
+NOTE: This file is now deprecated in favour of the newer journald mechanism
+used in sshd-journald.py. This parser is still functional but is slower and
+has less features. Please switch over if possible.
+"""
import re
def __init__(self):
super().__init__()
self.name = "sshd"
- self.info = "Find number of ssh logins and authorised users (uses /var/log/auth.log)"
+ self.info = "Find number of ssh logins and authorised users "
+ "(uses /var/log/auth.log)"
self.deprecated = True
+ self.successor = "sshd_journald"
def parse_log(self):
- logger.warning("NOTE: This sshd parser is now deprecated. Please use sshd-journald if possible.")
-
+ logger.warning("NOTE: This sshd parser is now deprecated. "
+ "Please use sshd-journald if possible.")
logger.debug("Starting sshd section")
section = Section("ssh")
- logger.debug("Searching for matches in {0}".format(config.prefs.get("logs", "auth")))
- matches = re.findall('.*sshd.*Accepted publickey for .* from .*', readlog(config.prefs.get("logs", "auth"))) # get all logins
- logger.debug("Finished searching for logins")
- logger.debug("Searching for matches in {0}".format(config.prefs.get("logs", "auth")))
+ logger.debug("Searching for matches in {0}".format(
+ config.prefs.get("logs", "auth")))
authlog = readlog(config.prefs.get("logs", "auth"))
-
- matches = re.findall('.*sshd.*Accepted publickey for .* from .*', authlog) # get all logins
+ matches = re.findall('.*sshd.*Accepted publickey for .* from .*',
+ authlog) # get all logins
invalid_matches = re.findall(".*sshd.*Invalid user .* from .*", authlog)
- root_matches = re.findall("Disconnected from authenticating user root", authlog)
+ root_matches = re.findall("Disconnected from authenticating user root",
+ authlog)
logger.debug("Finished searching for logins")
- users = [] # list of users with format [username, number of logins] for each item
+ users = [] # list of users with format [username, number of logins]
+ # for each item
data = []
num = len(matches) # total number of logins
+
for match in matches:
- entry = re.search('^.*publickey\sfor\s(\w*)\sfrom\s(\S*)', match) # [('user', 'ip')]
+
+ # [('user', 'ip')]
+ entry = re.search('^.*publickey\sfor\s(\w*)\sfrom\s(\S*)', match)
user = entry.group(1)
ip = entry.group(2)
- userhost = user + '@' + resolve(ip, fqdn=config.prefs.get("sshd", "sshd-resolve-domains"))
+ userhost = user + '@' + resolve(ip,
+ fqdn=config.prefs.get("sshd", "sshd-resolve-domains"))
users.append(userhost)
+
logger.debug("Parsed list of authorised users")
+ # Format authorised users
auth_data = Data(subtitle=plural('login', num) + ' from', items=users)
-
- if (len(auth_data.items) == 1): # if only one user, do not display no of logins for this user
- logger.debug("found " + str(len(matches)) + " ssh logins for user " + users[0])
- auth_data.subtitle += ' ' + auth_data.items[0]
+ if (len(auth_data.items) == 1):
+ # If only one user, do not display no of logins for this user
+ logger.debug("Found {0} logins for user {1}".format(
auth_data.orderbyfreq()
auth_data.truncl(config.prefs.getint("logparse", "maxlist"))
- logger.debug("Found " + str(len(matches)) + " ssh logins for users " + str(data))
+ logger.debug("Found {0} logins for {1} users".format(
+ len(matches), len(users)))
section.append_data(auth_data)
+ # Format invalid users
invalid_users = []
for match in invalid_matches:
- entry = re.search('^.*Invalid user (\S+) from (\S+).*', match) # [('user', 'ip')]
-
+ # [('user', 'ip')]
+ entry = re.search('^.*Invalid user (\S+) from (\S+).*', match)
try:
user = entry.group(1)
ip = entry.group(2)
userhost = user + '@' + ip
invalid_users.append(userhost)
+
logger.debug("Parsed list of invalid users")
- invalid_data = Data(subtitle=plural("attempted login", len(invalid_matches)) + " from " + plural("invalid user", len(invalid_users), print_quantity=False), items=invalid_users)
- if (len(invalid_data.items) == 1): # if only one user, do not display no of logins for this user
- logger.debug("Found " + str(len(invalid_matches)) + " SSH login attempts for invalid user " + invalid_users[0])
+
+ invalid_data = Data(subtitle=plural("attempted login",
+ len(invalid_matches)) + " from "
+ + plural("invalid user", len(invalid_users), print_quantity=False),
+ items=invalid_users)
+ if (len(invalid_data.items) == 1):
+ # If only one user, do not display no of logins for this user
+ logger.debug("Found {0} login attempts for user {1}"
+ .format(len(invalid_matches), invalid_data.items[0]))
invalid_data.subtitle += ' ' + invalid_data.items[0]
invalid_data.orderbyfreq()
invalid_data.truncl(config.prefs.get("logparse", "maxlist"))
- logger.debug("Found " + str(len(invalid_matches)) + " SSH login attempts for invalid users " + str(data))
+ logger.debug("Found {0} login attempts for invalid users"
+ .format(len(invalid_matches)))
section.append_data(invalid_data)
- logger.debug("Found {0} attempted logins for root".format(str(len(root_matches))))
+ logger.debug("Found {0} attempted logins for root".
+ format(str(len(root_matches))))
- section.append_data(Data(subtitle=plural("attempted login", str(len(root_matches))) + " for root"))
+ section.append_data(Data(subtitle=plural("attempted login",
+ str(len(root_matches))) + " for root"))
logger.info("Finished sshd section")
return section