Eric Callahan 504a3a76f5
common: reduce jrpc exception spam
When a client repeatedly makes a request that results in an error
the log is spammed with tracebacks that don't provide significant
value.  The method name, code, and error message should be
enough for basic troubleshooting.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
2023-07-09 13:48:01 -04:00

594 lines
20 KiB
Python

# Common classes used throughout Moonraker
#
# Copyright (C) 2023 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license
from __future__ import annotations
import ipaddress
import logging
import copy
import json
from .utils import ServerError, Sentinel
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Optional,
Callable,
Coroutine,
Type,
TypeVar,
Union,
Dict,
List,
Awaitable
)
if TYPE_CHECKING:
    # Everything in this block exists only for static type checkers.  The
    # module enables `from __future__ import annotations`, so annotations are
    # not evaluated at runtime and these names are not needed then.
    # NOTE(review): indentation was lost in this copy of the file.  The
    # aliases are kept inside the guard because AuthComp refers to
    # Authorization, which is only imported here, and the visible code uses
    # the aliases solely in annotations -- confirm against the upstream layout.
    from .server import Server
    from .websockets import WebsocketManager
    from .components.authorization import Authorization
    from asyncio import Future
    # Generic placeholder for caller supplied default values
    _T = TypeVar("_T")
    # Target types accepted by the WebRequest argument converters
    _C = TypeVar("_C", str, bool, float, int)
    IPUnion = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
    ConvType = Union[str, bool, float, int]
    ArgVal = Union[None, int, float, bool, str]
    # Signature of a callback registered with JsonRPC.register_method()
    RPCCallback = Callable[..., Coroutine]
    # Result of looking up the "authorization" component with a None default
    AuthComp = Optional[Authorization]
class Subscribable:
    """Interface for objects that accept status payloads via send_status()."""

    def send_status(
        self, status: Dict[str, Any], eventtime: float
    ) -> None:
        """Handle a status payload; implementers must override this.

        Args:
            status: Status data to deliver.
            eventtime: Time value associated with the status.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()
class APIDefinition:
    """Plain record describing a registered API endpoint."""

    def __init__(
        self,
        endpoint: str,
        http_uri: str,
        jrpc_methods: List[str],
        request_methods: Union[str, List[str]],
        transports: List[str],
        callback: Optional[Callable[[WebRequest], Coroutine]],
        need_object_parser: bool
    ) -> None:
        """Store the endpoint description.

        Args:
            endpoint: Endpoint name.
            http_uri: URI stored on the ``uri`` attribute.
            jrpc_methods: JSON-RPC method names for the endpoint.
            request_methods: A single request method or a list of them.  A
                non-list value is wrapped in a one element list.
            transports: Stored on ``supported_transports``.
            callback: Coroutine function handling a WebRequest, or None.
            need_object_parser: Stored as-is.
        """
        # Normalize so consumers can always treat request_methods as a list
        methods = (
            request_methods
            if isinstance(request_methods, list)
            else [request_methods]
        )
        self.endpoint = endpoint
        self.uri = http_uri
        self.jrpc_methods = jrpc_methods
        self.request_methods = methods
        self.supported_transports = transports
        self.callback = callback
        self.need_object_parser = need_object_parser
class APITransport:
    """Interface for transports that can add and remove API handlers."""

    def register_api_handler(self, api_def: APIDefinition) -> None:
        """Register a handler for ``api_def``; implementers must override.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    def remove_api_handler(self, api_def: APIDefinition) -> None:
        """Remove the handler for ``api_def``; implementers must override.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()
class BaseRemoteConnection(Subscribable):
    """Base class for a remote client connection that speaks JSON-RPC.

    The class defines no ``__init__``; ``on_create()`` sets up all instance
    state and must run before any other method is used.  Subclasses supply
    the transport by overriding ``write_to_socket()`` and ``close_socket()``.
    """

    def on_create(self, server: Server) -> None:
        """Initialize the connection state.

        Args:
            server: Server used to obtain the event loop and components.
        """
        self.server = server
        self.eventloop = server.get_event_loop()
        self.wsm: WebsocketManager = self.server.lookup_component("websockets")
        self.rpc = self.wsm.rpc
        self._uid = id(self)
        self.ip_addr = ""
        self.is_closed: bool = False
        # True while a _write_messages() callback is scheduled or running
        self.queue_busy: bool = False
        # Futures handed out by call_method(), keyed by JSON-RPC request id
        self.pending_responses: Dict[int, Future] = {}
        # Outgoing messages waiting to be written to the socket
        self.message_buf: List[Union[str, Dict[str, Any]]] = []
        self._connected_time: float = 0.
        self._identified: bool = False
        self._client_data: Dict[str, str] = {
            "name": "unknown",
            "version": "",
            "type": "",
            "url": ""
        }
        self._need_auth: bool = False
        self._user_info: Optional[Dict[str, Any]] = None

    @property
    def user_info(self) -> Optional[Dict[str, Any]]:
        """User information for this connection, or None if not set."""
        return self._user_info

    @user_info.setter
    def user_info(self, uinfo: Dict[str, Any]) -> None:
        # Assigning user info also clears the "needs authentication" flag
        self._user_info = uinfo
        self._need_auth = False

    @property
    def need_auth(self) -> bool:
        """Whether check_authenticated() still has to validate requests."""
        return self._need_auth

    @property
    def uid(self) -> int:
        """Identifier of this connection (``id(self)`` taken at creation)."""
        return self._uid

    @property
    def hostname(self) -> str:
        """Host name of the peer; the base implementation returns ""."""
        return ""

    @property
    def start_time(self) -> float:
        """Value of the internal connected-time attribute."""
        return self._connected_time

    @property
    def identified(self) -> bool:
        """True once ``client_data`` has been assigned."""
        return self._identified

    @property
    def client_data(self) -> Dict[str, str]:
        """Client description (name, version, type, url)."""
        return self._client_data

    @client_data.setter
    def client_data(self, data: Dict[str, str]) -> None:
        # Assigning client data marks the connection as identified
        self._client_data = data
        self._identified = True

    async def _process_message(self, message: str) -> None:
        """Dispatch an incoming message and queue the response, if any.

        Every exception is logged rather than propagated.
        """
        try:
            response = await self.rpc.dispatch(message, self)
            if response is not None:
                self.queue_message(response)
        except Exception:
            logging.exception("Websocket Command Error")

    def queue_message(self, message: Union[str, Dict[str, Any]]):
        """Buffer an outgoing message and schedule a flush if none is active."""
        self.message_buf.append(message)
        if self.queue_busy:
            return
        self.queue_busy = True
        self.eventloop.register_callback(self._write_messages)

    def authenticate(
        self,
        token: Optional[str] = None,
        api_key: Optional[str] = None
    ) -> None:
        """Authenticate the connection with the supplied credentials.

        A token takes priority.  The API key is only validated when no user
        info is set.  When neither applies the call falls back to
        check_authenticated().  Nothing happens if the "authorization"
        component is not available.

        Args:
            token: Token passed to the component's ``validate_jwt()``.
            api_key: Key passed to the component's ``validate_api_key()``.
        """
        auth: AuthComp = self.server.lookup_component("authorization", None)
        if auth is None:
            return
        if token is not None:
            self.user_info = auth.validate_jwt(token)
        elif api_key is not None and self.user_info is None:
            self.user_info = auth.validate_api_key(api_key)
        else:
            self.check_authenticated()

    def check_authenticated(self, path: str = "") -> None:
        """Raise a 401 server error if ``path`` requires authentication.

        The check is skipped when the connection does not need
        authentication or the "authorization" component is not available.

        Args:
            path: Path handed to the component's ``is_path_permitted()``.
        """
        if not self._need_auth:
            return
        auth: AuthComp = self.server.lookup_component("authorization", None)
        if auth is None:
            return
        if not auth.is_path_permitted(path):
            raise self.server.error("Unauthorized", 401)

    def on_user_logout(self, user: str) -> bool:
        """Clear the stored user info if it belongs to ``user``.

        Returns:
            True when the user info was cleared, otherwise False.
        """
        if self._user_info is None:
            return False
        if user == self._user_info.get("username", ""):
            self._user_info = None
            return True
        return False

    async def _write_messages(self):
        """Flush buffered messages to the socket in FIFO order."""
        if self.is_closed:
            self.message_buf = []
            self.queue_busy = False
            return
        try:
            while self.message_buf:
                msg = self.message_buf.pop(0)
                await self.write_to_socket(msg)
        finally:
            # Always release the flag.  If a write raised (or the task was
            # cancelled) and the flag stayed set, queue_message() would never
            # schedule another flush and the connection would stall.
            self.queue_busy = False

    async def write_to_socket(
        self, message: Union[str, Dict[str, Any]]
    ) -> None:
        """Write a single message to the transport; subclasses implement."""
        raise NotImplementedError("Children must implement write_to_socket")

    def send_status(self,
                    status: Dict[str, Any],
                    eventtime: float
                    ) -> None:
        """Queue a ``notify_status_update`` notification.

        Empty status payloads are dropped.
        """
        if not status:
            return
        self.queue_message({
            'jsonrpc': "2.0",
            'method': "notify_status_update",
            'params': [status, eventtime]})

    def call_method(
        self,
        method: str,
        params: Optional[Union[List, Dict[str, Any]]] = None
    ) -> Awaitable:
        """Send a JSON-RPC request to the remote client.

        Args:
            method: Name of the remote method.
            params: Optional parameters; omitted from the request when None.

        Returns:
            A future completed by resolve_pending_response().
        """
        fut = self.eventloop.create_future()
        # The future's id doubles as the request id so the response can be
        # matched back to it.
        msg = {
            'jsonrpc': "2.0",
            'method': method,
            'id': id(fut)
        }
        if params is not None:
            msg["params"] = params
        self.pending_responses[id(fut)] = fut
        self.queue_message(msg)
        return fut

    def send_notification(self, name: str, data: List) -> None:
        """Forward a notification to the websocket manager's notify_clients(),
        passing this connection's uid in the trailing list argument."""
        self.wsm.notify_clients(name, data, [self._uid])

    def resolve_pending_response(
        self, response_id: int, result: Any
    ) -> bool:
        """Complete the future associated with ``response_id``.

        Args:
            response_id: Request id taken from the remote response.
            result: Result value, or a ServerError to raise in the awaiter.

        Returns:
            True if a pending future was completed.  False if the id is
            unknown or the future was already done.
        """
        fut = self.pending_responses.pop(response_id, None)
        if fut is None:
            return False
        if fut.done():
            # The awaiting side may have cancelled the future (e.g. on a
            # timeout).  Completing it again raises InvalidStateError.
            return False
        if isinstance(result, ServerError):
            fut.set_exception(result)
        else:
            fut.set_result(result)
        return True

    def close_socket(self, code: int, reason: str) -> None:
        """Close the underlying transport; subclasses implement."""
        raise NotImplementedError("Children must implement close_socket()")
class WebRequest:
    """Container for the arguments and metadata of a single API request."""

    def __init__(
        self,
        endpoint: str,
        args: Dict[str, Any],
        action: Optional[str] = "",
        conn: Optional[Subscribable] = None,
        ip_addr: str = "",
        user: Optional[Dict[str, Any]] = None
    ) -> None:
        """Store the request data.

        Args:
            endpoint: Endpoint the request targets.
            args: Request arguments.
            action: Request action; None is stored as "".
            conn: Connection associated with the request, if any.
            ip_addr: Textual IP address.  Values that cannot be parsed are
                stored as None.
            user: Information about the requesting user, if any.
        """
        try:
            parsed_ip: Optional[IPUnion] = ipaddress.ip_address(ip_addr)
        except Exception:
            parsed_ip = None
        self.endpoint = endpoint
        self.action = action or ""
        self.args = args
        self.conn = conn
        self.ip_addr: Optional[IPUnion] = parsed_ip
        self.current_user = user

    def get_endpoint(self) -> str:
        """Return the endpoint name."""
        return self.endpoint

    def get_action(self) -> str:
        """Return the request action."""
        return self.action

    def get_args(self) -> Dict[str, Any]:
        """Return the argument dictionary itself (not a copy)."""
        return self.args

    def get_subscribable(self) -> Optional[Subscribable]:
        """Return the connection passed at construction, if any."""
        return self.conn

    def get_client_connection(self) -> Optional[BaseRemoteConnection]:
        """Return the connection only if it is a BaseRemoteConnection."""
        connection = self.conn
        return connection if isinstance(connection, BaseRemoteConnection) else None

    def get_ip_address(self) -> Optional[IPUnion]:
        """Return the parsed IP address, or None if it was not valid."""
        return self.ip_addr

    def get_current_user(self) -> Optional[Dict[str, Any]]:
        """Return the user information passed at construction."""
        return self.current_user

    def _get_converted_arg(
        self,
        key: str,
        default: Union[Sentinel, _T],
        dtype: Type[_C]
    ) -> Union[_C, _T]:
        """Fetch argument ``key`` converted to ``dtype``.

        Booleans accept real bool values and the strings "true"/"false" in
        any letter case.  Every other type is converted by calling
        ``dtype`` on the value.

        Raises:
            ServerError: The argument is missing without a default, or the
                conversion failed.
        """
        if key not in self.args:
            if default is not Sentinel.MISSING:
                return default
            raise ServerError(f"No data for argument: {key}")
        value = self.args[key]
        try:
            if dtype is bool:
                if isinstance(value, bool):
                    return value  # type: ignore
                if isinstance(value, str):
                    # The lowered text is also what the error below reports
                    value = value.lower()
                    if value == "true":
                        return True  # type: ignore
                    if value == "false":
                        return False  # type: ignore
                raise TypeError
            return dtype(value)
        except Exception:
            raise ServerError(
                f"Unable to convert argument [{key}] to {dtype}: "
                f"value recieved: {value}")

    def get(
        self,
        key: str,
        default: Union[Sentinel, _T] = Sentinel.MISSING
    ) -> Union[_T, Any]:
        """Return the unconverted argument ``key``, or ``default``.

        Raises:
            ServerError: The argument is missing and no default was given.
        """
        found = self.args.get(key, default)
        if found is Sentinel.MISSING:
            raise ServerError(f"No data for argument: {key}")
        return found

    def get_str(
        self,
        key: str,
        default: Union[Sentinel, _T] = Sentinel.MISSING
    ) -> Union[str, _T]:
        """Return argument ``key`` converted to str."""
        return self._get_converted_arg(key, default, str)

    def get_int(
        self,
        key: str,
        default: Union[Sentinel, _T] = Sentinel.MISSING
    ) -> Union[int, _T]:
        """Return argument ``key`` converted to int."""
        return self._get_converted_arg(key, default, int)

    def get_float(
        self,
        key: str,
        default: Union[Sentinel, _T] = Sentinel.MISSING
    ) -> Union[float, _T]:
        """Return argument ``key`` converted to float."""
        return self._get_converted_arg(key, default, float)

    def get_boolean(
        self,
        key: str,
        default: Union[Sentinel, _T] = Sentinel.MISSING
    ) -> Union[bool, _T]:
        """Return argument ``key`` converted to bool."""
        return self._get_converted_arg(key, default, bool)

    def _parse_list(
        self,
        key: str,
        sep: str,
        ltype: Type[_C],
        count: Optional[int],
        default: Union[Sentinel, _T]
    ) -> Union[List[_C], _T]:
        """Fetch argument ``key`` as a list of ``ltype`` items.

        A string is split on ``sep``; each piece is stripped, empty pieces
        are skipped, and the rest are converted with ``ltype``.  A list is
        returned as-is after checking that every item is an ``ltype``.

        Raises:
            ServerError: The argument is missing without a default, is
                neither a string nor a list, holds an invalid item, or does
                not contain exactly ``count`` items.
        """
        if key not in self.args:
            if default is not Sentinel.MISSING:
                return default
            raise ServerError(f"No data for argument: {key}")
        value = self.args[key]
        if isinstance(value, str):
            try:
                items = []
                for piece in value.split(sep):
                    piece = piece.strip()
                    if piece:
                        items.append(ltype(piece))
            except Exception as e:
                raise ServerError(
                    f"Invalid list format received for argument '{key}', "
                    "parsing failed."
                ) from e
        elif isinstance(value, list):
            if any(not isinstance(item, ltype) for item in value):
                raise ServerError(
                    f"Invalid list format for argument '{key}', expected all "
                    f"values to be of type {ltype.__name__}."
                )
            # Already a list; hand back the caller's object unchanged
            items = value
        else:
            raise ServerError(
                f"Invalid value received for argument '{key}'. Expected List type, "
                f"received {type(value).__name__}"
            )
        if count is not None and len(items) != count:
            raise ServerError(
                f"Invalid list received for argument '{key}', count mismatch. "
                f"Expected {count} items, got {len(items)}."
            )
        return items

    def get_list(
        self,
        key: str,
        default: Union[Sentinel, _T] = Sentinel.MISSING,
        sep: str = ",",
        count: Optional[int] = None
    ) -> Union[_T, List[str]]:
        """Return argument ``key`` as a list of strings.

        Args:
            key: Argument name.
            default: Value returned when the argument is missing.
            sep: Separator used when the argument is a string.
            count: Required number of items, or None for any length.
        """
        return self._parse_list(key, sep, str, count, default)
class JsonRPC:
    """JSON-RPC 2.0 dispatcher.

    Callbacks are registered by method name.  dispatch() decodes an incoming
    payload, runs the matching callbacks and returns the encoded response.
    Objects without a "method" member are treated as responses to requests
    previously sent to the remote connection.
    """

    def __init__(
        self, server: Server, transport: str = "Websocket"
    ) -> None:
        """
        Args:
            server: Server queried for the verbose logging setting.
            transport: Transport name used as a prefix in log messages.
        """
        self.methods: Dict[str, RPCCallback] = {}
        self.transport = transport
        # Set by _log_request(), consumed by the next _log_response()
        self.sanitize_response = False
        self.verbose = server.is_verbose_enabled()

    def _log_request(self, rpc_obj: Any, ) -> None:
        """Log an incoming request when verbose logging is enabled.

        Parameters of ``access.*`` methods and ``machine.sudo.password`` are
        replaced by "<sanitized>", as are the ``access_token`` and
        ``api_key`` parameters of ``server.connection.identify``.  Items
        that are not JSON objects are logged unchanged.
        """
        if not self.verbose:
            return
        self.sanitize_response = False
        output = rpc_obj
        if isinstance(rpc_obj, dict):
            method = rpc_obj.get("method")
            params = rpc_obj.get("params", {})
            if isinstance(method, str):
                if (
                    method.startswith("access.") or
                    method == "machine.sudo.password"
                ):
                    self.sanitize_response = True
                    if params:
                        # Only the "params" member is replaced, so a shallow
                        # copy keeps the caller's object untouched.
                        output = dict(rpc_obj)
                        if isinstance(params, dict):
                            output["params"] = {
                                key: "<sanitized>" for key in params
                            }
                        else:
                            # Positional or malformed params may still carry
                            # credentials; never log them verbatim.
                            output["params"] = "<sanitized>"
                elif (
                    method == "server.connection.identify" and
                    params and isinstance(params, dict)
                ):
                    # Non-dict params cannot be indexed by field name
                    clean = dict(params)
                    for field in ["access_token", "api_key"]:
                        if field in clean:
                            clean[field] = "<sanitized>"
                    output = dict(rpc_obj)
                    output["params"] = clean
        logging.debug(f"{self.transport} Received::{json.dumps(output)}")

    def _log_response(self, resp_obj: Optional[Dict[str, Any]]) -> None:
        """Log an outgoing response when verbose logging is enabled.

        The result is replaced by "<sanitized>" if the preceding
        _log_request() call flagged the request as sensitive.
        """
        if not self.verbose:
            return
        if resp_obj is None:
            return
        output = resp_obj
        if self.sanitize_response and "result" in resp_obj:
            output = dict(resp_obj)
            output["result"] = "<sanitized>"
        self.sanitize_response = False
        logging.debug(f"{self.transport} Response::{json.dumps(output)}")

    def register_method(self,
                        name: str,
                        method: RPCCallback
                        ) -> None:
        """Register ``method`` as the callback for ``name``."""
        self.methods[name] = method

    def remove_method(self, name: str) -> None:
        """Remove the callback for ``name``; unknown names are ignored."""
        self.methods.pop(name, None)

    async def dispatch(self,
                       data: str,
                       conn: Optional[BaseRemoteConnection] = None
                       ) -> Optional[str]:
        """Decode and process a JSON-RPC payload.

        Args:
            data: JSON text holding a single object or a batch (list).
            conn: Connection the payload was received on, if any.

        Returns:
            The JSON encoded response, or None when nothing has to be sent
            back.
        """
        try:
            obj: Any = json.loads(data)
        except Exception:
            msg = f"{self.transport} data not json: {data}"
            logging.exception(msg)
            err = self.build_error(-32700, "Parse error")
            return json.dumps(err)
        if isinstance(obj, list):
            if not obj:
                # JSON-RPC 2.0: an empty batch is an invalid request and is
                # answered with a single error object.
                err = self.build_error(-32600, "Invalid Request")
                return json.dumps(err)
            responses: List[Dict[str, Any]] = []
            for item in obj:
                self._log_request(item)
                resp = await self.process_object(item, conn)
                if resp is not None:
                    self._log_response(resp)
                    responses.append(resp)
            if responses:
                return json.dumps(responses)
            return None
        self._log_request(obj)
        response = await self.process_object(obj, conn)
        if response is None:
            return None
        self._log_response(response)
        return json.dumps(response)

    async def process_object(self,
                             obj: Dict[str, Any],
                             conn: Optional[BaseRemoteConnection]
                             ) -> Optional[Dict[str, Any]]:
        """Validate one decoded item and execute it.

        Returns:
            A response object, or None when no response is produced.
        """
        if not isinstance(obj, dict):
            # Valid JSON that is not an object (number, string, nested
            # list...) cannot be a request; answer instead of raising.
            return self.build_error(-32600, "Invalid Request")
        req_id: Optional[int] = obj.get('id', None)
        rpc_version: str = obj.get('jsonrpc', "")
        if rpc_version != "2.0":
            return self.build_error(-32600, "Invalid Request", req_id)
        method_name = obj.get('method', Sentinel.MISSING)
        if method_name is Sentinel.MISSING:
            # No "method" member: treat the object as a response
            self.process_response(obj, conn)
            return None
        if not isinstance(method_name, str):
            return self.build_error(
                -32600, "Invalid Request", req_id, method_name=str(method_name)
            )
        method = self.methods.get(method_name, None)
        if method is None:
            return self.build_error(
                -32601, "Method not found", req_id, method_name=method_name
            )
        params: Dict[str, Any] = {}
        if 'params' in obj:
            params = obj['params']
            if not isinstance(params, dict):
                return self.build_error(
                    -32602, "Invalid params:", req_id, method_name=method_name
                )
        return await self.execute_method(method_name, method, req_id, conn, params)

    def process_response(
        self, obj: Dict[str, Any], conn: Optional[BaseRemoteConnection]
    ) -> None:
        """Resolve the pending request matching a response object.

        A response without a usable result resolves the pending request
        with a ServerError built from the "error" member.  An explicit
        ``null`` result with no error is delivered as None.
        """
        if conn is None:
            logging.debug(f"RPC Response to non-socket request: {obj}")
            return
        response_id = obj.get("id")
        if response_id is None:
            logging.debug(f"RPC Response with null ID: {obj}")
            return
        result = obj.get("result")
        error = obj.get("error")
        # "result": null is a legal success value in JSON-RPC 2.0, so only
        # a missing result or an accompanying error counts as a failure.
        failed = result is None and ("result" not in obj or error is not None)
        if failed:
            name = conn.client_data["name"]
            msg = f"Invalid Response: {obj}"
            code = -32600
            if isinstance(error, dict):
                msg = error.get("message", msg)
                code = error.get("code", code)
            msg = f"{name} rpc error: {code} {msg}"
            ret = ServerError(msg, 418)
        else:
            ret = result
        conn.resolve_pending_response(response_id, ret)

    async def execute_method(
        self,
        method_name: str,
        callback: RPCCallback,
        req_id: Optional[int],
        conn: Optional[BaseRemoteConnection],
        params: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Run a registered callback and build the response.

        Exceptions are converted to error objects: TypeError becomes -32602,
        ServerError uses its status code (404 becomes -32601, 401 becomes
        -32602), and anything else becomes -31000.

        Returns:
            An error object, a result object, or None when the call
            succeeded and the request carried no id.
        """
        if conn is not None:
            # Callbacks receive the originating connection through params
            params["_socket_"] = conn
        try:
            result = await callback(params)
        except TypeError as e:
            return self.build_error(
                -32602, f"Invalid params:\n{e}", req_id, True, method_name
            )
        except ServerError as e:
            code = e.status_code
            if code == 404:
                code = -32601
            elif code == 401:
                code = -32602
            return self.build_error(code, str(e), req_id, True, method_name)
        except Exception as e:
            return self.build_error(-31000, str(e), req_id, True, method_name)
        if req_id is None:
            return None
        return self.build_result(result, req_id)

    def build_result(self, result: Any, req_id: int) -> Dict[str, Any]:
        """Return a JSON-RPC success object."""
        return {
            'jsonrpc': "2.0",
            'result': result,
            'id': req_id
        }

    def build_error(
        self,
        code: int,
        msg: str,
        req_id: Optional[int] = None,
        is_exc: bool = False,
        method_name: str = ""
    ) -> Dict[str, Any]:
        """Log a request error and return a JSON-RPC error object.

        Args:
            code: Error code.
            msg: Error message.
            req_id: Id of the failed request, if known.
            is_exc: True when called while handling an exception.  The
                traceback is only logged if verbose logging is enabled.
            method_name: Requested method, included in the log message.
        """
        if method_name:
            method_name = f"Requested Method: {method_name}, "
        log_msg = f"JSON-RPC Request Error - {method_name}Code: {code}, Message: {msg}"
        if is_exc and self.verbose:
            logging.exception(log_msg)
        else:
            logging.info(log_msg)
        return {
            'jsonrpc': "2.0",
            'error': {'code': code, 'message': msg},
            'id': req_id
        }