Eric Callahan 9b6161a6b0 proc_stats: add stat callback registration
This allows other components to be register callbacks that will
be executed in the stat update timer. This is useful for methods
that wish to poll subprocess commands, as its desireable to
prevent multiple subprocesses from running simultaneously.

Signed-off-by: Eric Callahan <arksine.code@gmail.com>
2022-01-28 18:00:21 -05:00

310 lines
12 KiB
Python

# Moonraker Process Stat Tracking
#
# Copyright (C) 2021 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import asyncio
import time
import re
import os
import pathlib
import logging
from collections import deque
# Annotation imports
from typing import (
TYPE_CHECKING,
Awaitable,
Callable,
Deque,
Any,
List,
Tuple,
Optional,
Dict,
)
if TYPE_CHECKING:
from confighelper import ConfigHelper
from websockets import WebRequest, WebsocketManager
from . import shell_command
STAT_CALLBACK = Callable[[int], Optional[Awaitable]]
VC_GEN_CMD_FILE = "/usr/bin/vcgencmd"
STATM_FILE_PATH = "/proc/self/smaps_rollup"
NET_DEV_PATH = "/proc/net/dev"
TEMPERATURE_PATH = "/sys/class/thermal/thermal_zone0/temp"
CPU_STAT_PATH = "/proc/stat"
STAT_UPDATE_TIME = 1.
REPORT_QUEUE_SIZE = 30
THROTTLE_CHECK_INTERVAL = 10
WATCHDOG_REFRESH_TIME = 2.
REPORT_BLOCKED_TIME = 4.
THROTTLED_FLAGS = {
1: "Under-Voltage Detected",
1 << 1: "Frequency Capped",
1 << 2: "Currently Throttled",
1 << 3: "Temperature Limit Active",
1 << 16: "Previously Under-Volted",
1 << 17: "Previously Frequency Capped",
1 << 18: "Previously Throttled",
1 << 19: "Previously Temperature Limited"
}
class ProcStats:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.watchdog = Watchdog(self)
self.stat_update_timer = self.event_loop.register_timer(
self._handle_stat_update)
self.vcgencmd: Optional[shell_command.ShellCommand] = None
if os.path.exists(VC_GEN_CMD_FILE):
logging.info("Detected 'vcgencmd', throttle checking enabled")
shell_cmd: shell_command.ShellCommandFactory
shell_cmd = self.server.load_component(config, "shell_command")
self.vcgencmd = shell_cmd.build_shell_command(
"vcgencmd get_throttled")
self.server.register_notification("proc_stats:cpu_throttled")
else:
logging.info("Unable to find 'vcgencmd', throttle checking "
"disabled")
self.temp_file = pathlib.Path(TEMPERATURE_PATH)
self.smaps = pathlib.Path(STATM_FILE_PATH)
self.netdev_file = pathlib.Path(NET_DEV_PATH)
self.cpu_stats_file = pathlib.Path(CPU_STAT_PATH)
self.server.register_endpoint(
"/machine/proc_stats", ["GET"], self._handle_stat_request)
self.server.register_event_handler(
"server:klippy_shutdown", self._handle_shutdown)
self.server.register_notification("proc_stats:proc_stat_update")
self.proc_stat_queue: Deque[Dict[str, Any]] = deque(maxlen=30)
self.last_update_time = time.time()
self.last_proc_time = time.process_time()
self.throttle_check_lock = asyncio.Lock()
self.total_throttled: int = 0
self.last_throttled: int = 0
self.update_sequence: int = 0
self.last_net_stats: Dict[str, Dict[str, Any]] = {}
self.last_cpu_stats: Dict[str, Tuple[int, int]] = {}
self.cpu_usage: Dict[str, float] = {}
self.stat_callbacks: List[STAT_CALLBACK] = []
self.stat_update_timer.start()
self.watchdog.start()
def register_stat_callback(self, callback: STAT_CALLBACK) -> None:
self.stat_callbacks.append(callback)
async def _handle_stat_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
ts: Optional[Dict[str, Any]] = None
if self.vcgencmd is not None:
ts = await self._check_throttled_state()
cpu_temp = await self.event_loop.run_in_thread(
self._get_cpu_temperature)
wsm: WebsocketManager = self.server.lookup_component("websockets")
websocket_count = wsm.get_count()
return {
'moonraker_stats': list(self.proc_stat_queue),
'throttled_state': ts,
'cpu_temp': cpu_temp,
'network': self.last_net_stats,
'system_cpu_usage': self.cpu_usage,
'websocket_connections': websocket_count
}
async def _handle_shutdown(self) -> None:
msg = "\nMoonraker System Usage Statistics:"
for stats in self.proc_stat_queue:
msg += f"\n{self._format_stats(stats)}"
cpu_temp = await self.event_loop.run_in_thread(
self._get_cpu_temperature)
msg += f"\nCPU Temperature: {cpu_temp}"
logging.info(msg)
if self.vcgencmd is not None:
ts = await self._check_throttled_state()
logging.info(f"Throttled Flags: {' '.join(ts['flags'])}")
async def _handle_stat_update(self, eventtime: float) -> float:
update_time = eventtime
proc_time = time.process_time()
time_diff = update_time - self.last_update_time
usage = round((proc_time - self.last_proc_time) / time_diff * 100, 2)
cpu_temp, mem, mem_units, net = await self.event_loop.run_in_thread(
self._read_system_files)
for dev in net:
bytes_sec = 0.
if dev in self.last_net_stats:
last_dev_stats = self.last_net_stats[dev]
cur_total: int = net[dev]['rx_bytes'] + net[dev]['tx_bytes']
last_total: int = last_dev_stats['rx_bytes'] + \
last_dev_stats['tx_bytes']
bytes_sec = round((cur_total - last_total) / time_diff, 2)
net[dev]['bandwidth'] = bytes_sec
self.last_net_stats = net
result = {
'time': time.time(),
'cpu_usage': usage,
'memory': mem,
'mem_units': mem_units
}
self.proc_stat_queue.append(result)
wsm: WebsocketManager = self.server.lookup_component("websockets")
websocket_count = wsm.get_count()
self.server.send_event("proc_stats:proc_stat_update", {
'moonraker_stats': result,
'cpu_temp': cpu_temp,
'network': net,
'system_cpu_usage': self.cpu_usage,
'websocket_connections': websocket_count
})
if not self.update_sequence % THROTTLE_CHECK_INTERVAL:
if self.vcgencmd is not None:
ts = await self._check_throttled_state()
cur_throttled = ts['bits']
if cur_throttled & ~self.total_throttled:
self.server.add_log_rollover_item(
'throttled', f"CPU Throttled Flags: {ts['flags']}")
if cur_throttled != self.last_throttled:
self.server.send_event("proc_stats:cpu_throttled", ts)
self.last_throttled = cur_throttled
self.total_throttled |= cur_throttled
for cb in self.stat_callbacks:
ret = cb(self.update_sequence)
if ret is not None:
await ret
self.last_update_time = update_time
self.last_proc_time = proc_time
self.update_sequence += 1
return eventtime + STAT_UPDATE_TIME
async def _check_throttled_state(self) -> Dict[str, Any]:
async with self.throttle_check_lock:
assert self.vcgencmd is not None
try:
resp = await self.vcgencmd.run_with_response(
timeout=.5, log_complete=False)
ts = int(resp.strip().split("=")[-1], 16)
except Exception:
return {'bits': 0, 'flags': ["?"]}
flags = []
for flag, desc in THROTTLED_FLAGS.items():
if flag & ts:
flags.append(desc)
return {'bits': ts, 'flags': flags}
def _read_system_files(self) -> Tuple:
mem, units = self._get_memory_usage()
temp = self._get_cpu_temperature()
net_stats = self._get_net_stats()
self._update_cpu_stats()
return temp, mem, units, net_stats
def _get_memory_usage(self) -> Tuple[Optional[int], Optional[str]]:
try:
mem_data = self.smaps.read_text()
rss_match = re.search(r"Rss:\s+(\d+)\s+(\w+)", mem_data)
if rss_match is None:
return None, None
mem = int(rss_match.group(1))
units = rss_match.group(2)
except Exception:
return None, None
return mem, units
def _get_cpu_temperature(self) -> Optional[float]:
try:
res = int(self.temp_file.read_text().strip())
temp = res / 1000.
except Exception:
return None
return temp
def _get_net_stats(self) -> Dict[str, Any]:
net_stats: Dict[str, Any] = {}
try:
ret = self.netdev_file.read_text()
dev_info = re.findall(r"([\w]+):(.+)", ret)
for (dev_name, stats) in dev_info:
parsed_stats = stats.strip().split()
net_stats[dev_name] = {
'rx_bytes': int(parsed_stats[0]),
'tx_bytes': int(parsed_stats[8])
}
return net_stats
except Exception:
return {}
def _update_cpu_stats(self) -> None:
try:
cpu_usage: Dict[str, Any] = {}
ret = self.cpu_stats_file.read_text()
usage_info: List[str] = re.findall(r"cpu[^\n]+", ret)
for cpu in usage_info:
parts = cpu.split()
name = parts[0]
cpu_sum = sum([int(t) for t in parts[1:]])
cpu_idle = int(parts[4])
if name in self.last_cpu_stats:
last_sum, last_idle = self.last_cpu_stats[name]
cpu_delta = cpu_sum - last_sum
idle_delta = cpu_idle - last_idle
cpu_used = cpu_delta - idle_delta
cpu_usage[name] = round(
100 * (cpu_used / cpu_delta), 2)
self.cpu_usage = cpu_usage
self.last_cpu_stats[name] = (cpu_sum, cpu_idle)
except Exception:
pass
def _format_stats(self, stats: Dict[str, Any]) -> str:
return f"System Time: {stats['time']:2f}, " \
f"Usage: {stats['cpu_usage']}%, " \
f"Memory: {stats['memory']} {stats['mem_units']}"
def log_last_stats(self, count: int = 1):
count = min(len(self.proc_stat_queue), count)
msg = ""
for stats in list(self.proc_stat_queue)[-count:]:
msg += f"\n{self._format_stats(stats)}"
logging.info(msg)
def close(self) -> None:
self.stat_update_timer.stop()
self.watchdog.stop()
class Watchdog:
def __init__(self, proc_stats: ProcStats) -> None:
self.proc_stats = proc_stats
self.event_loop = proc_stats.event_loop
self.last_watch_time: float = 0.
self.watchdog_timer = self.event_loop.register_timer(
self._watchdog_callback
)
def _watchdog_callback(self, eventtime: float) -> float:
time_diff = eventtime - self.last_watch_time
if time_diff > REPORT_BLOCKED_TIME:
logging.info(
f"EVENT LOOP BLOCKED: {round(time_diff, 2)} seconds")
# delay the stat logging so we capture the CPU percentage after
# the next cycle
self.event_loop.delay_callback(
.2, self.proc_stats.log_last_stats, 5)
self.last_watch_time = eventtime
return eventtime + WATCHDOG_REFRESH_TIME
def start(self):
if not self.watchdog_timer.is_running():
self.last_watch_time = self.event_loop.get_loop_time()
self.watchdog_timer.start()
def stop(self):
self.watchdog_timer.stop()
def load_component(config: ConfigHelper) -> ProcStats:
return ProcStats(config)