CreatBotKlipper/klippy/extras/filament_airpump_sensor.py

150 lines
6.2 KiB
Python

# Filament air pump load Module
#
# Copyright (C) 2025 Creatbot
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
from . import output_pin, filament_switch_sensor
class AirPump:
def __init__(self, config, timeout_event_handler):
self.printer = config.get_printer()
self.name = config.get_name()
self.reactor = self.printer.get_reactor()
self.max_run_time = config.getfloat('max_run_time', 10., minval=0.)
self.insert_delay_time = config.getfloat('insert_delay_time', 2., minval=0.)
self.start_time = None
self.is_running = False
self.timeout_timer = None
ppins = self.printer.lookup_object('pins')
self.airpump_pin = None
airpump_pin = config.get('airpump_pin')
self.airpump_pin = ppins.setup_pin('digital_out', airpump_pin)
self.airpump_pin.setup_max_duration(0.)
self.airpump_mcu = self.airpump_pin.get_mcu()
self.timeout_event_handler = timeout_event_handler
self.motor_pin = None
motor_pin = config.get('motor_pin')
self.motor_pin = ppins.setup_pin('digital_out', motor_pin)
self.motor_pin.setup_max_duration(0.)
def set_power(self, power, eventtime):
print_time = self.airpump_mcu.estimated_print_time(eventtime + 0.1)
logging.info(f"set air pump to {power} at {print_time}")
if power > 0:
if not self.is_running:
self.start_time = self.reactor.monotonic()
self.is_running = True
self.timeout_timer = self.reactor.register_timer(
self._check_timeout, self.start_time + self.max_run_time)
self.motor_pin.set_digital(print_time, 1)
self.airpump_pin.set_digital(print_time, 1)
else:
if self.timeout_timer is not None:
self.reactor.unregister_timer(self.timeout_timer)
self.timeout_timer = None
self.is_running = False
self.start_time = None
self.motor_pin.set_digital(print_time, 0)
self.airpump_pin.set_digital(print_time, 0)
def _check_timeout(self, eventtime):
if self.is_running and (eventtime - self.start_time >= self.max_run_time):
if self.timeout_event_handler:
self.timeout_event_handler(eventtime)
logging.info(f"Air pump '{self.name}' timed out after {self.max_run_time}s")
return self.reactor.NEVER
class AirPump_Helper(filament_switch_sensor.RunoutHelper):
def __init__(self, config):
super().__init__(config)
self.airpump = AirPump(config, self._timeout_event_handler)
self.retry_count = 0
def set_airpump_state(self, power, eventtime=None):
if eventtime is None:
eventtime = self.reactor.monotonic()
self.airpump.set_power(power, eventtime)
def _runout_event_handler(self, eventtime):
self.set_airpump_state(1, eventtime)
self.min_event_systime = self.reactor.monotonic() + self.event_delay
def _insert_pump_event_handler(self, eventtime):
delay = self.airpump.insert_delay_time
target_time = eventtime + delay
def delayed_action(eventtime):
self.set_airpump_state(0, eventtime)
self.min_event_systime = self.reactor.monotonic() + self.event_delay
return self.reactor.NEVER
self.reactor.register_callback(delayed_action, target_time)
self.retry_count = 0
def _timeout_event_handler(self, eventtime):
self.retry_count += 1
self.set_airpump_state(0, eventtime)
if self.retry_count < 3:
target_time = eventtime + 30
def retry_action(eventtime):
self._runout_event_handler(eventtime)
return self.reactor.NEVER
self.reactor.register_callback(retry_action, target_time)
else:
super()._runout_event_handler(eventtime)
self.retry_count = 0
class AirPumpLoad:
def __init__(self, config):
self.cmd_SET_AIRPUMP_help = "Sets the state of the filament airpump"
self.printer = config.get_printer()
buttons = self.printer.load_object(config, 'buttons')
switch_pin = config.get('switch_pin')
buttons.register_debounce_button(switch_pin, self._pump_status_handler, config)
self.airpump_helper = AirPump_Helper(config)
self.get_status = self.airpump_helper.get_status
self.name = config.get_name().split()[-1]
self.template_eval = output_pin.lookup_template_eval(config)
gcode = self.printer.lookup_object('gcode')
gcode.register_mux_command("SET_AIRPUMP", "AIRPUMP",
self.name,
self.cmd_SET_AIRPUMP,
desc=self.cmd_SET_AIRPUMP_help)
def _pump_status_handler(self, eventtime, state):
self.airpump_helper.note_filament_present(eventtime, state)
def _template_update(self, text):
try:
value = float(text)
except ValueError as e:
logging.exception("Airpump template render error")
value = 0.
self.airpump_helper.set_airpump_state(value)
def cmd_SET_AIRPUMP(self, gcmd):
value = gcmd.get_float('VALUE', None, minval=0., maxval=1.)
template = gcmd.get('TEMPLATE', None)
run_time = gcmd.get_float('RUN_TIME', None, minval=0.)
if (value is None) == (template is None):
raise gcmd.error("SET_AIRPUMP must specify VALUE or TEMPLATE")
if template is not None:
self.template_eval.set_template(gcmd, self._template_update)
return
self.airpump_helper.set_airpump_state(value)
gcmd.respond_info(f"{self.name} airpump set to: {value}")
if value > 0 and run_time is not None:
eventtime = self.airpump_helper.reactor.monotonic()
def turn_off_pump(eventtime):
self.airpump_helper.set_airpump_state(0, eventtime)
return self.airpump_helper.reactor.NEVER
self.airpump_helper.reactor.register_timer(turn_off_pump, eventtime + run_time)
def load_config_prefix(config):
return AirPumpLoad(config)