1. use the status reference instead of regex matching (this was coded before the status existed) 2. change icons to reflect the rotation direction 3. change icon to reflect that the adjustment is good enough (according to klipper docs below 6 minutes) 4. auto open the panel when screws_tilt_calculate finished close #1231
418 lines
16 KiB
Python
418 lines
16 KiB
Python
import logging
|
|
import gi
|
|
|
|
gi.require_version("Gtk", "3.0")
|
|
from gi.repository import GLib
|
|
|
|
|
|
class Printer:
|
|
def __init__(self, state_cb, state_callbacks):
|
|
self.config = {}
|
|
self.data = {}
|
|
self.state = "disconnected"
|
|
self.state_cb = state_cb
|
|
self.state_callbacks = state_callbacks
|
|
self.devices = {}
|
|
self.power_devices = {}
|
|
self.tools = []
|
|
self.extrudercount = 0
|
|
self.tempdevcount = 0
|
|
self.fancount = 0
|
|
self.ledcount = 0
|
|
self.output_pin_count = 0
|
|
self.store_timeout = None
|
|
self.tempstore = {}
|
|
self.tempstore_size = 1200
|
|
self.cameras = []
|
|
self.available_commands = {}
|
|
self.spoolman = False
|
|
self.temp_devices = self.sensors = None
|
|
|
|
def reinit(self, printer_info, data):
|
|
self.config = data['configfile']['config']
|
|
self.data = data
|
|
self.devices = {}
|
|
self.tools = []
|
|
self.extrudercount = 0
|
|
self.tempdevcount = 0
|
|
self.fancount = 0
|
|
self.ledcount = 0
|
|
self.output_pin_count = 0
|
|
self.tempstore = {}
|
|
if not self.store_timeout:
|
|
self.store_timeout = GLib.timeout_add_seconds(1, self._update_temp_store)
|
|
self.tempstore_size = 1200
|
|
self.available_commands = {}
|
|
self.temp_devices = self.sensors = None
|
|
|
|
for x in self.config.keys():
|
|
if x[:8] == "extruder":
|
|
self.tools.append(x)
|
|
self.tools = sorted(self.tools)
|
|
self.extrudercount += 1
|
|
if x.startswith('extruder_stepper'):
|
|
continue
|
|
self.devices[x] = {
|
|
"temperature": 0,
|
|
"target": 0
|
|
}
|
|
if x == 'heater_bed' \
|
|
or x.startswith('heater_generic ') \
|
|
or x.startswith('temperature_sensor ') \
|
|
or x.startswith('temperature_fan '):
|
|
self.devices[x] = {"temperature": 0}
|
|
if not x.startswith('temperature_sensor '):
|
|
self.devices[x]["target"] = 0
|
|
# Support for hiding devices by name
|
|
name = x.split()[1] if len(x.split()) > 1 else x
|
|
if not name.startswith("_"):
|
|
self.tempdevcount += 1
|
|
if x == 'fan' \
|
|
or x.startswith('controller_fan ') \
|
|
or x.startswith('heater_fan ') \
|
|
or x.startswith('fan_generic '):
|
|
# Support for hiding devices by name
|
|
name = x.split()[1] if len(x.split()) > 1 else x
|
|
if not name.startswith("_"):
|
|
self.fancount += 1
|
|
if x.startswith('output_pin ') and not x.split()[1].startswith("_"):
|
|
self.output_pin_count += 1
|
|
if x.startswith('bed_mesh '):
|
|
try:
|
|
r = self.config[x]
|
|
r['x_count'] = int(r['x_count'])
|
|
r['y_count'] = int(r['y_count'])
|
|
r['max_x'] = float(r['max_x'])
|
|
r['min_x'] = float(r['min_x'])
|
|
r['max_y'] = float(r['max_y'])
|
|
r['min_y'] = float(r['min_y'])
|
|
r['points'] = [[float(j.strip()) for j in i.split(",")] for i in r['points'].strip().split("\n")]
|
|
except KeyError:
|
|
logging.debug(f"Couldn't load mesh {x}: {self.config[x]}")
|
|
if x.startswith('led') \
|
|
or x.startswith('neopixel ') \
|
|
or x.startswith('dotstar ') \
|
|
or x.startswith('pca9533 ') \
|
|
or x.startswith('pca9632 '):
|
|
name = x.split()[1] if len(x.split()) > 1 else x
|
|
if not name.startswith("_"):
|
|
self.ledcount += 1
|
|
self.process_update(data)
|
|
|
|
logging.info(f"Klipper version: {printer_info['software_version']}")
|
|
logging.info(f"# Extruders: {self.extrudercount}")
|
|
logging.info(f"# Temperature devices: {self.tempdevcount}")
|
|
logging.info(f"# Fans: {self.fancount}")
|
|
logging.info(f"# Output pins: {self.output_pin_count}")
|
|
logging.info(f"# Leds: {self.ledcount}")
|
|
|
|
def process_update(self, data):
|
|
if self.data is None:
|
|
return
|
|
for x in (self.get_temp_devices() + self.get_filament_sensors()):
|
|
if x in data:
|
|
for i in data[x]:
|
|
self.set_dev_stat(x, i, data[x][i])
|
|
|
|
for x in data:
|
|
if x == "configfile":
|
|
continue
|
|
if x not in self.data:
|
|
self.data[x] = {}
|
|
self.data[x].update(data[x])
|
|
|
|
if "webhooks" in data or "print_stats" in data or "idle_timeout" in data:
|
|
self.process_status_update()
|
|
|
|
def evaluate_state(self):
|
|
# webhooks states: startup, ready, shutdown, error
|
|
# print_stats: standby, printing, paused, error, complete
|
|
# idle_timeout: Idle, Printing, Ready
|
|
if self.data['webhooks']['state'] == "ready" and (
|
|
'print_stats' in self.data and 'state' in self.data['print_stats']):
|
|
if self.data['print_stats']['state'] == 'paused':
|
|
return "paused"
|
|
if self.data['print_stats']['state'] == 'printing':
|
|
return "printing"
|
|
return self.data['webhooks']['state']
|
|
|
|
def process_status_update(self):
|
|
state = self.evaluate_state()
|
|
if state != self.state:
|
|
self.change_state(state)
|
|
return False
|
|
|
|
def process_power_update(self, data):
|
|
if data['device'] in self.power_devices:
|
|
self.power_devices[data['device']]['status'] = data['status']
|
|
|
|
def change_state(self, state):
|
|
if state not in list(self.state_callbacks):
|
|
return # disconnected, startup, ready, shutdown, error, paused, printing
|
|
if state != self.state:
|
|
logging.debug(f"Changing state from '{self.state}' to '{state}'")
|
|
self.state = state
|
|
if self.state_callbacks[state] is not None:
|
|
logging.debug(f"Adding callback for state: {state}")
|
|
GLib.idle_add(self.state_cb, self.state_callbacks[state])
|
|
|
|
def configure_power_devices(self, data):
|
|
self.power_devices = {}
|
|
|
|
logging.debug(f"Processing power devices: {data}")
|
|
for x in data['devices']:
|
|
self.power_devices[x['device']] = {
|
|
"status": "on" if x['status'] == "on" else "off"
|
|
}
|
|
logging.debug(f"Power devices: {self.power_devices}")
|
|
|
|
def configure_cameras(self, data):
|
|
self.cameras = data
|
|
logging.debug(f"Cameras: {self.cameras}")
|
|
|
|
def get_config_section_list(self, search=""):
|
|
if self.config is not None:
|
|
return [i for i in list(self.config) if i.startswith(search)] if hasattr(self, "config") else []
|
|
return []
|
|
|
|
def get_config_section(self, section):
|
|
return self.config[section] if section in self.config else False
|
|
|
|
def get_macro(self, macro):
|
|
return next(
|
|
(
|
|
self.config[key]
|
|
for key in self.config.keys()
|
|
if key.find(macro) > -1
|
|
),
|
|
False,
|
|
)
|
|
|
|
def get_fans(self):
|
|
fans = []
|
|
if self.config_section_exists("fan"):
|
|
fans.append("fan")
|
|
for fan_type in ["controller_fan", "fan_generic", "heater_fan"]:
|
|
fans.extend(iter(self.get_config_section_list(f"{fan_type} ")))
|
|
return fans
|
|
|
|
def get_output_pins(self):
|
|
return self.get_config_section_list("output_pin ")
|
|
|
|
def get_gcode_macros(self):
|
|
macros = []
|
|
for macro in self.get_config_section_list("gcode_macro "):
|
|
macro = macro[12:].strip()
|
|
if macro.startswith("_") or macro.upper() in ('LOAD_FILAMENT', 'UNLOAD_FILAMENT'):
|
|
continue
|
|
if self.get_macro(macro) and "rename_existing" in self.get_macro(macro):
|
|
continue
|
|
macros.append(macro)
|
|
return macros
|
|
|
|
def get_heaters(self):
|
|
heaters = self.get_config_section_list("heater_generic ")
|
|
if "heater_bed" in self.devices:
|
|
heaters.insert(0, "heater_bed")
|
|
return heaters
|
|
|
|
def get_temp_fans(self):
|
|
return self.get_config_section_list("temperature_fan")
|
|
|
|
def get_temp_sensors(self):
|
|
return self.get_config_section_list("temperature_sensor")
|
|
|
|
def get_filament_sensors(self):
|
|
if self.sensors is None:
|
|
self.sensors = list(self.get_config_section_list("filament_switch_sensor "))
|
|
self.sensors.extend(iter(self.get_config_section_list("filament_motion_sensor ")))
|
|
return self.sensors
|
|
|
|
def get_probe(self):
|
|
probe_types = ["probe", "bltouch", "smart_effector", "dockable_probe"]
|
|
for probe_type in probe_types:
|
|
if self.config_section_exists(probe_type):
|
|
logging.info(f"Probe type: {probe_type}")
|
|
return self.get_config_section(probe_type)
|
|
return None
|
|
|
|
def get_printer_status_data(self):
|
|
data = {
|
|
"printer": {
|
|
"extruders": {"count": self.extrudercount},
|
|
"temperature_devices": {"count": self.tempdevcount},
|
|
"fans": {"count": self.fancount},
|
|
"output_pins": {"count": self.output_pin_count},
|
|
"gcode_macros": {"count": len(self.get_gcode_macros()), "list": self.get_gcode_macros()},
|
|
"idle_timeout": self.get_stat("idle_timeout").copy(),
|
|
"pause_resume": {"is_paused": self.state == "paused"},
|
|
"power_devices": {"count": len(self.get_power_devices())},
|
|
"cameras": {"count": len(self.cameras)},
|
|
"spoolman": self.spoolman,
|
|
"leds": {"count": self.ledcount},
|
|
}
|
|
}
|
|
|
|
sections = ["bed_mesh", "bltouch", "probe", "quad_gantry_level", "z_tilt"]
|
|
for section in sections:
|
|
if self.config_section_exists(section):
|
|
data["printer"][section] = self.get_config_section(section).copy()
|
|
|
|
sections = ["firmware_retraction", "input_shaper", "bed_screws", "screws_tilt_adjust"]
|
|
for section in sections:
|
|
data["printer"][section] = self.config_section_exists(section)
|
|
|
|
return data
|
|
|
|
def get_leds(self):
|
|
return [
|
|
led
|
|
for led_type in ["dotstar", "led", "neopixel", "pca9533", "pca9632"]
|
|
for led in self.get_config_section_list(f"{led_type} ")
|
|
if not led.split()[1].startswith("_")
|
|
]
|
|
|
|
def get_led_color_order(self, led):
|
|
if led not in self.config or led not in self.data:
|
|
logging.debug(f"Error getting {led} config")
|
|
return None
|
|
elif "color_order" in self.config[led]:
|
|
return self.config[led]["color_order"]
|
|
colors = ''
|
|
for option in self.config[led]:
|
|
if option in ("red_pin", 'initial_RED') and 'R' not in colors:
|
|
colors += 'R'
|
|
elif option in ("green_pin", 'initial_GREEN') and 'G' not in colors:
|
|
colors += 'G'
|
|
elif option in ("blue_pin", 'initial_BLUE') and 'B' not in colors:
|
|
colors += 'B'
|
|
elif option in ("white_pin", 'initial_WHITE') and 'W' not in colors:
|
|
colors += 'W'
|
|
logging.debug(f"Colors in led: {colors}")
|
|
return colors
|
|
|
|
def get_power_devices(self):
|
|
return list(self.power_devices)
|
|
|
|
def get_power_device_status(self, device):
|
|
if device not in self.power_devices:
|
|
return
|
|
return self.power_devices[device]['status']
|
|
|
|
def get_stat(self, stat, substat=None):
|
|
if self.data is None or stat not in self.data:
|
|
return {}
|
|
if substat is not None:
|
|
return self.data[stat][substat] if substat in self.data[stat] else {}
|
|
return self.data[stat]
|
|
|
|
def get_dev_stat(self, dev, stat):
|
|
if dev in self.devices and stat in self.devices[dev]:
|
|
return self.devices[dev][stat]
|
|
return None
|
|
|
|
def get_fan_speed(self, fan="fan"):
|
|
speed = 0
|
|
if fan not in self.config or fan not in self.data:
|
|
logging.debug(f"Error getting {fan} config")
|
|
return speed
|
|
if "speed" in self.data[fan]:
|
|
speed = self.data[fan]["speed"]
|
|
if 'max_power' in self.config[fan]:
|
|
max_power = float(self.config[fan]['max_power'])
|
|
if max_power > 0:
|
|
speed = speed / max_power
|
|
if 'off_below' in self.config[fan]:
|
|
off_below = float(self.config[fan]['off_below'])
|
|
if speed < off_below:
|
|
speed = 0
|
|
return speed
|
|
|
|
def get_pin_value(self, pin):
|
|
if pin in self.data:
|
|
return self.data[pin]["value"]
|
|
elif pin in self.config and 'value' in self.config[pin]:
|
|
return self.config[pin]["value"]
|
|
return 0
|
|
|
|
def get_temp_store_devices(self):
|
|
if self.tempstore is not None:
|
|
return list(self.tempstore)
|
|
|
|
def device_has_target(self, device):
|
|
return "target" in self.devices[device] or (device in self.tempstore and "targets" in self.tempstore[device])
|
|
|
|
def get_temp_store(self, device, section=False, results=0):
|
|
if device not in self.tempstore:
|
|
return False
|
|
|
|
if section is not False:
|
|
if section not in self.tempstore[device]:
|
|
return False
|
|
if results == 0 or results >= len(self.tempstore[device][section]):
|
|
return self.tempstore[device][section]
|
|
return self.tempstore[device][section][-results:]
|
|
|
|
temp = {}
|
|
for section in self.tempstore[device]:
|
|
if results == 0 or results >= len(self.tempstore[device][section]):
|
|
temp[section] = self.tempstore[device][section]
|
|
temp[section] = self.tempstore[device][section][-results:]
|
|
return temp
|
|
|
|
def get_temp_devices(self):
|
|
if self.temp_devices is None:
|
|
devices = [
|
|
device
|
|
for device in self.tools
|
|
if not device.startswith('extruder_stepper')
|
|
]
|
|
self.temp_devices = devices + self.get_heaters() + self.get_temp_sensors() + self.get_temp_fans()
|
|
return self.temp_devices
|
|
|
|
def get_tools(self):
|
|
return self.tools
|
|
|
|
def get_tool_number(self, tool):
|
|
return self.tools.index(tool)
|
|
|
|
def init_temp_store(self, tempstore):
|
|
if self.tempstore and list(self.tempstore) != list(tempstore):
|
|
logging.debug("Tempstore has changed")
|
|
self.tempstore = tempstore
|
|
self.change_state(self.state)
|
|
else:
|
|
self.tempstore = tempstore
|
|
for device in self.tempstore:
|
|
for x in self.tempstore[device]:
|
|
length = len(self.tempstore[device][x])
|
|
if length < self.tempstore_size:
|
|
for _ in range(1, self.tempstore_size - length):
|
|
self.tempstore[device][x].insert(0, 0)
|
|
logging.info(f"Temp store: {list(self.tempstore)}")
|
|
|
|
def config_section_exists(self, section):
|
|
return section in self.get_config_section_list()
|
|
|
|
def set_dev_stat(self, dev, stat, value):
|
|
if dev not in self.devices:
|
|
return
|
|
|
|
self.devices[dev][stat] = value
|
|
|
|
def _update_temp_store(self):
|
|
if self.tempstore is None:
|
|
return False
|
|
for device in self.tempstore:
|
|
for x in self.tempstore[device]:
|
|
self.tempstore[device][x].pop(0)
|
|
temp = self.get_dev_stat(device, x[:-1])
|
|
if temp is None:
|
|
temp = 0
|
|
self.tempstore[device][x].append(temp)
|
|
return True
|
|
|
|
def enable_spoolman(self):
|
|
logging.info("Enabling Spoolman")
|
|
self.spoolman = True
|