实现ssdp扫描功能

This commit is contained in:
2025-05-07 11:12:22 +08:00
parent 16d41bd11d
commit 3791adfec5

View File

@@ -101,6 +101,7 @@ class ZeroconfRegistrar:
else:
# Use the UUID. First 8 hex digits should be unique enough
instance_name = f"Moonraker-{instance_uuid[:8]}"
instance_name = "CreatBot"
hi = self.server.get_host_info()
host = self.mdns_name
zc_service_props = {
@@ -167,9 +168,9 @@ class ZeroconfRegistrar:
SSDP_ADDR = ("239.255.255.250", 1900)
SSDP_SERVER_ID = "Moonraker SSDP/UPNP Server"
SSDP_SERVER_ID = "CreatBot SSDP/UPNP Server"
SSDP_MAX_AGE = 1800
SSDP_DEVICE_TYPE = "urn:arksine.github.io:device:Moonraker:1"
SSDP_DEVICE_TYPE = "urn:creatbot-com:device:3dprinter:2"
SSDP_DEVICE_XML = """
<?xml version="1.0"?>
<root xmlns="urn:schemas-upnp-org:device-1-0" configId="{config_id}">
@@ -180,12 +181,12 @@ SSDP_DEVICE_XML = """
<device>
<deviceType>{device_type}</deviceType>
<friendlyName>{friendly_name}</friendlyName>
<manufacturer>Arksine</manufacturer>
<manufacturerURL>https://github.com/Arksine/moonraker</manufacturerURL>
<manufacturer>CreatBot</manufacturer>
<manufacturerURL>https://www.creatbot.com</manufacturerURL>
<modelDescription>API Server for Klipper</modelDescription>
<modelName>Moonraker</modelName>
<modelName>CreatBot</modelName>
<modelNumber>{model_number}</modelNumber>
<modelURL>https://github.com/Arksine/moonraker</modelURL>
<modelURL>https://github.com/CreatBotOfficail</modelURL>
<serialNumber>{serial_number}</serialNumber>
<UDN>uuid:{device_uuid}</UDN>
<presentationURL>{presentation_url}</presentationURL>
@@ -197,7 +198,10 @@ class SSDPServer(asyncio.protocols.DatagramProtocol):
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.unique_id = uuid.UUID(self.server.get_app_args()["instance_uuid"])
self.name: str = "Moonraker"
self.name: str = "CreatBot"
machine: Machine = self.server.lookup_component("machine")
self.serial_number = machine.get_system_info().get("cpu_info", {}).get("serial_number", "N/A")
self.serial_number = self.serial_number[1:].upper()
self.base_url: str = ""
self.response_headers: List[str] = []
self.registered: bool = False
@@ -280,10 +284,18 @@ class SSDPServer(asyncio.protocols.DatagramProtocol):
if len(name) > 64:
name = name[:64]
self.name = name
model = self.name
device_name = self.name
if "(" in self.name:
model = self.name.split("(", 1)[-1].rsplit("-", 1)[0].rstrip(")")
elif "-" in self.name:
model = self.name.rsplit("-", 1)[0]
if "(" in self.name and ")" in self.name:
device_name = self.name.split("(", 1)[1].split(")", 1)[0]
app: MoonrakerApp = self.server.lookup_component("application")
self.base_url = f"http://{host_name_or_ip}:{port}{app.route_prefix}"
self.base_url = f"http://{host_name_or_ip}"
self.response_headers = [
f"USN: uuid:{self.unique_id}::upnp:rootdevice",
f"USN: uuid:{self.serial_number}::upnp:rootdevice::urn:creatbot-com:device:3dprinter:2",
f"LOCATION: {self.base_url}/server/zeroconf/ssdp",
"ST: upnp:rootdevice",
"EXT:",
@@ -291,6 +303,8 @@ class SSDPServer(asyncio.protocols.DatagramProtocol):
f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}",
f"BOOTID.UPNP.ORG: {self.boot_id}",
f"CONFIGID.UPNP.ORG: {self.config_id}",
f"DEVICE-MODEL: {model}",
f"DEVICE-NAME: {device_name}",
]
self.registered = True
advertisements = self._build_notifications("ssdp:alive")
@@ -302,14 +316,20 @@ class SSDPServer(asyncio.protocols.DatagramProtocol):
async def _handle_xml_request(self, web_request: WebRequest) -> str:
if not self.registered:
raise self.server.error("Moonraker SSDP Device not registered", 404)
raise self.server.error("CreatBot SSDP Device not registered", 404)
app_args = self.server.get_app_args()
if "(" in self.name:
model = self.name.split("(", 1)[-1].rsplit("-", 1)[0].rstrip(")")
elif "-" in self.name:
model = self.name.rsplit("-", 1)[0]
else:
model = self.name
return SSDP_DEVICE_XML.format(
device_type=SSDP_DEVICE_TYPE,
config_id=str(self.config_id),
friendly_name=self.name,
model_number=app_args["software_version"],
serial_number=self.unique_id.hex,
model_number=model,
serial_number=self.serial_number,
device_uuid=str(self.unique_id),
presentation_url=self.base_url
)
@@ -360,7 +380,7 @@ class SSDPServer(asyncio.protocols.DatagramProtocol):
):
# Not a discovery request
return
if headers.get("ST") not in ["upnp:rootdevice", "ssdp:all"]:
if headers.get("ST") not in ["upnp:rootdevice", "ssdp:all", "urn:creatbot-com:device:3dprinter:2",]:
# Service Type doesn't apply
return
if self.response_handle is not None:
@@ -390,7 +410,7 @@ class SSDPServer(asyncio.protocols.DatagramProtocol):
notify_types = [
("upnp:rootdevice", f"uuid:{self.unique_id}::upnp:rootdevice"),
(f"uuid:{self.unique_id}", f"uuid:{self.unique_id}"),
(SSDP_DEVICE_TYPE, f"uuid:{self.unique_id}::{SSDP_DEVICE_TYPE}")
(SSDP_DEVICE_TYPE, f"uuid:{self.serial_number}::{SSDP_DEVICE_TYPE}"),
]
for (nt, usn) in notify_types:
notifications.append(