"""
Find current temperature of various system components (CPU, motherboard,
hard drives, ambient).

Detection of motherboard-based temperatures (CPU etc) uses the pysensors
library, and produces a similar output to lmsensors. HDD temperatures are
obtained from the hddtemp daemon, which has been orphaned since 2007. For
hddtemp to work, it must be started in daemon mode, either manually or with
a unit file. Manually, it would be started like this:

    `sudo hddtemp -d /dev/sda /dev/sdb ... /dev/sdX`
"""

import re
import sensors
import socket
import sys
# NOTE: telnetlib was deprecated in Python 3.11 and removed in 3.13 (PEP 594).
from telnetlib import Telnet
from typing import List, Dict, NamedTuple, Optional

from logparse.formatting import *
from logparse.util import readlog
from logparse import config

import logging
logger = logging.getLogger(__name__)
from logparse.load_parsers import Parser


class Drive(NamedTuple):
    """One drive record parsed from the hddtemp daemon's reply."""

    path: str           # first field of the record
    model: str          # second field of the record
    temperature: int    # third field, converted with int()
    units: str          # fourth field, used verbatim when printing


class HddtempClient:
    """
    Small client for an hddtemp daemon listening on a TCP port.

    The reply is expected to be a single string in which the fields of one
    drive are separated by `sep`, consecutive drives are separated by a
    doubled `sep`, and the whole string may start and end with `sep`.
    """

    def __init__(self, host: str='127.0.0.1', port: int=7634,
                 timeout: int=10, sep: str='|') -> None:
        """
        Store the connection settings; no connection is opened here.

        :param host: address of the hddtemp daemon
        :param port: TCP port of the hddtemp daemon
        :param timeout: connection timeout passed to `Telnet`
        :param sep: field separator used in the daemon's reply
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sep = sep

    def _parse_drive(self, drive: str) -> Optional[Drive]:
        """
        Convert a single `sep`-delimited record into a `Drive`.

        Returns None (after logging a warning) when the record has fewer
        than four fields or its temperature field is not an integer.
        """
        # Split before the try block so the warning below can always
        # reference the fields, whatever went wrong afterwards.
        fields = drive.split(self.sep)
        try:
            return Drive(fields[0], fields[1], int(fields[2]), fields[3])
        except (IndexError, ValueError):
            logger.warning("Error processing drive: " + str(fields))
            return None

    def _parse(self, data: str) -> List[Drive]:
        """
        Split the daemon's full reply into records and parse each one.

        Records that cannot be parsed are dropped. An empty reply yields an
        empty list.
        """
        line = data.strip(self.sep)     # Remove first/last separator
        if not line:
            # Nothing to split: avoids parsing one empty pseudo-record and
            # logging a misleading warning for it.
            return []
        candidates = (self._parse_drive(record)
                      for record in line.split(self.sep * 2))
        return [drive for drive in candidates if drive is not None]

    def get_drives(self) -> List[Drive]:
        """
        Fetch and parse the drive list from the hddtemp daemon.

        This is best-effort: any failure while connecting, reading or
        decoding is logged as a warning and an empty list is returned, so
        callers can always iterate over the result.
        """
        try:
            with Telnet(self.host, self.port, timeout=self.timeout) as tn:
                raw_data = tn.read_all()
            return self._parse(raw_data.decode('ascii'))
        except Exception as e:
            logger.warning("Couldn't read data from {0}:{1} - {2}".format(
                self.host, self.port, str(e)))
            # Must be a list (not an error code) to honour the annotation
            # and keep `sorted(drives, ...)` in the caller from raising.
            return []


class Temperature(Parser):
    """Parser reporting onboard sensor and hard drive temperatures."""

    def __init__(self):
        super().__init__()
        self.name = "temperature"
        self.info = ("Find current temperature of various system components "
                     "(CPU, motherboard, hard drives, ambient).")

    def parse_log(self):
        """
        Build the "temperatures" section.

        Onboard readings come from the `sensors` module; drive readings come
        from an hddtemp daemon configured under the "temperatures" section
        of `config.prefs`.

        :return: the populated `Section`
        """
        logger.debug("Starting temp section")
        section = Section("temperatures")

        self._append_onboard_temperatures(section)
        self._append_drive_temperatures(section)

        logger.info("Finished temp section")
        return section

    def _append_onboard_temperatures(self, section):
        """Read sensor features, group them by label and add them to section."""
        systemp = Data("Sys", [])
        coretemp = Data("Cores", [])
        pkgtemp = Data("Processor", [])

        # Label substring -> destination; checked in this order and the
        # first match wins.
        groups = (("Core", coretemp), ("CPUTIN", pkgtemp), ("SYS", systemp))

        sensors.init()
        try:
            for chip in sensors.iter_detected_chips():
                for feature in chip:
                    for marker, group in groups:
                        if marker in feature.label:
                            group.items.append(
                                [feature.label, float(feature.get_value())])
                            break

            logger.debug("Core data is {0}".format(str(coretemp.items)))
            logger.debug("Sys data is {0}".format(str(systemp.items)))
            logger.debug("Pkg data is {0}".format(str(pkgtemp.items)))

            for temp_data in (systemp, coretemp, pkgtemp):
                logger.debug("Looking at temp data {0}".format(
                    temp_data.items))
                readings = temp_data.items
                if not readings:
                    # No feature matched this group on this machine;
                    # indexing readings[0] below would raise IndexError.
                    logger.debug("No readings for {0}".format(
                        temp_data.subtitle))
                    continue
                if len(readings) > 1:
                    avg = sum(value for _, value in readings) / len(readings)
                    logger.debug("Avg temp for {0} is {1} {2}{3}".format(
                        temp_data.subtitle, avg, DEG, CEL))
                    temp_data.subtitle += " (avg {0}{1}{2})".format(
                        avg, DEG, CEL)
                    temp_data.items = ["{0}: {1}{2}{3}".format(
                        label, value, DEG, CEL) for label, value in readings]
                else:
                    # A single reading is shown without its label.
                    temp_data.items = [str(readings[0][1]) + DEG + CEL]
                section.append_data(temp_data)
        finally:
            logger.debug("Finished reading onboard temperatures")
            sensors.cleanup()

    def _append_drive_temperatures(self, section):
        """
        Query hddtemp and add the configured drives to section.

        For this to work, `hddtemp` must be running in daemon mode.
        Start it like this (bash): sudo hddtemp -d /dev/sda /dev/sdX...
        """
        hddtemp_data = Data("Disks")

        client = HddtempClient(
            host=config.prefs.get("temperatures", "host"),
            port=config.prefs.getint("temperatures", "port"),
            sep=config.prefs.get("temperatures", "separator"),
            timeout=int(config.prefs.get("temperatures", "timeout")))
        drives = client.get_drives()
        logger.debug("Received drive info: " + str(drives))

        # Read the configuration once rather than once per drive.
        wanted = set(config.prefs.get("temperatures", "drives").split())
        show_model = config.prefs.getboolean("temperatures", "show-model")

        for drive in sorted(drives, key=lambda x: x.path):
            if drive.path not in wanted:
                logger.debug("Ignoring drive {0} ({1}) due to config".format(
                    drive.path, drive.model))
                continue
            if show_model:
                label = "{0} ({1})".format(drive.path, drive.model)
            else:
                label = drive.path
            hddtemp_data.items.append(label + ": {0}{1}{2}".format(
                drive.temperature, DEG, drive.units))

        # Keep the daemon's order for the retained drives.
        drives = [drive for drive in drives if drive.path in wanted]
        logger.debug("Sorted drive info: " + str(drives))

        if drives:
            sumtemp = float(sum(drive.temperature for drive in drives))
            # use units of first drive
            hddavg = '{0:.1f}{1}{2}'.format(
                sumtemp / len(drives), DEG, drives[0].units)
            logger.debug("Sum of temperatures: {}; Number of drives: {}; "
                         "=> Avg disk temp is {}".format(
                             sumtemp, len(drives), hddavg))
            # hddavg already ends with the degree sign and the unit, so
            # they must not be appended a second time here.
            hddtemp_data.subtitle += " (avg {0})".format(hddavg)
            section.append_data(hddtemp_data)

        logger.debug("Finished processing drive temperatures")