Compare commits

...

11 Commits

Author SHA1 Message Date
c925c17669 重置密码后重置当前已鉴权的连接状态 2025-07-02 16:01:09 +08:00
f60db3b002 添加MQTT消息鉴权 2025-07-02 15:34:03 +08:00
7aeeb07a56 规范MQTT报错处理
1. 当payload可以正常解析为Json时,报错信息放在data字段
2. 删掉日志中的警告记录,防止MQTT处理异常时持续灌水日志
2025-07-02 11:53:17 +08:00
b61556db47 添加默认内置用户
内置用户属于半保留用户,不能删除,可以登录登出,修改密码
内置用户用户MQTT鉴权,Web鉴权登录,本地连接鉴权登录
2025-07-02 11:42:18 +08:00
zkk
8991a260f3 实现webrtc信令透传的功能 2025-07-02 10:16:04 +08:00
e85557a630 实现CreatCloud的API支持 2025-06-27 16:17:13 +08:00
436c0aa6dc 继承mqtt库原生订阅,V5版本订阅时默认启用noLocal选项 2025-06-27 16:08:48 +08:00
01d2358f91 初步实现对CreatCloud的支持 2025-06-25 10:10:49 +08:00
d5e3a39fb2 mqtt通配符订阅的回调函数必须发送主题参数 2025-06-25 10:09:37 +08:00
895f1c7669 封装获取设备唯一识别号的函数 2025-06-24 18:23:24 +08:00
8f2c6eb982 mqtt支持通配符订阅 2025-06-24 14:20:56 +08:00
4 changed files with 254 additions and 8 deletions

View File

@ -65,6 +65,7 @@ USER_TABLE = "authorized_users"
AUTH_SOURCES = ["moonraker", "ldap"]
HASH_ITER = 100000
API_USER = "_API_KEY_USER_"
SUPER_USER = "_SUPER_USER_"
TRUSTED_USER = "_TRUSTED_USER_"
RESERVED_USERS = [API_USER, TRUSTED_USER]
JWT_EXP_TIME = datetime.timedelta(hours=1)
@ -181,6 +182,7 @@ class Authorization:
self.trusted_ips: List[IPAddr] = []
self.trusted_ranges: List[IPNetwork] = []
self.trusted_domains: List[str] = []
self.trusted_mqtt_clients: List[str] = [] # MQTT client id
for val in config.getlist('trusted_clients', []):
# Check IP address
try:
@ -315,6 +317,17 @@ class Authorization:
self.users[API_USER] = UserInfo(username=API_USER, password=self.api_key)
else:
self.api_key = api_user.password
super_user: Optional[UserInfo] = self.users.get(SUPER_USER, None)
if super_user is None:
need_sync = True
salt = secrets.token_bytes(32)
hashed_pass = hashlib.pbkdf2_hmac(
'sha256', 'admin'.encode(), salt, HASH_ITER).hex()
self.users[SUPER_USER] = UserInfo(
username=SUPER_USER,
password=hashed_pass,
salt=salt.hex(),
)
for username, user_info in list(self.users.items()):
if username == API_USER:
continue
@ -461,7 +474,7 @@ class Authorization:
) -> Dict[str, List[Dict[str, Any]]]:
user_list = []
for user in self.users.values():
if user.username == API_USER:
if user.username in [API_USER, SUPER_USER]:
continue
user_list.append({
'username': user.username,
@ -495,7 +508,20 @@ class Authorization:
new_hashed_pass = hashlib.pbkdf2_hmac(
'sha256', new_pass.encode(), salt, HASH_ITER).hex()
self.users[username].password = new_hashed_pass
if username == SUPER_USER:
self.trusted_mqtt_clients.clear()
jwk_id: Optional[str] = self.users[username].jwk_id
self.users[username].jwt_secret = None
self.users[username].jwk_id = None
if jwk_id is not None:
self.public_jwks.pop(jwk_id, None)
await self._sync_user(username)
eventloop = self.server.get_event_loop()
eventloop.delay_callback(
.005, self.server.send_event,
"authorization:user_logged_out",
{'username': username}
)
return {
'username': username,
'action': "user_password_reset"
@ -602,7 +628,7 @@ class Authorization:
curname = current_user.username
if curname == username:
raise self.server.error(f"Cannot delete logged in user {curname}")
if username in RESERVED_USERS:
if username in RESERVED_USERS + [SUPER_USER]:
raise self.server.error(
f"Invalid Request for reserved user {username}")
user_info: Optional[UserInfo] = self.users.get(username)
@ -707,6 +733,22 @@ class Authorization:
return self.users[API_USER]
raise self.server.error("Invalid API Key", 401)
def validate_mqtt(self, uuid: str, data: Dict) -> bool:
username: str = data.get("username")
password: str = data.get("password")
if username != SUPER_USER:
return False
user_info = self.users[username]
salt = bytes.fromhex(user_info.salt)
hashed_pass = hashlib.pbkdf2_hmac(
'sha256', password.encode(), salt, HASH_ITER).hex()
if (valid := hashed_pass == user_info.password):
self.trusted_mqtt_clients.append(uuid)
return valid
def check_mqtt(self, uuid: str) -> bool:
return uuid in self.trusted_mqtt_clients
def _load_private_key(self, secret: str) -> Signer:
try:
key = Signer(bytes.fromhex(secret))

View File

@ -264,6 +264,10 @@ class Machine:
def get_moonraker_service_info(self):
return dict(self.moonraker_service_info)
def get_machine_uuid(self) -> str:
uuid = self.system_info["cpu_info"]["serial_number"] or str(__import__("uuid").getnode())
return uuid.zfill(15)[-15:].upper()
async def wait_for_init(
self, timeout: Optional[float] = None
) -> None:

View File

@ -10,6 +10,7 @@ import asyncio
import logging
import pathlib
import ssl
import re
from collections import deque
import paho.mqtt.client as paho_mqtt
import paho.mqtt
@ -41,8 +42,11 @@ if TYPE_CHECKING:
from ..common import JsonRPC, APIDefinition
from ..eventloop import FlexTimer
from .klippy_apis import KlippyAPI
from .machine import Machine
from .authorization import Authorization
FlexCallback = Callable[[bytes], Optional[Coroutine]]
RPCCallback = Callable[..., Coroutine]
AuthComp = Optional[Authorization]
PAHO_MQTT_VERSION = tuple([int(p) for p in paho.mqtt.__version__.split(".")])
DUP_API_REQ_CODE = -10000
@ -220,6 +224,23 @@ class ExtPahoClient(paho_mqtt.Client):
return self._send_connect(self._keepalive)
# MQTTv5 apply noLocal option by default
def subscribe(self, topic, qos=0, options=None):
if self._protocol == paho_mqtt.MQTTv5:
if options is None:
options = paho_mqtt.SubscribeOptions(qos=qos, noLocal=True)
qos = 0
if isinstance(topic, list):
def formatMqttV5Tuple(topic, options):
if isinstance(options, paho_mqtt.SubscribeOptions):
return (topic, options)
elif isinstance(options, int):
if 0 <= options <= 2:
return (topic, paho_mqtt.SubscribeOptions(qos=options, noLocal=True))
raise ValueError(f"Invalid QoS level: {options}")
raise ValueError(f"Invalid options type: {type(options)}")
topic = [formatMqttV5Tuple(t, o) for t, o in topic]
return super().subscribe(topic, qos, options)
class SubscriptionHandle:
def __init__(self, topic: str, callback: FlexCallback) -> None:
@ -340,6 +361,7 @@ class MQTTClient(APITransport):
f"Invalid value '{protocol}' for option 'mqtt_protocol' "
"in section [mqtt]. Must be one of "
f"{MQTT_PROTOCOLS.values()}")
self.support_creatcloud = config.getboolean("support_creatcloud", False)
self.instance_name = config.get('instance_name', socket.gethostname())
if '+' in self.instance_name or '#' in self.instance_name:
raise config.error(
@ -353,6 +375,9 @@ class MQTTClient(APITransport):
self.publish_split_status = \
config.getboolean("publish_split_status", False)
client_id: Optional[str] = config.get("client_id", None)
if client_id is None and self.support_creatcloud:
machine: Machine = self.server.lookup_component("machine")
self.client_id = client_id = machine.get_machine_uuid()
if PAHO_MQTT_VERSION < (2, 0):
self.client = ExtPahoClient(client_id, protocol=self.protocol)
else:
@ -370,6 +395,7 @@ class MQTTClient(APITransport):
self.disconnect_evt: Optional[asyncio.Event] = None
self.connect_task: Optional[asyncio.Task] = None
self.subscribed_topics: SubscribedDict = {}
self.regex_topics_map: Dict[str, re.Pattern] = {}
self.pending_responses: List[asyncio.Future] = []
self.pending_acks: Dict[int, asyncio.Future] = {}
@ -392,6 +418,16 @@ class MQTTClient(APITransport):
self.klipper_status_topic = f"{self.instance_name}/klipper/status"
self.klipper_state_prefix = f"{self.instance_name}/klipper/state"
self.moonraker_status_topic = f"{self.instance_name}/moonraker/status"
# CreatCloud API
if self.support_creatcloud:
self.creatcloud_topic_prefix = "CreatCloud/Klipper"
self.api_request_topic = f"{self.creatcloud_topic_prefix}/{client_id}/+/Action"
self.api_resp_topic = f"{self.creatcloud_topic_prefix}/{client_id}/000000/Action"
self.klipper_status_topic = f"{self.creatcloud_topic_prefix}/{client_id}/Status"
self.klipper_state_prefix = f"{self.creatcloud_topic_prefix}/{client_id}/State"
self.moonraker_status_topic = f"{self.creatcloud_topic_prefix}/{client_id}/Public"
status_cfg: Dict[str, str] = config.getdict(
"status_objects", {}, allow_empty_fields=True
)
@ -420,9 +456,13 @@ class MQTTClient(APITransport):
self.timestamp_deque: Deque = deque(maxlen=20)
self.api_qos = config.getint('api_qos', self.qos)
if self.support_creatcloud:
api_func = self._process_creatcloud_request
else:
api_func = self._process_api_request
if config.getboolean("enable_moonraker_api", True):
self.subscribe_topic(self.api_request_topic,
self._process_api_request,
api_func,
self.api_qos)
self.server.register_remote_method("publish_mqtt_topic",
@ -471,17 +511,31 @@ class MQTTClient(APITransport):
self.status_cache = {}
self._publish_status_update(payload, self.last_status_time)
def _get_topic_handles(self, topic) -> Optional[tuple[list, bool]]:
if topic in self.subscribed_topics:
return self.subscribed_topics[topic][1], False
for wildcardTopic, pattern in self.regex_topics_map.items():
if pattern.match(topic):
cb_hdls = self.subscribed_topics[wildcardTopic][1].copy()
for cb in cb_hdls:
cb.topic = topic
return cb_hdls, True
else:
return None
def _on_message(self,
client: str,
user_data: Any,
message: paho_mqtt.MQTTMessage
) -> None:
topic = message.topic
if topic in self.subscribed_topics:
cb_hdls = self.subscribed_topics[topic][1]
cb_hdls = self._get_topic_handles(topic)
if cb_hdls:
cb_hdls, wildcard = cb_hdls
for hdl in cb_hdls:
self.eventloop.register_callback(
hdl.callback, message.payload)
hdl.callback, message.payload,
*((hdl.topic,) if wildcard else ()))
else:
logging.debug(
f"Unregistered MQTT Topic Received: {topic}, "
@ -602,16 +656,24 @@ class MQTTClient(APITransport):
def is_connected(self) -> bool:
return self.connect_evt.is_set()
def _mqtt_topic_to_regex(self, topic) -> re.Pattern:
escaped = re.escape(topic)
escaped = escaped.replace(r'\+', r'[^/]+')
escaped = escaped.replace(r'\#', r'.*')
return re.compile(f'^{escaped}$')
def subscribe_topic(self,
topic: str,
callback: FlexCallback,
qos: Optional[int] = None
) -> SubscriptionHandle:
if '#' in topic or '+' in topic:
raise self.server.error("Wildcards may not be used")
# if '#' in topic or '+' in topic:
# raise self.server.error("Wildcards may not be used")
qos = qos or self.qos
if qos > 2 or qos < 0:
raise self.server.error("QOS must be between 0 and 2")
if ('#' in topic or '+' in topic) and topic not in self.regex_topics_map:
self.regex_topics_map[topic] = self._mqtt_topic_to_regex(topic)
hdl = SubscriptionHandle(topic, callback)
sub_handles = [hdl]
need_sub = True
@ -777,6 +839,55 @@ class MQTTClient(APITransport):
await self.publish_topic(self.api_resp_topic, response,
self.api_qos)
async def _process_creatcloud_request(self, payload: bytes, topic: str = None) -> None:
try:
request: Dict[str, Any] = jsonw.loads(payload)
msgVer = request.get("ver")
response = request.copy()
if msgVer == 3: # msg version is 3 or 3.0
msgIMEI = request.get("imei")
msgUUID = request.get("uuid")
msgCmd = request.get("cmd")
msgData = request.get("data")
response["data"] = ""
if msgIMEI == self.client_id:
auth: AuthComp = self.server.lookup_component('authorization', None)
if auth is None or auth.check_mqtt(msgUUID) or msgCmd == 'PWD':
if msgCmd == 'PWD':
if auth is not None:
response['data'] = 'OK' if auth.validate_mqtt(msgUUID, msgData) else 'INCORRECT'
else:
response['data'] = 'IGNORE'
elif msgCmd == 'API':
rpc: JsonRPC = self.server.lookup_component("jsonrpc")
result = await rpc.dispatch(jsonw.dumps(msgData), self)
response["data"] = jsonw.loads(result)
elif msgCmd == 'SDP':
webrtc_bridge = self.server.lookup_component("webrtc_bridge", None)
if webrtc_bridge:
response["data"] = await webrtc_bridge.handle_sdp(msgData, topic)
else:
response["data"] = {"type": "error", "message": "WebRTC Bridge component not available"}
else:
response["data"] = f"error: Unknown MQTT message cmd: {msgCmd}"
else:
response['data'] = f"error: MQTT UserID [{msgUUID}] needs authentication"
else:
response["data"] = f"error: MQTT client_id [{msgIMEI}] does not match"
else:
response["data"] = f"error: MQTT message version [{msgVer}] is not supported"
except jsonw.JSONDecodeError:
data = payload.decode()
response = f"MQTT payload is not valid json: {data}"
logging.exception(response)
except Exception as e:
response = None
logging.exception(e)
if response is not None and topic is not None:
await self.publish_topic(topic, response, self.api_qos)
@property
def transport_type(self) -> TransportType:
return TransportType.MQTT

View File

@ -0,0 +1,89 @@
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Dict, Any, List
from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPError
from urllib.parse import quote
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
API_WEBRTC_URL = "http://localhost:1984/api/webrtc"
class WebRTCBridge:
def __init__(self, config: ConfigHelper):
default_cameras = config.getlist("camera_name", ["Camera"])
self.default_cameras = []
for camera in default_cameras:
self.default_cameras.extend(
[cam.strip() for cam in camera.split(",") if cam.strip()]
)
logging.info(f"WebRTC Bridge initialized with cameras: {self.default_cameras}")
def _parse_cameras(self, cameras) -> List[str]:
if isinstance(cameras, str):
return [cam.strip() for cam in cameras.split(",") if cam.strip()]
elif isinstance(cameras, list):
result = []
for camera in cameras:
if isinstance(camera, str):
result.extend(
[cam.strip() for cam in camera.split(",") if cam.strip()]
)
else:
result.append(str(camera).strip())
return result
else:
return self.default_cameras
def _build_url(self, cameras: List[str]) -> str:
params = "&".join(f"src={quote(cam)}" for cam in cameras if cam)
return f"{API_WEBRTC_URL}?{params}"
async def handle_sdp(self, data: Dict[str, Any], topic: str) -> Dict[str, Any]:
try:
sdp = data.get("sdp", "")
if not sdp:
return {"type": "error", "message": "Missing SDP in offer"}
cameras = self._parse_cameras(data.get("cameras"))
logging.info(f"Received SDP offer for cameras: {cameras}")
if not cameras:
return {"type": "error", "message": "No cameras specified"}
url = self._build_url(cameras)
http_client = AsyncHTTPClient()
try:
request = HTTPRequest(
url=url,
method="POST",
body=sdp,
headers={
"Content-Type": "application/sdp",
"Accept": "application/sdp",
},
request_timeout=10,
)
logging.debug(f"Sending SDP offer to: {url}")
response = await http_client.fetch(request)
if response.code in (200, 201):
logging.info(f"Received SDP answer for cameras: {cameras}")
return {"type": "answer", "sdp": response.body.decode("utf-8")}
else:
error_msg = response.body.decode("utf-8")
logging.error(f"go2rtc API error {response.code}: {error_msg}")
return {"type": "error", "message": error_msg}
except HTTPError as e:
logging.error(f"HTTP error: {e}")
return {"type": "error", "message": str(e)}
finally:
http_client.close()
except Exception as e:
logging.error(f"SDP handling error: {e}")
return {"type": "error", "message": str(e)}
def load_component(config: ConfigHelper) -> WebRTCBridge:
return WebRTCBridge(config)