dfebb9ca76c3efdec7f6a8bd44b3e5d4857346bb
1#
2# temperature.py
3#
4# Find current temperature of various system components (CPU, motherboard,
5# hard drives, ambient). Detection of motherboard-based temperatures (CPU
6# etc) uses the pysensors library, and produces a similar output to
7# lmsensors. HDD temperatures are obtained from the hddtemp daemon
8# (http://www.guzu.net/linux/hddtemp.php), which has been orphaned since 2007. For
9# hddtemp to work, it must be started in daemon mode, either manually or with
10# a unit file. Manually, it would be started like this:
11#
12# sudo hddtemp -d /dev/sda /dev/sdb ... /dev/sdX
13#
14
import re
import socket
import sys
from telnetlib import Telnet
from typing import Dict, List, NamedTuple, Optional

import sensors
20
21from ..formatting import *
22from ..util import readlog, resolve
23from ..config import *
24
25import logging
26logger = logging.getLogger(__name__)
27
class Drive(NamedTuple):
    """A single drive record parsed from the hddtemp daemon's output.

    Instances are built by ``HddtempClient._parse_drive`` from the four
    separator-delimited fields of one record, in the order listed below.
    """

    # Device path of the drive (first field), e.g. /dev/sda
    path: str
    # Model string reported for the drive (second field)
    model: str
    # Temperature reading, converted to an integer (third field)
    temperature: int
    # Unit suffix for the reading as sent by the daemon (fourth field)
    units: str
33
class HddtempClient:
    """Small client for the TCP interface of an hddtemp daemon.

    The daemon is expected to send one line of separator-delimited records
    and then close the connection. Each record has four fields: path, model,
    temperature, units. Records are joined by a doubled separator and the
    whole line is wrapped in single separators.
    """

    def __init__(self, host: str='127.0.0.1', port: int=7634, timeout: int=10, sep: str='|') -> None:
        """Store connection settings; no network activity happens here.

        Args:
            host: Address of the machine running the hddtemp daemon.
            port: TCP port the daemon listens on.
            timeout: Socket timeout in seconds for connecting and reading.
            sep: Field separator used by the daemon.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.sep = sep

    def _parse_drive(self, drive: str) -> Optional[Drive]:
        """Convert one raw record into a Drive, or None if it is malformed.

        A record is malformed when it has fewer than four fields or when its
        temperature field is not an integer; such records are logged and
        skipped rather than aborting the whole read.
        """
        # Split outside the try so the fields are always available for the
        # warning message below.
        fields = drive.split(self.sep)
        try:
            return Drive(fields[0], fields[1], int(fields[2]), fields[3])
        except (IndexError, ValueError):
            logger.warning("Error processing drive: {0}".format(str(fields)))
            return None

    def _parse(self, data: str) -> List[Drive]:
        """Split the daemon's full response into a list of Drive records.

        Malformed records are dropped. An empty response yields an empty
        list without logging a warning.
        """
        line = data.strip(self.sep)  # Remove leading/trailing separators
        if not line:
            return []
        candidates = (self._parse_drive(record) for record in line.split(self.sep * 2))
        return [drive for drive in candidates if drive is not None]

    def get_drives(self) -> List[Drive]:
        """Fetch and parse the current drive list from the daemon.

        Returns:
            The parsed drives, or an empty list if the daemon could not be
            reached or its response could not be decoded. The failure is
            logged as a warning.
        """
        # A plain socket is used instead of telnetlib, which is deprecated
        # (PEP 594) and removed in Python 3.13. The daemon sends its data and
        # closes the connection, so reading until EOF gets the whole response.
        try:
            with socket.create_connection((self.host, self.port), timeout=self.timeout) as conn:
                chunks = []
                while True:
                    chunk = conn.recv(4096)
                    if not chunk:
                        break
                    chunks.append(chunk)
            return self._parse(b"".join(chunks).decode('ascii'))
        except Exception as e:
            # Deliberately best-effort: a missing daemon must not abort the
            # caller, so report the problem and hand back "no drives".
            logger.warning("Couldn't read data from {0}:{1} - {2}".format(self.host, self.port, str(e)))
            return []
71
72
def _read_onboard_temps(section):
    """Collect Sys/Cores/Processor temperatures via pysensors into section."""
    systemp_data = Data("Sys")
    coretemp_data = Data("Cores")
    pkgtemp_data = Data("Processor")

    sensors.init()
    try:
        for chip in sensors.iter_detected_chips():
            for feature in chip:
                label = feature.label
                is_core = "Core" in label
                is_package = "CPUTIN" in label
                is_system = "SYS" in label
                if not (is_core or is_package or is_system):
                    continue

                # Read the sensor once and keep it numeric so that the
                # averages below can be computed for every group.
                value = feature.get_value()
                if is_core:
                    coretemp_data.items.append((label, value))
                    logger.debug("Found core " + label + " at temp " + str(value))
                if is_package:
                    pkgtemp_data.items.append((label, value))
                    logger.debug("Found CPU package at temp" + str(value))
                if is_system:
                    systemp_data.items.append((label, value))
                    logger.debug("Found sys input " + label + " at temp " + str(value))

        for temp_data in (systemp_data, coretemp_data, pkgtemp_data):
            readings = temp_data.items
            if not readings:
                # Nothing detected for this group; leave it out of the
                # section instead of failing on an empty list.
                logger.debug("No readings found for {0}".format(temp_data.subtitle))
                continue
            if len(readings) > 1:
                avg = sum(value for _, value in readings) / len(readings)
                logger.debug("Avg temp for {0} is {1} {2}{3}".format(temp_data.subtitle, str(avg), DEG, CEL))
                temp_data.subtitle += " (avg {0}{1}{2}):".format(str(avg), DEG, CEL)
                temp_data.items = ["{0}: {1}{2}{3}".format(label, value, DEG, CEL) for label, value in readings]
            else:
                # Single reading: show just the value, still as a list so
                # that items keeps the same type in both branches.
                temp_data.items = [str(readings[0][1]) + DEG + CEL]
            section.append_data(temp_data)
    finally:
        logger.info("Finished reading onboard temperatures")
        sensors.cleanup()


def _read_drive_temps(section):
    """Collect configured drive temperatures from hddtemp into section."""
    # For this to work, `hddtemp` must be running in daemon mode.
    # Start it like this (bash): sudo hddtemp -d /dev/sda /dev/sdX...
    hddtemp_prefs = config.prefs['hddtemp']
    hddtemp_data = Data("Disks")

    client = HddtempClient(
        host=hddtemp_prefs['host'],
        port=int(hddtemp_prefs['port']),
        sep=hddtemp_prefs['separator'],
        timeout=int(hddtemp_prefs['timeout']))
    drives = client.get_drives()
    logger.debug("Received drive info: " + str(drives))

    # Guard against a failure sentinel that is not a list of drives.
    if not isinstance(drives, list):
        drives = []

    selected = []
    for drive in sorted(drives, key=lambda x: x.path):
        if drive.path in hddtemp_prefs['drives']:
            selected.append(drive)
            name = "{0} ({1})".format(drive.path, drive.model) if hddtemp_prefs['show-model'] else drive.path
            hddtemp_data.items.append(name + ": {0}{1}{2}".format(drive.temperature, DEG, drive.units))
        else:
            logger.debug("Ignoring drive {0} ({1}) due to config".format(drive.path, drive.model))
    logger.debug("Sorted drive info: " + str(selected))

    if selected:
        sumtemp = sum((drive.temperature for drive in selected), 0.0)
        # hddavg already carries the degree sign and the unit (taken from
        # the first drive), so the subtitle must not append them again.
        hddavg = '{0:.1f}{1}{2}'.format(sumtemp / len(selected), DEG, selected[0].units)
        logger.debug("Sum of temperatures: {}; Number of drives: {}; => Avg disk temp is {}".format(str(sumtemp), str(len(selected)), hddavg))
        hddtemp_data.subtitle += " (avg {0})".format(hddavg)
        section.append_data(hddtemp_data)
    else:
        logger.debug("No drive temperatures to report")

    logger.info("Finished processing drive temperatures")


def parse_log():
    """Build the "temperatures" section of the report.

    Onboard sensors (system, CPU cores, CPU package) are read through
    pysensors and drive temperatures through the hddtemp daemon configured
    under ``config.prefs['hddtemp']``. Groups with no readings are omitted.

    Assumes, as the rest of this function does, that ``Data`` objects expose
    a ``subtitle`` string and an ``items`` list, and that ``Section`` offers
    ``append_data``.

    Returns:
        The populated Section object.
    """
    logger.debug("Starting temp section")
    section = Section("temperatures")

    _read_onboard_temps(section)
    _read_drive_temps(section)

    logger.info("Finished temp section")
    return section