# # temperature.py # # Find current temperature of various system components (CPU, motherboard, # hard drives, ambient). Detection of motherboard-based temperatures (CPU # etc) uses the pysensors library, and produces a similar output to # lmsensors. HDD temperatures are obtained from the hddtemp daemon # (http://www.guzu.net/linux/hddtemp.php) which was orphaned since 2007. For # hddtemp to work, it must be started in daemon mode, either manually or with # a unit file. Manually, it would be started like this: # # sudo hddtemp -d /dev/sda /dev/sdb ... /dev/sdX # import re import sensors import socket, sys from telnetlib import Telnet from typing import List, Dict, NamedTuple from logparse import formatting from ..formatting import * from ..util import readlog, resolve from ..config import * import logging logger = logging.getLogger(__name__) class Drive(NamedTuple): path: str model: str temperature: int units: str class HddtempClient: def __init__(self, host: str='127.0.0.1', port: int=7634, timeout: int=10, sep: str='|') -> None: self.host = host self.port = port self.timeout = timeout self.sep = sep def _parse_drive(self, drive: str) -> Drive: try: drive_data = drive.split(self.sep) return Drive(drive_data[0], drive_data[1], int(drive_data[2]), drive_data[3]) except Exception as e: logger.warning("Error processing drive: {0}".format(str(drive_data))) return None def _parse(self, data: str) -> List[Drive]: line = data.lstrip(self.sep).rstrip(self.sep) # Remove first/last drives = line.split(self.sep * 2) parsed_drives = [] for drive in drives: parsed_drive = self._parse_drive(drive) if parsed_drive != None: parsed_drives.append(parsed_drive) # return [self._parse_drive(drive) for drive in drives if drive != None] # return list(filter(lambda drive: self._parse_drive(drive), drives)) return parsed_drives def get_drives(self) -> List[Drive]: # Obtain data from telnet server try: with Telnet(self.host, self.port, timeout=self.timeout) as tn: raw_data = tn.read_all() return self._parse(raw_data.decode('ascii')) # Return parsed data except Exception as e: logger.warning("Couldn't read data from {0}:{1} - {2}".format(self.host, self.port, str(e))) return 1 def parse_log(): logger.debug("Starting temp section") section = Section("temperatures") sensors.init() systemp = Data("Sys", []) coretemp = Data("Cores", []) pkgtemp = Data("Processor", []) try: for chip in sensors.iter_detected_chips(): for feature in chip: if "Core" in feature.label: coretemp.items.append([feature.label, float(feature.get_value())]) continue if "CPUTIN" in feature.label: pkgtemp.items.append([feature.label, float(feature.get_value())]) continue if "SYS" in feature.label: systemp.items.append([feature.label, float(feature.get_value())]) continue logger.debug("Core data is {0}".format(str(coretemp.items))) logger.debug("Sys data is {0}".format(str(systemp.items))) logger.debug("Pkg data is {0}".format(str(pkgtemp.items))) for temp_data in [systemp, coretemp, pkgtemp]: logger.debug("Looking at temp data {0}".format(str(temp_data.items))) if len(temp_data.items) > 1: avg = float(sum(feature[1] for feature in temp_data.items)) / len(temp_data.items) logger.debug("Avg temp for {0} is {1} {2}{3}".format(temp_data.subtitle, str(avg), DEG, CEL)) temp_data.subtitle += " (avg {0}{1}{2})".format(str(avg), DEG, CEL) temp_data.items = ["{0}: {1}{2}{3}".format(feature[0], str(feature[1]), DEG, CEL) for feature in temp_data.items] else: temp_data.items = [str(temp_data.items[0][1]) + DEG + CEL] section.append_data(temp_data) finally: logger.debug("Finished reading onboard temperatures") sensors.cleanup() # drive temp # For this to work, `hddtemp` must be running in daemon mode. # Start it like this (bash): sudo hddtemp -d /dev/sda /dev/sdX... received = '' sumtemp = 0.0 data = "" hddtemp_data = Data("Disks") client = HddtempClient(host=config.prefs['hddtemp']['host'], port=int(config.prefs['hddtemp']['port']), sep=config.prefs['hddtemp']['separator'], timeout=int(config.prefs['hddtemp']['timeout'])) drives = client.get_drives() logger.debug("Received drive info: " + str(drives)) for drive in sorted(drives, key=lambda x: x.path): if drive.path in config.prefs['hddtemp']['drives']: sumtemp += drive.temperature hddtemp_data.items.append(("{0} ({1})".format(drive.path, drive.model) if config.prefs['hddtemp']['show-model'] else drive.path) + ": {0}{1}{2}".format(drive.temperature, DEG, drive.units)) else: drives.remove(drive) logger.debug("Ignoring drive {0} ({1}) due to config".format(drive.path, drive.model)) logger.debug("Sorted drive info: " + str(drives)) hddavg = '{0:.1f}{1}{2}'.format(sumtemp/len(drives), DEG, drives[0].units) # use units of first drive logger.debug("Sum of temperatures: {}; Number of drives: {}; => Avg disk temp is {}".format(str(sumtemp), str(len(drives)), hddavg)) hddtemp_data.subtitle += " (avg {0}{1}{2})".format(str(hddavg), DEG, CEL) logger.debug("Finished processing drive temperatures") logger.info("Finished temp section") return section