afc652b4fe00eba694f5127de062a43845922d8e
1#
2# temperature.py
3#
4# Find current temperature of various system components (CPU, motherboard,
5# hard drives, ambient). Detection of motherboard-based temperatures (CPU
6# etc) uses the pysensors library, and produces a similar output to
7# lmsensors. HDD temperatures are obtained from the hddtemp daemon
8# (http://www.guzu.net/linux/hddtemp.php), which has been orphaned since 2007. For
9# hddtemp to work, it must be started in daemon mode, either manually or with
10# a unit file. Manually, it would be started like this:
11#
12# sudo hddtemp -d /dev/sda /dev/sdb ... /dev/sdX
13#
14
import re
import socket
import sys
from telnetlib import Telnet
from typing import Dict, List, NamedTuple, Optional

import sensors
20
21from logparse.formatting import *
22from logparse.util import readlog
23from logparse import config
24
25import logging
26logger = logging.getLogger(__name__)
27from logparse.load_parsers import Parser
28
class Drive(NamedTuple):
    """One drive record as reported by the hddtemp daemon."""

    path: str         # first field of the record (e.g. /dev/sda)
    model: str        # second field of the record
    temperature: int  # third field, converted to an integer
    units: str        # fourth field, the unit string sent with the temperature


class HddtempClient:
    """Minimal client for an hddtemp daemon.

    Connects to ``host:port``, reads the complete response and converts it
    into :class:`Drive` tuples. The response is expected to be a single line
    in which each drive record is wrapped in ``sep`` and its fields are
    separated by ``sep``, so consecutive records are separated by a doubled
    ``sep`` (``|path|model|temp|unit||path|model|temp|unit|``).
    """

    def __init__(self, host: str='127.0.0.1', port: int=7634, timeout: int=10, sep: str='|') -> None:
        """Store the connection settings; no connection is opened here.

        Args:
            host: Address the daemon listens on.
            port: TCP port the daemon listens on.
            timeout: Socket timeout in seconds for connecting and reading.
            sep: Field separator used in the daemon's response.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sep = sep

    def _parse_drive(self, drive: str) -> Optional[Drive]:
        """Convert one ``path|model|temp|unit`` record into a Drive.

        Returns:
            The parsed Drive, or ``None`` (after logging a warning) when the
            record has fewer than four fields or a non-integer temperature.
        """
        # Split outside the ``try`` so the handler can always log the fields.
        drive_data = drive.split(self.sep)
        try:
            # Fields beyond the fourth are ignored; too few raise ValueError.
            path, model, temperature, units = drive_data[:4]
            return Drive(path, model, int(temperature), units)
        except ValueError:
            logger.warning("Error processing drive: {0}".format(str(drive_data)))
            return None

    def _parse(self, data: str) -> List[Drive]:
        """Split a raw daemon response into Drive tuples.

        Records that cannot be parsed are skipped (see ``_parse_drive``).
        An empty response yields an empty list.
        """
        line = data.strip(self.sep)  # Remove first/last separator
        if not line:
            return []
        candidates = (self._parse_drive(record) for record in line.split(self.sep * 2))
        return [drive for drive in candidates if drive is not None]

    def get_drives(self) -> List[Drive]:
        """Fetch and parse the daemon's current report.

        Returns:
            The parsed drives. On any failure (daemon unreachable, timeout,
            undecodable response) a warning is logged and an empty list is
            returned, so callers can always iterate over the result.
        """
        try:
            # Plain TCP read until the peer closes the connection; this
            # replaces telnetlib, which is deprecated (PEP 594) and removed
            # in Python 3.13.
            with socket.create_connection((self.host, self.port), timeout=self.timeout) as conn:
                raw_data = b"".join(iter(lambda: conn.recv(4096), b""))
            return self._parse(raw_data.decode('ascii'))
        except Exception as e:
            # Deliberately broad: drive temperatures are best-effort and a
            # missing daemon must not abort the caller.
            logger.warning("Couldn't read data from {0}:{1} - {2}".format(self.host, self.port, str(e)))
            return []
72
73
class Temperature(Parser):
    """Parser that reports onboard sensor and hard drive temperatures.

    Onboard readings are taken from the ``sensors`` module; drive
    temperatures are requested from an hddtemp daemon via ``HddtempClient``
    using the ``temperatures`` section of the configuration.
    """

    def __init__(self):
        super().__init__()
        self.name = "temperature"
        self.info = "Find current temperature of various system components (CPU, motherboard, hard drives, ambient)."

    def parse_log(self):
        """Collect all temperature readings.

        Returns:
            The "temperatures" Section, holding one Data entry for every
            category that produced at least one reading.
        """
        logger.debug("Starting temp section")
        section = Section("temperatures")

        self._append_onboard_temperatures(section)
        self._append_drive_temperatures(section)

        logger.info("Finished temp section")
        return section

    def _append_onboard_temperatures(self, section):
        """Read chip sensors and append Sys/Cores/Processor data to *section*."""
        sensors.init()

        systemp = Data("Sys", [])
        coretemp = Data("Cores", [])
        pkgtemp = Data("Processor", [])
        # Label substring -> destination; checked in order, first match wins.
        categories = (("Core", coretemp), ("CPUTIN", pkgtemp), ("SYS", systemp))

        try:
            for chip in sensors.iter_detected_chips():
                for feature in chip:
                    for marker, target in categories:
                        if marker in feature.label:
                            target.items.append([feature.label, float(feature.get_value())])
                            break

            logger.debug("Core data is {0}".format(str(coretemp.items)))
            logger.debug("Sys data is {0}".format(str(systemp.items)))
            logger.debug("Pkg data is {0}".format(str(pkgtemp.items)))
            for temp_data in [systemp, coretemp, pkgtemp]:
                logger.debug("Looking at temp data {0}".format(str(temp_data.items)))
                if not temp_data.items:
                    # Not every machine exposes every category; indexing the
                    # first reading of an empty list would raise IndexError.
                    logger.debug("No readings for {0}, skipping".format(temp_data.subtitle))
                    continue
                if len(temp_data.items) > 1:
                    avg = float(sum(feature[1] for feature in temp_data.items)) / len(temp_data.items)
                    logger.debug("Avg temp for {0} is {1} {2}{3}".format(temp_data.subtitle, str(avg), DEG, CEL))
                    temp_data.subtitle += " (avg {0}{1}{2})".format(str(avg), DEG, CEL)
                    temp_data.items = ["{0}: {1}{2}{3}".format(feature[0], str(feature[1]), DEG, CEL) for feature in temp_data.items]
                else:
                    # A single reading is shown without its label.
                    temp_data.items = [str(temp_data.items[0][1]) + DEG + CEL]
                section.append_data(temp_data)

        finally:
            logger.debug("Finished reading onboard temperatures")
            sensors.cleanup()

    def _append_drive_temperatures(self, section):
        """Query the hddtemp daemon and append the "Disks" data to *section*.

        For this to work, `hddtemp` must be running in daemon mode.
        Start it like this (bash): sudo hddtemp -d /dev/sda /dev/sdX...
        """
        # Pass the list explicitly, like the other Data objects in this
        # class, instead of relying on the constructor's default.
        hddtemp_data = Data("Disks", [])

        client = HddtempClient(
            host=config.prefs.get("temperatures", "host"),
            port=config.prefs.getint("temperatures", "port"),
            sep=config.prefs.get("temperatures", "separator"),
            timeout=config.prefs.getint("temperatures", "timeout"))
        drives = client.get_drives()
        logger.debug("Received drive info: " + str(drives))
        if not isinstance(drives, list):
            # Tolerate a non-list failure value from the client.
            drives = []

        # Read the configuration once rather than once per drive.
        wanted = set(config.prefs.get("temperatures", "drives").split())
        show_model = config.prefs.getboolean("temperatures", "show-model")

        reported = []
        for drive in sorted(drives, key=lambda x: x.path):
            if drive.path not in wanted:
                logger.debug("Ignoring drive {0} ({1}) due to config".format(drive.path, drive.model))
                continue
            reported.append(drive)
            label = "{0} ({1})".format(drive.path, drive.model) if show_model else drive.path
            hddtemp_data.items.append(label + ": {0}{1}{2}".format(drive.temperature, DEG, drive.units))
        logger.debug("Sorted drive info: " + str(reported))

        if reported:
            sumtemp = float(sum(drive.temperature for drive in reported))
            hddavg = '{0:.1f}{1}{2}'.format(sumtemp/len(reported), DEG, reported[0].units) # use units of first drive
            logger.debug("Sum of temperatures: {}; Number of drives: {}; => Avg disk temp is {}".format(str(sumtemp), str(len(reported)), hddavg))
            # hddavg already carries the degree sign and units.
            hddtemp_data.subtitle += " (avg {0})".format(hddavg)
            # Without this the drive readings never reach the output.
            section.append_data(hddtemp_data)
        else:
            logger.debug("No drive temperatures to report")

        logger.debug("Finished processing drive temperatures")
158 return section