diff --git a/moonraker/app.py b/moonraker/app.py index 382c439..b4747d0 100644 --- a/moonraker/app.py +++ b/moonraker/app.py @@ -23,7 +23,13 @@ from tornado.routing import Rule, PathMatches, AnyMatches from tornado.http1connection import HTTP1Connection from tornado.log import access_log from utils import ServerError -from websockets import WebRequest, WebsocketManager, WebSocket, APITransport +from websockets import ( + WebRequest, + WebsocketManager, + WebSocket, + APITransport, + BridgeSocket +) from streaming_form_data import StreamingFormDataParser from streaming_form_data.targets import FileTarget, ValueTarget, SHA256Target @@ -210,6 +216,7 @@ class MoonrakerApp: (AnyMatches(), self.mutable_router), (r"/", WelcomeHandler), (r"/websocket", WebSocket), + (r"/klippysocket", BridgeSocket), (r"/server/redirect", RedirectHandler) ] self.app = tornado.web.Application(app_handlers, **app_args) diff --git a/moonraker/klippy_connection.py b/moonraker/klippy_connection.py index 30cc424..aad83f3 100644 --- a/moonraker/klippy_connection.py +++ b/moonraker/klippy_connection.py @@ -26,6 +26,7 @@ from typing import ( Dict, List, Set, + Tuple ) if TYPE_CHECKING: from moonraker import Server @@ -246,8 +247,7 @@ class KlippyConnection: continue self.log_no_access = True try: - reader, writer = await asyncio.open_unix_connection( - str(self.uds_address), limit=UNIX_BUFFER_LIMIT) + reader, writer = await self.open_klippy_connection(True) except asyncio.CancelledError: raise except Exception: @@ -267,6 +267,14 @@ class KlippyConnection: self.event_loop.create_task(self._read_stream(reader)) return await self._init_klippy_connection() + async def open_klippy_connection( + self, primary: bool = False + ) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]: + if not primary and not self.is_connected(): + raise ServerError("Klippy Unix Connection Not Available", 503) + return await asyncio.open_unix_connection( + str(self.uds_address), limit=UNIX_BUFFER_LIMIT) + def _get_peer_credentials(self, writer: asyncio.StreamWriter) -> bool: self._peer_cred = get_unix_peer_credentials(writer, "Klippy") if not self._peer_cred: diff --git a/moonraker/websockets.py b/moonraker/websockets.py index dc8a2f8..b876beb 100644 --- a/moonraker/websockets.py +++ b/moonraker/websockets.py @@ -11,6 +11,7 @@ import json import asyncio import copy from tornado.websocket import WebSocketHandler, WebSocketClosedError +from tornado.web import HTTPError from utils import ServerError, SentinelClass # Annotation imports @@ -350,6 +351,7 @@ class WebsocketManager(APITransport): def __init__(self, server: Server) -> None: self.server = server self.clients: Dict[int, BaseSocketClient] = {} + self.bridge_connections: Dict[int, BridgeSocket] = {} self.rpc = JsonRPC(server) self.closed_event: Optional[asyncio.Event] = None @@ -516,7 +518,24 @@ class WebsocketManager(APITransport): if old_sc is not None: self.server.send_event("websockets:client_removed", sc) logging.debug(f"Websocket Removed: {sc.uid}") - if self.closed_event is not None and not self.clients: + self._check_closed_event() + + def add_bridge_connection(self, bc: BridgeSocket) -> None: + self.bridge_connections[bc.uid] = bc + logging.debug(f"New Bridge Connection Added: {bc.uid}") + + def remove_bridge_connection(self, bc: BridgeSocket) -> None: + old_bc = self.bridge_connections.pop(bc.uid, None) + if old_bc is not None: + logging.debug(f"Bridge Connection Removed: {bc.uid}") + self._check_closed_event() + + def _check_closed_event(self) -> None: + if ( + self.closed_event is not None and + not self.clients and + not self.bridge_connections + ): self.closed_event.set() def notify_clients( @@ -540,6 +559,8 @@ class WebsocketManager(APITransport): if not self.clients: return self.closed_event = asyncio.Event() + for bc in list(self.bridge_connections.values()): + bc.close_socket(1001, "Server Shutdown") for sc in list(self.clients.values()): sc.close_socket(1001, "Server Shutdown") try: @@ -822,3 +843,148 @@ class WebSocket(WebSocketHandler, BaseSocketClient): def close_socket(self, code: int, reason: str) -> None: self.close(code, reason) + +class BridgeSocket(WebSocketHandler): + def initialize(self) -> None: + self.server: Server = self.settings['server'] + self.wsm: WebsocketManager = self.server.lookup_component("websockets") + self.eventloop = self.server.get_event_loop() + self.uid = id(self) + self.ip_addr: str = self.request.remote_ip or "" + self.last_pong_time: float = self.eventloop.get_loop_time() + self.is_closed = False + self.klippy_writer: Optional[asyncio.StreamWriter] = None + self.klippy_write_buf: List[bytes] = [] + self.klippy_queue_busy: bool = False + + @property + def hostname(self) -> str: + return self.request.host_name + + def open(self, *args, **kwargs) -> None: + WebSocket.connection_count += 1 + self.set_nodelay(True) + self._connected_time = self.eventloop.get_loop_time() + agent = self.request.headers.get("User-Agent", "") + is_proxy = False + if ( + "X-Forwarded-For" in self.request.headers or + "X-Real-Ip" in self.request.headers + ): + is_proxy = True + logging.info(f"Bridge Socket Opened: ID: {self.uid}, " + f"Proxied: {is_proxy}, " + f"User Agent: {agent}, " + f"Host Name: {self.hostname}") + self.wsm.add_bridge_connection(self) + + def on_message(self, message: Union[bytes, str]) -> None: + if isinstance(message, str): + message = message.encode(encoding="utf-8") + self.klippy_write_buf.append(message) + if self.klippy_queue_busy: + return + self.klippy_queue_busy = True + self.eventloop.register_callback(self._write_klippy_messages) + + async def _write_klippy_messages(self) -> None: + while self.klippy_write_buf: + if self.klippy_writer is None or self.is_closed: + break + msg = self.klippy_write_buf.pop(0) + try: + self.klippy_writer.write(msg + b"\x03") + await self.klippy_writer.drain() + except asyncio.CancelledError: + raise + except Exception: + if not self.is_closed: + logging.debug("Klippy Disconnection From _write_request()") + self.close(1001, "Klippy Disconnected") + break + self.klippy_queue_busy = False + + def on_pong(self, data: bytes) -> None: + self.last_pong_time = self.eventloop.get_loop_time() + + def on_close(self) -> None: + WebSocket.connection_count -= 1 + self.is_closed = True + self.klippy_write_buf.clear() + if self.klippy_writer is not None: + self.klippy_writer.close() + self.klippy_writer = None + now = self.eventloop.get_loop_time() + pong_elapsed = now - self.last_pong_time + logging.info(f"Bridge Socket Closed: ID: {self.uid} " + f"Close Code: {self.close_code}, " + f"Close Reason: {self.close_reason}, " + f"Pong Time Elapsed: {pong_elapsed:.2f}") + self.wsm.remove_bridge_connection(self) + + async def _read_unix_stream(self, reader: asyncio.StreamReader) -> None: + errors_remaining: int = 10 + while not reader.at_eof(): + try: + data = memoryview(await reader.readuntil(b'\x03')) + except (ConnectionError, asyncio.IncompleteReadError): + break + except asyncio.CancelledError: + logging.exception("Klippy Stream Read Cancelled") + raise + except Exception: + logging.exception("Klippy Stream Read Error") + errors_remaining -= 1 + if not errors_remaining or self.is_closed: + break + continue + try: + await self.write_message(data[:-1].tobytes()) + except WebSocketClosedError: + logging.info( + f"Bridge closed while writing: {self.uid}") + break + except asyncio.CancelledError: + raise + except Exception: + logging.exception( + f"Error sending data over Bridge: {self.uid}") + errors_remaining -= 1 + if not errors_remaining or self.is_closed: + break + continue + errors_remaining = 10 + if not self.is_closed: + logging.debug("Bridge Disconnection From _read_unix_stream()") + self.close_socket(1001, "Klippy Disconnected") + + def check_origin(self, origin: str) -> bool: + if not super().check_origin(origin): + auth: AuthComp = self.server.lookup_component('authorization', None) + if auth is not None: + return auth.check_cors(origin) + return False + return True + + # Check Authorized User + async def prepare(self) -> None: + max_conns = self.settings["max_websocket_connections"] + if WebSocket.connection_count >= max_conns: + raise self.server.error( + "Maximum Number of Bridge Connections Reached" + ) + auth: AuthComp = self.server.lookup_component("authorization", None) + if auth is not None: + self.current_user = auth.check_authorized(self.request) + kconn: Klippy = self.server.lookup_component("klippy_connection") + try: + reader, writer = await kconn.open_klippy_connection() + except ServerError as err: + raise HTTPError(err.status_code, str(err)) from None + except Exception as e: + raise HTTPError(503, "Failed to open connection to Klippy") from e + self.klippy_writer = writer + self.eventloop.register_callback(self._read_unix_stream, reader) + + def close_socket(self, code: int, reason: str) -> None: + self.close(code, reason)