From 559df5aea198b29cd6eb777e94e82f4dae54230b Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 5 Feb 2023 07:54:29 -0500 Subject: [PATCH] websockets: implement websocket-klippy bridge Provide a new websocket implementation that creates a near one to one bridge with a Unix Socket connection to Klippy. This may be used to access Klippy APIs not otherwise available over the primary websocket, such as the various "dump" commands. Unlike the primary websocket Moonraker does not decode or inspect data that passes through the bridge. Signed-off-by: Eric Callahan --- moonraker/app.py | 9 +- moonraker/klippy_connection.py | 12 ++- moonraker/websockets.py | 168 ++++++++++++++++++++++++++++++++- 3 files changed, 185 insertions(+), 4 deletions(-) diff --git a/moonraker/app.py b/moonraker/app.py index 382c439..b4747d0 100644 --- a/moonraker/app.py +++ b/moonraker/app.py @@ -23,7 +23,13 @@ from tornado.routing import Rule, PathMatches, AnyMatches from tornado.http1connection import HTTP1Connection from tornado.log import access_log from utils import ServerError -from websockets import WebRequest, WebsocketManager, WebSocket, APITransport +from websockets import ( + WebRequest, + WebsocketManager, + WebSocket, + APITransport, + BridgeSocket +) from streaming_form_data import StreamingFormDataParser from streaming_form_data.targets import FileTarget, ValueTarget, SHA256Target @@ -210,6 +216,7 @@ class MoonrakerApp: (AnyMatches(), self.mutable_router), (r"/", WelcomeHandler), (r"/websocket", WebSocket), + (r"/klippysocket", BridgeSocket), (r"/server/redirect", RedirectHandler) ] self.app = tornado.web.Application(app_handlers, **app_args) diff --git a/moonraker/klippy_connection.py b/moonraker/klippy_connection.py index 30cc424..aad83f3 100644 --- a/moonraker/klippy_connection.py +++ b/moonraker/klippy_connection.py @@ -26,6 +26,7 @@ from typing import ( Dict, List, Set, + Tuple ) if TYPE_CHECKING: from moonraker import Server @@ -246,8 +247,7 @@ class KlippyConnection: continue self.log_no_access = True try: - reader, writer = await asyncio.open_unix_connection( - str(self.uds_address), limit=UNIX_BUFFER_LIMIT) + reader, writer = await self.open_klippy_connection(True) except asyncio.CancelledError: raise except Exception: @@ -267,6 +267,14 @@ class KlippyConnection: self.event_loop.create_task(self._read_stream(reader)) return await self._init_klippy_connection() + async def open_klippy_connection( + self, primary: bool = False + ) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: + if not primary and not self.is_connected(): + raise ServerError("Klippy Unix Connection Not Available", 503) + return await asyncio.open_unix_connection( + str(self.uds_address), limit=UNIX_BUFFER_LIMIT) + def _get_peer_credentials(self, writer: asyncio.StreamWriter) -> bool: self._peer_cred = get_unix_peer_credentials(writer, "Klippy") if not self._peer_cred: diff --git a/moonraker/websockets.py b/moonraker/websockets.py index dc8a2f8..b876beb 100644 --- a/moonraker/websockets.py +++ b/moonraker/websockets.py @@ -11,6 +11,7 @@ import json import asyncio import copy from tornado.websocket import WebSocketHandler, WebSocketClosedError +from tornado.web import HTTPError from utils import ServerError, SentinelClass # Annotation imports @@ -350,6 +351,7 @@ class WebsocketManager(APITransport): def __init__(self, server: Server) -> None: self.server = server self.clients: Dict[int, BaseSocketClient] = {} + self.bridge_connections: Dict[int, BridgeSocket] = {} self.rpc = JsonRPC(server) self.closed_event: Optional[asyncio.Event] = None @@ -516,7 +518,24 @@ class WebsocketManager(APITransport): if old_sc is not None: self.server.send_event("websockets:client_removed", sc) logging.debug(f"Websocket Removed: {sc.uid}") - if self.closed_event is not None and not self.clients: + self._check_closed_event() + + def add_bridge_connection(self, bc: BridgeSocket) -> None: + self.bridge_connections[bc.uid] = bc + logging.debug(f"New Bridge Connection Added: {bc.uid}") + + def remove_bridge_connection(self, bc: BridgeSocket) -> None: + old_bc = self.bridge_connections.pop(bc.uid, None) + if old_bc is not None: + logging.debug(f"Bridge Connection Removed: {bc.uid}") + self._check_closed_event() + + def _check_closed_event(self) -> None: + if ( + self.closed_event is not None and + not self.clients and + not self.bridge_connections + ): self.closed_event.set() def notify_clients( @@ -540,6 +559,8 @@ class WebsocketManager(APITransport): if not self.clients: return self.closed_event = asyncio.Event() + for bc in list(self.bridge_connections.values()): + bc.close_socket(1001, "Server Shutdown") for sc in list(self.clients.values()): sc.close_socket(1001, "Server Shutdown") try: @@ -822,3 +843,148 @@ class WebSocket(WebSocketHandler, BaseSocketClient): def close_socket(self, code: int, reason: str) -> None: self.close(code, reason) + +class BridgeSocket(WebSocketHandler): + def initialize(self) -> None: + self.server: Server = self.settings['server'] + self.wsm: WebsocketManager = self.server.lookup_component("websockets") + self.eventloop = self.server.get_event_loop() + self.uid = id(self) + self.ip_addr: str = self.request.remote_ip or "" + self.last_pong_time: float = self.eventloop.get_loop_time() + self.is_closed = False + self.klippy_writer: Optional[asyncio.StreamWriter] = None + self.klippy_write_buf: List[bytes] = [] + self.klippy_queue_busy: bool = False + + @property + def hostname(self) -> str: + return self.request.host_name + + def open(self, *args, **kwargs) -> None: + WebSocket.connection_count += 1 + self.set_nodelay(True) + self._connected_time = self.eventloop.get_loop_time() + agent = self.request.headers.get("User-Agent", "") + is_proxy = False + if ( + "X-Forwarded-For" in self.request.headers or + "X-Real-Ip" in self.request.headers + ): + is_proxy = True + logging.info(f"Bridge Socket Opened: ID: {self.uid}, " + f"Proxied: {is_proxy}, " + f"User Agent: {agent}, " + f"Host Name: {self.hostname}") + self.wsm.add_bridge_connection(self) + + def on_message(self, message: Union[bytes, str]) -> None: + if isinstance(message, str): + message = message.encode(encoding="utf-8") + self.klippy_write_buf.append(message) + if self.klippy_queue_busy: + return + self.klippy_queue_busy = True + self.eventloop.register_callback(self._write_klippy_messages) + + async def _write_klippy_messages(self) -> None: + while self.klippy_write_buf: + if self.klippy_writer is None or self.is_closed: + break + msg = self.klippy_write_buf.pop(0) + try: + self.klippy_writer.write(msg + b"\x03") + await self.klippy_writer.drain() + except asyncio.CancelledError: + raise + except Exception: + if not self.is_closed: + logging.debug("Klippy Disconnection From _write_request()") + self.close(1001, "Klippy Disconnected") + break + self.klippy_queue_busy = False + + def on_pong(self, data: bytes) -> None: + self.last_pong_time = self.eventloop.get_loop_time() + + def on_close(self) -> None: + WebSocket.connection_count -= 1 + self.is_closed = True + self.klippy_write_buf.clear() + if self.klippy_writer is not None: + self.klippy_writer.close() + self.klippy_writer = None + now = self.eventloop.get_loop_time() + pong_elapsed = now - self.last_pong_time + logging.info(f"Bridge Socket Closed: ID: {self.uid} " + f"Close Code: {self.close_code}, " + f"Close Reason: {self.close_reason}, " + f"Pong Time Elapsed: {pong_elapsed:.2f}") + self.wsm.remove_bridge_connection(self) + + async def _read_unix_stream(self, reader: asyncio.StreamReader) -> None: + errors_remaining: int = 10 + while not reader.at_eof(): + try: + data = memoryview(await reader.readuntil(b'\x03')) + except (ConnectionError, asyncio.IncompleteReadError): + break + except asyncio.CancelledError: + logging.exception("Klippy Stream Read Cancelled") + raise + except Exception: + logging.exception("Klippy Stream Read Error") + errors_remaining -= 1 + if not errors_remaining or self.is_closed: + break + continue + try: + await self.write_message(data[:-1].tobytes()) + except WebSocketClosedError: + logging.info( + f"Bridge closed while writing: {self.uid}") + break + except asyncio.CancelledError: + raise + except Exception: + logging.exception( + f"Error sending data over Bridge: {self.uid}") + errors_remaining -= 1 + if not errors_remaining or self.is_closed: + break + continue + errors_remaining = 10 + if not self.is_closed: + logging.debug("Bridge Disconnection From _read_unix_stream()") + self.close_socket(1001, "Klippy Disconnected") + + def check_origin(self, origin: str) -> bool: + if not super().check_origin(origin): + auth: AuthComp = self.server.lookup_component('authorization', None) + if auth is not None: + return auth.check_cors(origin) + return False + return True + + # Check Authorized User + async def prepare(self) -> None: + max_conns = self.settings["max_websocket_connections"] + if WebSocket.connection_count >= max_conns: + raise self.server.error( + "Maximum Number of Bridge Connections Reached" + ) + auth: AuthComp = self.server.lookup_component("authorization", None) + if auth is not None: + self.current_user = auth.check_authorized(self.request) + kconn: Klippy = self.server.lookup_component("klippy_connection") + try: + reader, writer = await kconn.open_klippy_connection() + except ServerError as err: + raise HTTPError(err.status_code, str(err)) from None + except Exception as e: + raise HTTPError(503, "Failed to open connection to Klippy") from e + self.klippy_writer = writer + self.eventloop.register_callback(self._read_unix_stream, reader) + + def close_socket(self, code: int, reason: str) -> None: + self.close(code, reason)