1#
2# temperature.py
3#
4# Find current temperature of various system components (CPU, motherboard,
5# hard drives, ambient). Detection of motherboard-based temperatures (CPU
6# etc) uses the pysensors library, and produces a similar output to
7# lmsensors. HDD temperatures are obtained from the hddtemp daemon
8# (http://www.guzu.net/linux/hddtemp.php) which was orphaned since 2007. For
9# hddtemp to work, it must be started in daemon mode, either manually or with
10# a unit file. Manually, it would be started like this:
11#
12# sudo hddtemp -d /dev/sda /dev/sdb ... /dev/sdX
13#
14
15import re
16import sensors
17import socket, sys
18from telnetlib import Telnet
19from typing import List, Dict, NamedTuple
20
21from ..formatting import *
22from ..util import readlog, resolve
23from ..config import *
24
25import logging
26logger = logging.getLogger(__name__)
27
28class Drive(NamedTuple):
29 path: str
30 model: str
31 temperature: int
32 units: str
33
34class HddtempClient:
35
36 def __init__(self, host: str='127.0.0.1', port: int=7634, timeout: int=10, sep: str='|') -> None:
37 self.host = host
38 self.port = port
39 self.timeout = timeout
40 self.sep = sep
41
42 def _parse_drive(self, drive: str) -> Drive:
43 try:
44 drive_data = drive.split(self.sep)
45 return Drive(drive_data[0], drive_data[1], int(drive_data[2]), drive_data[3])
46 except Exception as e:
47 logger.warning("Error processing drive: {0}".format(str(drive_data)))
48 return None
49
50 def _parse(self, data: str) -> List[Drive]:
51 line = data.lstrip(self.sep).rstrip(self.sep) # Remove first/last
52 drives = line.split(self.sep * 2)
53 parsed_drives = []
54 for drive in drives:
55 parsed_drive = self._parse_drive(drive)
56 if parsed_drive != None:
57 parsed_drives.append(parsed_drive)
58
59# return [self._parse_drive(drive) for drive in drives if drive != None]
60# return list(filter(lambda drive: self._parse_drive(drive), drives))
61 return parsed_drives
62
63 def get_drives(self) -> List[Drive]: # Obtain data from telnet server
64 try:
65 with Telnet(self.host, self.port, timeout=self.timeout) as tn:
66 data = tn.read_all()
67 return self._parse(data.decode('ascii')) # Return parsed data
68 except Exception as e:
69 logger.warning("Couldn't read data from {0}:{1} - {2}".format(self.host, self.port, str(e)))
70 return 1
71
72
73def parse_log():
74 logger.debug("Starting temp section")
75 output = writetitle("temperatures")
76 output += opentag('div', 1, 'temp', 'section')
77
78 # cpu temp
79
80 sensors.init()
81 coretemps = []
82 pkgtemp = 0
83 systemp = 0
84 try:
85 for chip in sensors.iter_detected_chips():
86 for feature in chip:
87 if "Core" in feature.label:
88 coretemps.append([feature.label, feature.get_value()])
89 logger.debug("found core " + feature.label + " at temp " + str(feature.get_value()))
90 if "CPUTIN" in feature.label:
91 pkgtemp = str(feature.get_value())
92 logger.debug("found cpu package at temperature " + pkgtemp)
93 if "SYS" in feature.label:
94 systemp = feature.get_value()
95 logger.debug("found sys input " + feature.label + " at temp " + str(feature.get_value()))
96 logger.debug("Core temp data is: " + str(coretemps))
97# core_avg = reduce(lambda x, y: x[1] + y[1], coretemps) / len(coretemps)
98 core_avg = sum(core[1] for core in coretemps) / len(coretemps)
99 logger.debug("average cpu temp is " + str(core_avg))
100 coretemps.append(["avg", str(core_avg)])
101 coretemps.append(["pkg", pkgtemp])
102 coretemps = [x[0] + ": " + str(x[1]) + DEG + CEL for x in coretemps]
103 finally:
104 sensors.cleanup()
105
106 if (systemp != 0):
107 output += writedata("sys: " + str(systemp) + DEG)
108 if (coretemps != ''):
109 output += writedata("cores", coretemps)
110
111 logger.info("Finished reading onboard temperatures")
112
113 # drive temp
114
115 # For this to work, `hddtemp` must be running in daemon mode.
116 # Start it like this (bash): sudo hddtemp -d /dev/sda /dev/sdX...
117
118 received = ''
119 sumtemp = 0.0
120 data = ""
121 fields = []
122
123 client = HddtempClient(host=config.prefs['hddtemp']['host'], port=int(config.prefs['hddtemp']['port']), sep=config.prefs['hddtemp']['separator'], timeout=int(config.prefs['hddtemp']['timeout']))
124 drives = client.get_drives()
125 logger.debug("Received drive info: " + str(drives))
126 for drive in sorted(drives, key=lambda x: x.path):
127 if drive.path in config.prefs['hddtemp']['drives']:
128 sumtemp += drive.temperature
129 fields.append(("{0} ({1})".format(drive.path, drive.model) if config.prefs['hddtemp']['show-model'] else drive.path) + ": {0}{1}{2}".format(drive.temperature, DEG, drive.units))
130 else:
131 drives.remove(drive)
132 logger.debug("Ignoring drive {0} ({1})due to config".format(drive.path, drive.model))
133 logger.debug("Sorted drive info: " + str(drives))
134
135 hddavg = '{0:.1f}{1}{2}'.format(sumtemp/len(drives), DEG, drives[0].units) # use units of first drive
136 logger.debug("Sum of temperatures: {}; Number of drives: {}; => Avg disk temp is {}".format(str(sumtemp), str(len(drives)), hddavg))
137 fields.append("avg: " + str(hddavg))
138
139 if (config.prefs['hddtemp']['drives'] != ''):
140 output += writedata("disks", fields)
141 logger.info("Finished processing drive temperatures")
142
143
144 output += closetag('div', 1)
145 logger.info("Finished temp section")
146 return output