From 76493215c595e61736e91c8233ba75fd75bff683 Mon Sep 17 00:00:00 2001 From: Arksine Date: Sat, 15 Aug 2020 15:22:17 -0400 Subject: [PATCH] moonraker: move klippy connection logic to its own class Signed-off-by: Eric Callahan --- moonraker/moonraker.py | 162 ++++++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 74 deletions(-) diff --git a/moonraker/moonraker.py b/moonraker/moonraker.py index c729b51..068b6c4 100644 --- a/moonraker/moonraker.py +++ b/moonraker/moonraker.py @@ -41,7 +41,8 @@ class Server: # Klippy Connection Handling self.klippy_address = config.get( 'klippy_uds_address', "/tmp/klippy_uds") - self.klippy_iostream = None + self.klippy_connection = KlippyConnection( + self.process_command, self.on_connection_closed) self.is_klippy_ready = False self.gc_response_registered = False self.klippy_state = "disconnected" @@ -146,74 +147,45 @@ class Server: # ***** Klippy Connection ***** async def _connect_klippy(self): - ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - kstream = iostream.IOStream(ksock) - try: - await kstream.connect(self.klippy_address) - except iostream.StreamClosedError: - # Klippy Socket Server not available + ret = await self.klippy_connection.connect(self.klippy_address) + if not ret: self.ioloop.call_later(1., self._connect_klippy) return - await gen.sleep(0.5) - if kstream.closed(): - # Klippy Connection was rejected - self.ioloop.call_later(1., self._connect_klippy) - return - logging.info("Klippy Connection Established") - self.klippy_iostream = kstream - self.klippy_iostream.set_close_callback( - self._handle_stream_closed) - self.ioloop.spawn_callback( - self._read_klippy_stream, self.klippy_iostream) # begin server iniialization self.init_cb.start() - async def _read_klippy_stream(self, stream): - while not stream.closed(): - try: - data = await stream.read_until(b'\x03') - except iostream.StreamClosedError as e: - return - except Exception: - logging.exception("Klippy Stream Read Error") - continue - try: - decoded_cmd = json.loads(data[:-1]) - method = decoded_cmd.get('method', None) - if method is not None: - # This is a remote method called from klippy - cb = self.remote_methods.get(method, None) - if cb is not None: - params = decoded_cmd.get('params', {}) - cb(**params) - else: - logging.info(f"Unknown method received: {method}") - continue - # This is a response to a request, process - req_id = decoded_cmd.get('id', None) - request = self.pending_requests.pop(req_id, None) - if request is None: - logging.info( - f"No request matching request ID: {req_id}, " - f"response: {decoded_cmd}") - continue - if 'result' in decoded_cmd: - result = decoded_cmd['result'] - if not result: - result = "ok" - else: - err = decoded_cmd.get('error', "Malformed Klippy Response") - result = ServerError(err, 400) - request.notify(result) - except Exception: - logging.exception( - f"Error processing Klippy Host Response: {data.decode()}") + def process_command(self, cmd): + method = cmd.get('method', None) + if method is not None: + # This is a remote method called from klippy + cb = self.remote_methods.get(method, None) + if cb is not None: + params = cmd.get('params', {}) + cb(**params) + else: + logging.info(f"Unknown method received: {method}") + return + # This is a response to a request, process + req_id = cmd.get('id', None) + request = self.pending_requests.pop(req_id, None) + if request is None: + logging.info( + f"No request matching request ID: {req_id}, " + f"response: {cmd}") + return + if 'result' in cmd: + result = cmd['result'] + if not result: + result = "ok" + else: + err = cmd.get('error', "Malformed Klippy Response") + result = ServerError(err, 400) + request.notify(result) - def _handle_stream_closed(self): + def on_connection_closed(self): self.is_klippy_ready = False self.gc_response_registered = False self.klippy_state = "disconnected" - self.klippy_iostream = None self.init_cb.stop() for request in self.pending_requests.values(): request.notify(ServerError("Klippy Disconnected", 503)) @@ -222,16 +194,6 @@ class Server: self.send_event("server:klippy_state_changed", "disconnect") self.ioloop.call_later(1., self._connect_klippy) - async def send_klippy_request(self, request): - if self.klippy_iostream is None: - request.notify(ServerError("Klippy Host not connected", 503)) - return - data = json.dumps(request.to_dict()).encode() + b"\x03" - try: - await self.klippy_iostream.write(data) - except iostream.StreamClosedError: - request.notify(ServerError("Klippy Host not connected", 503)) - async def _initialize(self): await self._request_endpoints() if not self.is_klippy_ready: @@ -367,7 +329,7 @@ class Server: base_request = BaseRequest(rpc_method, params) self.pending_requests[base_request.id] = base_request self.ioloop.spawn_callback( - self.send_klippy_request, base_request) + self.klippy_connection.send_request, base_request) result = await base_request.wait() return result @@ -380,14 +342,66 @@ class Server: for plugin in self.plugins: if hasattr(plugin, "close"): await plugin.close() - if self.klippy_iostream is not None and \ - not self.klippy_iostream.closed(): - self.klippy_iostream.close() + self.klippy_connection.close() if self.server_running: self.server_running = False await self.moonraker_app.close() self.ioloop.stop() +class KlippyConnection: + def __init__(self, on_recd, on_close): + self.ioloop = IOLoop.current() + self.iostream = None + self.on_recd = on_recd + self.on_close = on_close + + async def connect(self, address): + ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + kstream = iostream.IOStream(ksock) + try: + await kstream.connect(address) + except iostream.StreamClosedError: + return False + logging.info("Klippy Connection Established") + self.iostream = kstream + self.iostream.set_close_callback(self.on_close) + self.ioloop.spawn_callback(self._read_stream, self.iostream) + return True + + async def _read_stream(self, stream): + while not stream.closed(): + try: + data = await stream.read_until(b'\x03') + except iostream.StreamClosedError as e: + return + except Exception: + logging.exception("Klippy Stream Read Error") + continue + try: + decoded_cmd = json.loads(data[:-1]) + self.on_recd(decoded_cmd) + except Exception: + logging.exception( + f"Error processing Klippy Host Response: {data.decode()}") + + async def send_request(self, request): + if self.iostream is None: + request.notify(ServerError("Klippy Host not connected", 503)) + return + data = json.dumps(request.to_dict()).encode() + b"\x03" + try: + await self.iostream.write(data) + except iostream.StreamClosedError: + request.notify(ServerError("Klippy Host not connected", 503)) + + def is_connected(self): + return self.iostream is not None and not self.iostream.closed() + + def close(self): + if self.iostream is not None and \ + not self.iostream.closed(): + self.iostream.close() + # Basic WebRequest class, easily converted to dict for json encoding class BaseRequest: def __init__(self, rpc_method, params):