Remove idle_timeout from the enable status (not used); make status data do less work for performance reasons; separate printer and moonraker in the status enable. Docs: make the menu config look more like Klipper's configuration reference; this also fixes some mistakes and simplifies the description.
421 lines
16 KiB
Python
421 lines
16 KiB
Python
import logging
|
|
import gi
|
|
|
|
gi.require_version("Gtk", "3.0")
|
|
from gi.repository import GLib
|
|
|
|
|
|
class Printer:
    """In-memory model of a printer's configuration and live status.

    The object stores the config sections and status objects handed to
    :meth:`reinit` / :meth:`process_update`, derives device lists and counts
    from the config section names, keeps a rolling temperature store, and
    schedules the registered state callbacks through ``GLib.idle_add`` when
    the evaluated state changes.
    """

    # Section-name prefixes (trailing space included so that e.g. "led " does
    # not match an unrelated "led_effect ..." section).
    _TEMP_PREFIXES = ('heater_generic ', 'temperature_sensor ', 'temperature_fan ')
    _FAN_PREFIXES = ('controller_fan ', 'heater_fan ', 'fan_generic ')
    _LED_PREFIXES = ('led ', 'neopixel ', 'dotstar ', 'pca9533 ', 'pca9632 ')

    def __init__(self, state_cb, state_callbacks):
        """Create an empty, "disconnected" printer model.

        :param state_cb: callable scheduled via ``GLib.idle_add`` with the
            callback registered for the new state as its argument.
        :param state_callbacks: mapping of state name -> callback (or None).
            States that are not keys of this mapping are ignored by
            :meth:`change_state`.
        """
        self.config = {}
        self.data = {}
        self.state = "disconnected"
        self.state_cb = state_cb
        self.state_callbacks = state_callbacks
        self.devices = {}
        self.power_devices = {}
        self.tools = []
        self.extrudercount = 0
        self.tempdevcount = 0
        self.fancount = 0
        self.ledcount = 0
        self.output_pin_count = 0
        self.store_timeout = None
        self.tempstore = {}
        self.tempstore_size = 1200
        self.cameras = []
        self.available_commands = {}
        self.spoolman = False
        # Lazily built caches, see get_temp_devices() / get_filament_sensors().
        self.temp_devices = self.sensors = None

    @staticmethod
    def _is_hidden(section):
        """Return True when the section's name starts with an underscore.

        The name is the second whitespace-separated word of the section
        (``"heater_fan _quiet"`` -> ``"_quiet"``); sections made of a single
        word are tested as a whole.
        """
        parts = section.split()
        name = parts[1] if len(parts) > 1 else section
        return name.startswith("_")

    def _parse_bed_mesh(self, section):
        """Convert the string values of a ``bed_mesh`` section in place.

        Counts become ints, bounds become floats and ``points`` becomes a list
        of rows of floats. A missing key or a malformed number is logged and
        leaves the remaining values untouched instead of aborting the caller.
        """
        mesh = self.config[section]
        try:
            mesh['x_count'] = int(mesh['x_count'])
            mesh['y_count'] = int(mesh['y_count'])
            mesh['max_x'] = float(mesh['max_x'])
            mesh['min_x'] = float(mesh['min_x'])
            mesh['max_y'] = float(mesh['max_y'])
            mesh['min_y'] = float(mesh['min_y'])
            mesh['points'] = [
                [float(value.strip()) for value in row.split(",")]
                for row in mesh['points'].strip().split("\n")
            ]
        except (KeyError, ValueError):
            logging.debug(f"Couldn't load mesh {section}: {self.config[section]}")

    def reinit(self, printer_info, data):
        """Reset all derived state and rebuild it from a fresh status dump.

        :param printer_info: mapping that must contain ``'software_version'``
            (only used for logging).
        :param data: status mapping; ``data['configfile']['config']`` becomes
            the new config and ``data`` itself becomes the new status store.
        """
        self.config = data['configfile']['config']
        self.data = data
        self.devices.clear()
        self.tools.clear()
        self.extrudercount = 0
        self.tempdevcount = 0
        self.fancount = 0
        self.ledcount = 0
        self.output_pin_count = 0
        self.tempstore.clear()
        self.tempstore_size = 1200
        self.available_commands.clear()
        self.temp_devices = self.sensors = None
        self.stop_tempstore_updates()

        for x in self.config:
            if x[:8] == "extruder":
                self.tools.append(x)
                self.extrudercount += 1
                if x.startswith('extruder_stepper'):
                    # Listed as a tool, but it has no temperature/target.
                    continue
                self.devices[x] = {
                    "temperature": 0,
                    "target": 0
                }
            if x == 'heater_bed' or x.startswith(self._TEMP_PREFIXES):
                self.devices[x] = {"temperature": 0}
                if not x.startswith('temperature_sensor '):
                    self.devices[x]["target"] = 0
                # Support for hiding devices by name
                if not self._is_hidden(x):
                    self.tempdevcount += 1
            if x == 'fan' or x.startswith(self._FAN_PREFIXES):
                # Support for hiding devices by name
                if not self._is_hidden(x):
                    self.fancount += 1
            if x.startswith('output_pin ') and not self._is_hidden(x):
                self.output_pin_count += 1
            if x.startswith('bed_mesh '):
                self._parse_bed_mesh(x)
            if x.startswith(self._LED_PREFIXES) and not self._is_hidden(x):
                self.ledcount += 1
        # Nothing inside the loop reads the tool list, so sort it once here.
        self.tools = sorted(self.tools)
        self.process_update(data)

        logging.info(f"Klipper version: {printer_info['software_version']}")
        logging.info(f"# Extruders: {self.extrudercount}")
        logging.info(f"# Temperature devices: {self.tempdevcount}")
        logging.info(f"# Fans: {self.fancount}")
        logging.info(f"# Output pins: {self.output_pin_count}")
        logging.info(f"# Leds: {self.ledcount}")

    def stop_tempstore_updates(self):
        """Remove the periodic temperature-store GLib source, if one is set."""
        logging.info("Stopping tempstore")
        if self.store_timeout is not None:
            GLib.source_remove(self.store_timeout)
            self.store_timeout = None

    def process_update(self, data):
        """Merge a (partial) status update into the model.

        Values for known temperature devices and filament sensors are copied
        into ``self.devices``; every object except ``configfile`` is merged
        into ``self.data``. The state is re-evaluated when the update touches
        ``webhooks``, ``print_stats`` or ``idle_timeout``.
        """
        if self.data is None:
            return
        for x in (self.get_temp_devices() + self.get_filament_sensors()):
            if x in data:
                for i in data[x]:
                    self.set_dev_stat(x, i, data[x][i])

        for x in data:
            if x == "configfile":
                continue
            if x not in self.data:
                self.data[x] = {}
            self.data[x].update(data[x])

        if "webhooks" in data or "print_stats" in data or "idle_timeout" in data:
            self.process_status_update()

    def evaluate_state(self):
        """Return the current state name derived from ``self.data``.

        ``"paused"`` / ``"printing"`` are reported only while webhooks is
        ``"ready"``; otherwise the webhooks state itself is returned.
        Raises ``KeyError`` if ``self.data`` has no ``webhooks`` state.
        """
        # webhooks states: startup, ready, shutdown, error
        # print_stats: standby, printing, paused, error, complete
        # idle_timeout: Idle, Printing, Ready
        if self.data['webhooks']['state'] == "ready" and (
                'print_stats' in self.data and 'state' in self.data['print_stats']):
            if self.data['print_stats']['state'] == 'paused':
                return "paused"
            if self.data['print_stats']['state'] == 'printing':
                return "printing"
        return self.data['webhooks']['state']

    def process_status_update(self):
        """Re-evaluate the state and switch to it if it changed.

        Always returns False.
        NOTE(review): presumably so it can double as a one-shot GLib callback -
        confirm against callers.
        """
        state = self.evaluate_state()
        if state != self.state:
            self.change_state(state)
        return False

    def process_power_update(self, data):
        """Store the new status of an already known power device."""
        if data['device'] in self.power_devices:
            self.power_devices[data['device']]['status'] = data['status']

    def change_state(self, state):
        """Switch to ``state`` and schedule its callback, if it has one.

        States that are not keys of ``self.state_callbacks`` are ignored.
        Calling this with the current state re-schedules its callback.
        """
        if state not in self.state_callbacks:
            return  # disconnected, startup, ready, shutdown, error, paused, printing
        if state != self.state:
            logging.debug(f"Changing state from '{self.state}' to '{state}'")
            self.state = state
        if self.state_callbacks[state] is not None:
            logging.debug(f"Adding callback for state: {state}")
            GLib.idle_add(self.state_cb, self.state_callbacks[state])

    def configure_power_devices(self, data):
        """Replace the power-device table from ``data['devices']``.

        Any status other than ``"on"`` is stored as ``"off"``.
        """
        self.power_devices = {}

        logging.debug(f"Processing power devices: {data}")
        for x in data['devices']:
            self.power_devices[x['device']] = {
                "status": "on" if x['status'] == "on" else "off"
            }
        logging.debug(f"Power devices: {self.power_devices}")

    def configure_cameras(self, data):
        """Store the camera list as given."""
        self.cameras = data
        logging.debug(f"Cameras: {self.cameras}")

    def get_config_section_list(self, search=""):
        """Return the config section names starting with ``search``.

        With the default empty prefix every section name is returned.
        """
        if self.config is None:
            return []
        return [section for section in self.config if section.startswith(search)]

    def get_config_section(self, section):
        """Return the config section ``section``, or False when it is absent."""
        return self.config[section] if section in self.config else False

    def get_macro(self, macro):
        """Return the config section for ``macro``, or False if none matches.

        The exact ``gcode_macro <macro>`` section wins. Only when it does not
        exist is the first section whose name merely contains ``macro``
        returned, which keeps partial-name lookups working without letting
        ``FOO`` resolve to ``gcode_macro FOOBAR`` when both are defined.
        """
        exact = f"gcode_macro {macro}"
        if exact in self.config:
            return self.config[exact]
        return next(
            (
                self.config[key]
                for key in self.config
                if macro in key
            ),
            False,
        )

    def get_fans(self):
        """Return fan section names: ``fan`` first, then the typed fans."""
        fans = []
        if self.config_section_exists("fan"):
            fans.append("fan")
        for fan_type in ["controller_fan", "fan_generic", "heater_fan"]:
            fans.extend(self.get_config_section_list(f"{fan_type} "))
        return fans

    def get_output_pins(self):
        """Return the names of all ``output_pin`` sections."""
        return self.get_config_section_list("output_pin ")

    def get_gcode_macros(self):
        """Return the names of the user-visible gcode macros.

        Skipped: names starting with ``_``, LOAD_FILAMENT / UNLOAD_FILAMENT
        (any case), and macros whose own section defines ``rename_existing``.
        """
        macros = []
        for section in self.get_config_section_list("gcode_macro "):
            macro = section[12:].strip()
            if macro.startswith("_") or macro.upper() in ('LOAD_FILAMENT', 'UNLOAD_FILAMENT'):
                continue
            # Inspect this very section; a name-based search could pick up a
            # different macro whose name merely contains this one.
            if "rename_existing" in self.config[section]:
                continue
            macros.append(macro)
        return macros

    def get_heaters(self):
        """Return ``heater_bed`` (when it is a known device) and generic heaters."""
        heaters = self.get_config_section_list("heater_generic ")
        if "heater_bed" in self.devices:
            heaters.insert(0, "heater_bed")
        return heaters

    def get_temp_fans(self):
        """Return the section names starting with ``temperature_fan``."""
        return self.get_config_section_list("temperature_fan")

    def get_temp_sensors(self):
        """Return the section names starting with ``temperature_sensor``."""
        return self.get_config_section_list("temperature_sensor")

    def get_filament_sensors(self):
        """Return switch and motion filament sensor sections (cached until reinit)."""
        if self.sensors is None:
            self.sensors = list(self.get_config_section_list("filament_switch_sensor "))
            self.sensors.extend(self.get_config_section_list("filament_motion_sensor "))
        return self.sensors

    def get_probe(self):
        """Return the config of the first probe section found, or None."""
        probe_types = ["probe", "bltouch", "smart_effector", "dockable_probe"]
        for probe_type in probe_types:
            if self.config_section_exists(probe_type):
                logging.info(f"Probe type: {probe_type}")
                return self.get_config_section(probe_type)
        return None

    def get_printer_status_data(self):
        """Return a summary of counts and lists, split into moonraker/printer."""
        macros = self.get_gcode_macros()  # built once, used for count and list
        return {
            "moonraker": {
                "power_devices": {"count": len(self.get_power_devices())},
                "cameras": {"count": len(self.cameras)},
                "spoolman": self.spoolman,
            },
            "printer": {
                "pause_resume": {"is_paused": self.state == "paused"},
                "extruders": {"count": self.extrudercount},
                "temperature_devices": {"count": self.tempdevcount},
                "fans": {"count": self.fancount},
                "output_pins": {"count": self.output_pin_count},
                "gcode_macros": {"count": len(macros), "list": macros},
                "leds": {"count": self.ledcount},
                "config_sections": list(self.config),
            }
        }

    def get_leds(self):
        """Return the LED section names whose name does not start with ``_``."""
        return [
            led
            for led_type in ["dotstar", "led", "neopixel", "pca9533", "pca9632"]
            for led in self.get_config_section_list(f"{led_type} ")
            if not led.split()[1].startswith("_")
        ]

    def get_led_color_order(self, led):
        """Return the colour channels of ``led`` as a string such as ``"RGB"``.

        An explicit ``color_order`` option wins; otherwise the channels are
        collected, in option order, from the ``*_pin`` / ``initial_*`` options.
        Returns None when the LED is missing from the config or status data.
        """
        if led not in self.config or led not in self.data:
            logging.debug(f"Error getting {led} config")
            return None
        if "color_order" in self.config[led]:
            return self.config[led]["color_order"]
        colors = ''
        for option in self.config[led]:
            if option in ("red_pin", 'initial_RED') and 'R' not in colors:
                colors += 'R'
            elif option in ("green_pin", 'initial_GREEN') and 'G' not in colors:
                colors += 'G'
            elif option in ("blue_pin", 'initial_BLUE') and 'B' not in colors:
                colors += 'B'
            elif option in ("white_pin", 'initial_WHITE') and 'W' not in colors:
                colors += 'W'
        logging.debug(f"Colors in led: {colors}")
        return colors

    def get_power_devices(self):
        """Return the names of the known power devices."""
        return list(self.power_devices)

    def get_power_device_status(self, device):
        """Return the stored status of ``device``, or None when it is unknown."""
        if device not in self.power_devices:
            return
        return self.power_devices[device]['status']

    def get_stat(self, stat, substat=None):
        """Return status object ``stat`` (or one of its fields).

        An empty dict is returned when the object or the field is missing.
        """
        if self.data is None or stat not in self.data:
            return {}
        if substat is not None:
            return self.data[stat][substat] if substat in self.data[stat] else {}
        return self.data[stat]

    def get_dev_stat(self, dev, stat):
        """Return value ``stat`` of device ``dev``, or None if either is unknown."""
        if dev in self.devices and stat in self.devices[dev]:
            return self.devices[dev][stat]
        return None

    def get_fan_speed(self, fan="fan"):
        """Return the reported speed of ``fan`` scaled by its ``max_power``.

        The result is 0 when the fan is unknown, and also when the scaled
        speed is below the configured ``off_below``.
        """
        speed = 0
        if fan not in self.config or fan not in self.data:
            logging.debug(f"Error getting {fan} config")
            return speed
        if "speed" in self.data[fan]:
            speed = self.data[fan]["speed"]
        if 'max_power' in self.config[fan]:
            max_power = float(self.config[fan]['max_power'])
            if max_power > 0:
                speed = speed / max_power
        if 'off_below' in self.config[fan]:
            off_below = float(self.config[fan]['off_below'])
            if speed < off_below:
                speed = 0
        return speed

    def get_pin_value(self, pin):
        """Return the live value of ``pin``, else its configured value, else 0."""
        if pin in self.data:
            return self.data[pin]["value"]
        elif pin in self.config and 'value' in self.config[pin]:
            return self.config[pin]["value"]
        return 0

    def get_temp_store_devices(self):
        """Return the device names present in the temperature store."""
        return list(self.tempstore)

    def device_has_target(self, device):
        """Return True when ``device`` is known and has a ``target`` value."""
        return device in self.devices and "target" in self.devices[device]

    def device_has_power(self, device):
        """Return True when ``device`` is known and has a ``power`` value."""
        return device in self.devices and "power" in self.devices[device]

    def get_temp_store(self, device, section=False, results=0):
        """Return stored history for ``device``.

        :param section: series name; when given, only that list is returned,
            otherwise a dict of every series is returned.
        :param results: number of most recent samples wanted; 0 means all.
        :return: False when the device (or the requested series) is unknown.
        """
        if device not in self.tempstore:
            return False

        if section is not False:
            if section not in self.tempstore[device]:
                return False
            if results == 0 or results >= len(self.tempstore[device][section]):
                return self.tempstore[device][section]
            return self.tempstore[device][section][-results:]

        # [-results:] already yields the whole series for results == 0 and for
        # results >= len(series), so no special case is needed here.
        return {
            name: series[-results:]
            for name, series in self.tempstore[device].items()
        }

    def get_tempstore_size(self):
        """Return the configured number of samples per series."""
        return self.tempstore_size

    def get_temp_devices(self):
        """Return extruders, heaters, sensors and temperature fans (cached until reinit)."""
        if self.temp_devices is None:
            devices = [
                device
                for device in self.tools
                if not device.startswith('extruder_stepper')
            ]
            self.temp_devices = devices + self.get_heaters() + self.get_temp_sensors() + self.get_temp_fans()
        return self.temp_devices

    def get_tools(self):
        """Return the sorted list of ``extruder*`` section names."""
        return self.tools

    def get_tool_number(self, tool):
        """Return the index of ``tool`` in the tool list (ValueError if absent)."""
        return self.tools.index(tool)

    def init_temp_store(self, tempstore):
        """Adopt ``tempstore`` and start the once-per-second update timer.

        Each series is left-padded with zeros, in place, up to
        ``tempstore_size`` samples. When a store already existed and the set
        of devices changed, the current state's callback is re-triggered.
        """
        if self.tempstore and set(self.tempstore) != set(tempstore):
            logging.debug("Tempstore has changed")
            self.tempstore = tempstore
            self.change_state(self.state)
        else:
            self.tempstore = tempstore
        for device in self.tempstore:
            for series in self.tempstore[device].values():
                missing = self.tempstore_size - len(series)
                if missing > 0:
                    # One slice assignment instead of repeated insert(0, 0).
                    series[:0] = [0] * missing
        logging.info(f"Temp store: {list(self.tempstore)}")
        if not self.store_timeout:
            self.store_timeout = GLib.timeout_add_seconds(1, self._update_temp_store)

    def config_section_exists(self, section):
        """Return True when ``section`` is the exact name of a config section."""
        return self.config is not None and section in self.config

    def set_dev_stat(self, dev, stat, value):
        """Set value ``stat`` of ``dev``; unknown devices are ignored."""
        if dev not in self.devices:
            return

        self.devices[dev][stat] = value

    def _update_temp_store(self):
        """Shift every series by one sample and append the current value.

        Returns True so that GLib keeps calling it, or False (stopping the
        timer) when there is no store.
        """
        if self.tempstore is None:
            return False
        for device in self.tempstore:
            for x in self.tempstore[device]:
                self.tempstore[device][x].pop(0)
                # The stat name is the series name minus its last character
                # (presumably "temperatures" -> "temperature" - confirm
                # against whatever fills the store).
                temp = self.get_dev_stat(device, x[:-1])
                if temp is None:
                    temp = 0
                self.tempstore[device][x].append(temp)
        return True

    def enable_spoolman(self):
        """Mark Spoolman as available."""
        logging.info("Enabling Spoolman")
        self.spoolman = True
|