542 lines
20 KiB
Python
542 lines
20 KiB
Python
# Support for Moonraker/Klipper/Client announcements
|
|
#
|
|
# Copyright (C) 2022 Eric Callahan <arksine.code@gmail.com>
|
|
#
|
|
# This file may be distributed under the terms of the GNU GPLv3 license
|
|
|
|
from __future__ import annotations
|
|
import datetime
|
|
import pathlib
|
|
import asyncio
|
|
import logging
|
|
import email.utils
|
|
import xml.etree.ElementTree as etree
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Awaitable,
|
|
List,
|
|
Dict,
|
|
Any,
|
|
Optional,
|
|
Union
|
|
)
|
|
if TYPE_CHECKING:
|
|
from confighelper import ConfigHelper
|
|
from websockets import WebRequest
|
|
from http_client import HttpClient
|
|
from components.database import MoonrakerDatabase
|
|
|
|
|
|
MOONLIGHT_URL = "https://arksine.github.io/moonlight"
|
|
UPDATE_CHECK_TIME = 1800.
|
|
etree.register_namespace("moonlight", MOONLIGHT_URL)
|
|
|
|
class Announcements:
|
|
def __init__(self, config: ConfigHelper) -> None:
|
|
self.server = config.get_server()
|
|
self.entry_mgr = EntryManager(config)
|
|
self.eventloop = self.server.get_event_loop()
|
|
self.update_timer = self.eventloop.register_timer(
|
|
self._handle_update_timer
|
|
)
|
|
self.request_lock = asyncio.Lock()
|
|
self.dev_mode = config.getboolean("dev_mode", False)
|
|
self.subscriptions: Dict[str, RssFeed] = {
|
|
"moonraker": RssFeed("moonraker", self.entry_mgr, self.dev_mode),
|
|
"klipper": RssFeed("klipper", self.entry_mgr, self.dev_mode)
|
|
}
|
|
self.stored_feeds: List[str] = []
|
|
sub_list: List[str] = config.getlist("subscriptions", [])
|
|
self.configured_feeds: List[str] = ["moonraker", "klipper"]
|
|
for sub in sub_list:
|
|
sub = sub.lower()
|
|
if sub in self.subscriptions:
|
|
continue
|
|
self.configured_feeds.append(sub)
|
|
self.subscriptions[sub] = RssFeed(
|
|
sub, self.entry_mgr, self.dev_mode
|
|
)
|
|
|
|
self.server.register_endpoint(
|
|
"/server/announcements/list", ["GET"],
|
|
self._list_announcements
|
|
)
|
|
self.server.register_endpoint(
|
|
"/server/announcements/dismiss", ["POST"],
|
|
self._handle_dismiss_request
|
|
)
|
|
self.server.register_endpoint(
|
|
"/server/announcements/update", ["POST"],
|
|
self._handle_update_request
|
|
)
|
|
self.server.register_endpoint(
|
|
"/server/announcements/feed", ["POST", "DELETE"],
|
|
self._handle_feed_request
|
|
)
|
|
self.server.register_endpoint(
|
|
"/server/announcements/feeds", ["GET"],
|
|
self._handle_list_feeds
|
|
)
|
|
self.server.register_notification(
|
|
"announcements:dismissed", "announcement_dismissed"
|
|
)
|
|
self.server.register_notification(
|
|
"announcements:entries_updated", "announcement_update"
|
|
)
|
|
self.server.register_notification(
|
|
"announcements:dismiss_wake", "announcement_wake"
|
|
)
|
|
|
|
async def component_init(self) -> None:
|
|
db: MoonrakerDatabase = self.server.lookup_component("database")
|
|
stored_feeds: List[str] = await db.get_item(
|
|
"moonraker", "announcements.stored_feeds", []
|
|
)
|
|
self.stored_feeds = stored_feeds
|
|
for name in stored_feeds:
|
|
if name in self.subscriptions:
|
|
continue
|
|
feed = RssFeed(name, self.entry_mgr, self.dev_mode)
|
|
self.subscriptions[name] = feed
|
|
async with self.request_lock:
|
|
await self.entry_mgr.initialize()
|
|
for sub in self.subscriptions.values():
|
|
await sub.initialize()
|
|
self.update_timer.start()
|
|
|
|
async def _handle_update_timer(self, eventtime: float) -> float:
|
|
changed = False
|
|
entries: List[Dict[str, Any]] = []
|
|
async with self.request_lock:
|
|
for sub in self.subscriptions.values():
|
|
ret = await sub.update_entries()
|
|
changed |= ret
|
|
if changed:
|
|
entries = await self.entry_mgr.list_entries()
|
|
self.server.send_event(
|
|
"announcements:entries_updated", {"entries": entries}
|
|
)
|
|
return eventtime + UPDATE_CHECK_TIME
|
|
|
|
async def _handle_dismiss_request(
|
|
self, web_request: WebRequest
|
|
) -> Dict[str, Any]:
|
|
async with self.request_lock:
|
|
entry_id: str = web_request.get_str("entry_id")
|
|
wake_time: Optional[int] = web_request.get_int("wake_time", None)
|
|
await self.entry_mgr.dismiss_entry(entry_id, wake_time)
|
|
return {
|
|
"entry_id": entry_id
|
|
}
|
|
|
|
async def _list_announcements(
|
|
self, web_request: WebRequest
|
|
) -> Dict[str, Any]:
|
|
async with self.request_lock:
|
|
incl_dsm = web_request.get_boolean("include_dismissed", True)
|
|
entries = await self.entry_mgr.list_entries(incl_dsm)
|
|
return {
|
|
"entries": entries,
|
|
"feeds": list(self.subscriptions.keys())
|
|
}
|
|
|
|
async def _handle_update_request(
|
|
self, web_request: WebRequest
|
|
) -> Dict[str, Any]:
|
|
subs: Optional[Union[str, List[str]]]
|
|
subs = web_request.get("subscriptions", None)
|
|
if isinstance(subs, str):
|
|
subs = [sub.strip() for sub in subs.split(",") if sub.strip()]
|
|
elif subs is None:
|
|
subs = list(self.subscriptions.keys())
|
|
for sub in subs:
|
|
if sub not in self.subscriptions:
|
|
raise self.server.error(f"No subscription for {sub}")
|
|
async with self.request_lock:
|
|
changed = False
|
|
for sub in subs:
|
|
ret = await self.subscriptions[sub].update_entries()
|
|
changed |= ret
|
|
entries = await self.entry_mgr.list_entries()
|
|
if changed:
|
|
self.eventloop.delay_callback(
|
|
.05, self.server.send_event,
|
|
"announcements:entries_updated",
|
|
{"entries": entries})
|
|
return {
|
|
"entries": entries,
|
|
"modified": changed
|
|
}
|
|
|
|
async def _handle_list_feeds(
|
|
self, web_request: WebRequest
|
|
) -> Dict[str, Any]:
|
|
return {"feeds": list(self.subscriptions.keys())}
|
|
|
|
async def _handle_feed_request(
|
|
self, web_request: WebRequest
|
|
) -> Dict[str, Any]:
|
|
action = web_request.get_action()
|
|
name: str = web_request.get("name")
|
|
name = name.lower()
|
|
changed: bool = False
|
|
db: MoonrakerDatabase = self.server.lookup_component("database")
|
|
result = "skipped"
|
|
if action == "POST":
|
|
if name not in self.subscriptions:
|
|
feed = RssFeed(name, self.entry_mgr, self.dev_mode)
|
|
self.subscriptions[name] = feed
|
|
await feed.initialize()
|
|
changed = await feed.update_entries()
|
|
self.stored_feeds.append(name)
|
|
db.insert_item(
|
|
"moonraker", "announcements.stored_feeds", self.stored_feeds
|
|
)
|
|
result = "added"
|
|
elif action == "DELETE":
|
|
if name not in self.stored_feeds:
|
|
raise self.server.error(f"Feed '{name}' not stored")
|
|
if name in self.configured_feeds:
|
|
raise self.server.error(
|
|
f"Feed '{name}' exists in the configuration, cannot remove"
|
|
)
|
|
self.stored_feeds.remove(name)
|
|
db.insert_item(
|
|
"moonraker", "announcements.stored_feeds", self.stored_feeds
|
|
)
|
|
if name in self.subscriptions:
|
|
del self.subscriptions[name]
|
|
changed = await self.entry_mgr.prune_by_feed(name)
|
|
logging.info(f"Removed Announcement Feed: {name}")
|
|
result = "removed"
|
|
else:
|
|
raise self.server.error(f"Feed does not exist: {name}")
|
|
if changed:
|
|
entries = await self.entry_mgr.list_entries()
|
|
self.eventloop.delay_callback(
|
|
.05, self.server.send_event, "announcements:entries_updated",
|
|
{"entries": entries}
|
|
)
|
|
return {
|
|
"feed": name,
|
|
"action": result
|
|
}
|
|
|
|
def add_internal_announcement(
|
|
self, title: str, desc: str, url: str, priority: str, feed: str
|
|
) -> Dict[str, Any]:
|
|
date = datetime.datetime.utcnow()
|
|
entry_id: str = f"{feed}/{date.isoformat(timespec='seconds')}"
|
|
entry = {
|
|
"entry_id": entry_id,
|
|
"url": url,
|
|
"title": title,
|
|
"description": desc,
|
|
"priority": priority,
|
|
"date": date.timestamp(),
|
|
"dismissed": False,
|
|
"date_dismissed": None,
|
|
"dismiss_wake": None,
|
|
"source": "internal",
|
|
"feed": feed
|
|
}
|
|
self.entry_mgr.add_entry(entry)
|
|
return entry
|
|
|
|
async def remove_announcement(self, entry_id: str) -> None:
|
|
ret = await self.entry_mgr.remove_entry(entry_id)
|
|
if ret is not None:
|
|
entries = await self.entry_mgr.list_entries()
|
|
self.server.send_event(
|
|
"announcements:entries_updated", {"entries": entries}
|
|
)
|
|
async def dismiss_announcement(
|
|
self, entry_id, wake_time: Optional[int] = None
|
|
) -> None:
|
|
await self.entry_mgr.dismiss_entry(entry_id, wake_time)
|
|
|
|
async def get_announcements(
|
|
self, include_dismissed: bool = False
|
|
) -> List[Dict[str, Any]]:
|
|
return await self.entry_mgr.list_entries(include_dismissed)
|
|
|
|
def close(self):
|
|
self.entry_mgr.close()
|
|
|
|
class EntryManager:
|
|
def __init__(self, config: ConfigHelper) -> None:
|
|
self.server = config.get_server()
|
|
database: MoonrakerDatabase
|
|
database = self.server.lookup_component("database")
|
|
database.register_local_namespace("announcements")
|
|
self.announce_db = database.wrap_namespace("announcements")
|
|
self.entry_id_map: Dict[str, str] = {}
|
|
self.next_key = 0
|
|
self.dismiss_handles: Dict[str, asyncio.TimerHandle] = {}
|
|
|
|
async def initialize(self) -> None:
|
|
last_key = ""
|
|
eventloop = self.server.get_event_loop()
|
|
curtime = datetime.datetime.utcnow().timestamp()
|
|
for key, entry in await self.announce_db.items():
|
|
last_key = key
|
|
aid = entry["entry_id"]
|
|
self.entry_id_map[aid] = key
|
|
if entry["dismissed"]:
|
|
wake_time: Optional[float] = entry.get("dismiss_wake")
|
|
if wake_time is not None:
|
|
time_diff = wake_time - curtime
|
|
if time_diff - 10. < 0.:
|
|
# announcement is near or past wake time
|
|
entry["dismissed"] = False
|
|
entry["date_dismissed"] = None
|
|
entry["dismiss_wake"] = None
|
|
self.announce_db[key] = entry
|
|
else:
|
|
self.dismiss_handles[key] = eventloop.delay_callback(
|
|
time_diff, self._wake_dismissed, key
|
|
)
|
|
if last_key:
|
|
self.next_key = int(last_key, 16) + 1
|
|
|
|
async def list_entries(
|
|
self, include_dismissed: bool = True
|
|
) -> List[Dict[str, Any]]:
|
|
vals: List[Dict[str, Any]] = await self.announce_db.values()
|
|
vals = sorted(vals, key=lambda x: x["date"], reverse=True)
|
|
if include_dismissed:
|
|
return vals
|
|
return [val for val in vals if not val["dismissed"]]
|
|
|
|
def has_entry(self, entry_id: str) -> bool:
|
|
return entry_id in self.entry_id_map
|
|
|
|
def add_entry(self, entry: Dict[str, Any]) -> Awaitable[None]:
|
|
aid = entry["entry_id"]
|
|
key = f"{self.next_key:06X}"
|
|
self.next_key += 1
|
|
self.entry_id_map[aid] = key
|
|
return self.announce_db.insert(key, entry)
|
|
|
|
def remove_entry(self, entry_id: str) -> Awaitable[Any]:
|
|
key = self.entry_id_map.pop(entry_id, None)
|
|
if key is None:
|
|
raise self.server.error(f"No key matching entry id: {entry_id}")
|
|
return self.announce_db.pop(key, None)
|
|
|
|
async def dismiss_entry(
|
|
self, entry_id: str, wake_time: Optional[int] = None
|
|
) -> None:
|
|
key = self.entry_id_map.get(entry_id)
|
|
if key is None:
|
|
raise self.server.error(f"No key matching entry id: {entry_id}")
|
|
entry = await self.announce_db[key]
|
|
is_dismissed = entry["dismissed"]
|
|
if is_dismissed:
|
|
return
|
|
entry["dismissed"] = True
|
|
eventloop = self.server.get_event_loop()
|
|
curtime = datetime.datetime.utcnow().timestamp()
|
|
entry["date_dismissed"] = curtime
|
|
if wake_time is not None:
|
|
entry["dismiss_wake"] = curtime + wake_time
|
|
self.dismiss_handles[key] = eventloop.delay_callback(
|
|
wake_time, self._wake_dismissed, key
|
|
)
|
|
self.announce_db[key] = entry
|
|
eventloop.delay_callback(
|
|
.05, self.server.send_event, "announcements:dismissed",
|
|
{"entry_id": entry_id}
|
|
)
|
|
|
|
async def _wake_dismissed(self, key: str) -> None:
|
|
self.dismiss_handles.pop(key, None)
|
|
entry = await self.announce_db.get(key, None)
|
|
if entry is None:
|
|
return
|
|
if not entry["dismissed"]:
|
|
return
|
|
entry["dismissed"] = False
|
|
entry["date_dismissed"] = None
|
|
entry["dismiss_wake"] = None
|
|
self.announce_db[key] = entry
|
|
self.server.send_event(
|
|
"announcements:dismiss_wake", {"entry_id": entry["entry_id"]}
|
|
)
|
|
|
|
def prune_by_prefix(self, prefix: str, valid_ids: List[str]) -> bool:
|
|
del_keys: List[str] = []
|
|
for entry_id in list(self.entry_id_map.keys()):
|
|
if not entry_id.startswith(prefix) or entry_id in valid_ids:
|
|
continue
|
|
# Entry is no longer valid and should be removed
|
|
key = self.entry_id_map.pop(entry_id, None)
|
|
if key is not None:
|
|
del_keys.append(key)
|
|
if del_keys:
|
|
self.announce_db.delete_batch(del_keys)
|
|
return True
|
|
return False
|
|
|
|
async def prune_by_feed(self, feed: str) -> bool:
|
|
entries = await self.list_entries()
|
|
del_keys: List[str] = []
|
|
for entry in entries:
|
|
if entry["feed"].lower() == feed:
|
|
key = self.entry_id_map.pop(entry["entry_id"], None)
|
|
if key is not None:
|
|
del_keys.append(key)
|
|
if del_keys:
|
|
self.announce_db.delete_batch(del_keys)
|
|
return True
|
|
return False
|
|
|
|
def close(self):
|
|
for handle in self.dismiss_handles.values():
|
|
handle.cancel()
|
|
|
|
class RssFeed:
|
|
def __init__(
|
|
self, name: str, entry_mgr: EntryManager, dev_mode: bool
|
|
) -> None:
|
|
self.server = entry_mgr.server
|
|
self.name = name
|
|
self.entry_mgr = entry_mgr
|
|
self.client: HttpClient = self.server.lookup_component("http_client")
|
|
self.database: MoonrakerDatabase
|
|
self.database = self.server.lookup_component("database")
|
|
self.xml_file = f"{self.name}.xml"
|
|
self.asset_url = f"{MOONLIGHT_URL}/assets/{self.xml_file}"
|
|
self.last_modified: int = 0
|
|
self.etag: Optional[str] = None
|
|
self.dev_xml_path: Optional[pathlib.Path] = None
|
|
if dev_mode:
|
|
res_dir = pathlib.Path(__file__).parent.parent.parent.resolve()
|
|
res_path = res_dir.joinpath(".devel/announcement_xml")
|
|
self.dev_xml_path = res_path.joinpath(self.xml_file)
|
|
|
|
async def initialize(self) -> None:
|
|
self.etag = await self.database.get_item(
|
|
"moonraker", f"announcements.{self.name}.etag", None
|
|
)
|
|
|
|
async def update_entries(self) -> bool:
|
|
if self.dev_xml_path is None:
|
|
xml_data = await self._fetch_moonlight()
|
|
else:
|
|
xml_data = await self._fetch_local_folder()
|
|
if not xml_data:
|
|
return False
|
|
return self._parse_xml(xml_data)
|
|
|
|
async def _fetch_moonlight(self) -> str:
|
|
headers = {"Accept": "application/xml"}
|
|
if self.etag is not None:
|
|
headers["If-None-Match"] = self.etag
|
|
resp = await self.client.get(
|
|
self.asset_url, headers, attempts=5,
|
|
retry_pause_time=.5, enable_cache=False,
|
|
)
|
|
if resp.has_error():
|
|
logging.info(
|
|
f"Failed to update subscription '{self.name}': {resp.error}"
|
|
)
|
|
return ""
|
|
if resp.status_code == 304:
|
|
logging.debug(f"Content at {self.xml_file} not modified")
|
|
return ""
|
|
# update etag
|
|
self.etag = resp.etag
|
|
try:
|
|
if self.etag is not None:
|
|
self.database.insert_item(
|
|
"moonraker", f"announcements.{self.name}.etag", resp.etag
|
|
)
|
|
else:
|
|
self.database.delete_item(
|
|
"moonraker", f"announcements.{self.name}.etag",
|
|
)
|
|
except self.server.error:
|
|
pass
|
|
return resp.text
|
|
|
|
async def _fetch_local_folder(self) -> str:
|
|
if self.dev_xml_path is None:
|
|
return ""
|
|
if not self.dev_xml_path.is_file():
|
|
logging.info(f"No file at path {self.dev_xml_path}")
|
|
return ""
|
|
mtime = self.dev_xml_path.stat().st_mtime_ns
|
|
if mtime <= self.last_modified:
|
|
logging.debug(f"Content at {self.xml_file} not modified")
|
|
return ""
|
|
try:
|
|
eventloop = self.server.get_event_loop()
|
|
xml_data = await eventloop.run_in_thread(
|
|
self.dev_xml_path.read_text)
|
|
except Exception:
|
|
logging.exception(f"Unable read xml file {self.dev_xml_path}")
|
|
return ""
|
|
self.last_modified = mtime
|
|
return xml_data
|
|
|
|
def _parse_xml(self, xml_data: str) -> bool:
|
|
root = etree.fromstring(xml_data)
|
|
channel = root.find("channel")
|
|
if channel is None:
|
|
root_str = etree.tostring(root, encoding="unicode")
|
|
logging.debug(f"Feed {self.name}: no channel found\n{root_str}")
|
|
return False
|
|
# extract prefix
|
|
prefix = channel.findtext("title", "").lower()
|
|
if not prefix:
|
|
logging.info(f"Feed {self.name}: No prefix found")
|
|
items = channel.findall("item")
|
|
valid_ids: List[str] = []
|
|
changed: bool = False
|
|
for item in items:
|
|
guid = item.findtext("guid")
|
|
if guid is None:
|
|
item_str = etree.tostring(item, encoding="unicode")
|
|
logging.debug(f"Feed {self.name}: Invalid Item\n{item_str}")
|
|
continue
|
|
if not prefix:
|
|
# fall back to first guid prefix
|
|
prefix = "/".join(guid.split("/")[:2])
|
|
elif not guid.startswith(prefix):
|
|
logging.debug(
|
|
f"Feed {self.name}: Guid {guid} is not "
|
|
f"prefixed with {prefix}")
|
|
valid_ids.append(guid)
|
|
if self.entry_mgr.has_entry(guid):
|
|
continue
|
|
try:
|
|
rfc_date = item.findtext("pubDate", "")
|
|
dt = email.utils.parsedate_to_datetime(rfc_date)
|
|
except Exception:
|
|
dt = datetime.datetime.utcnow()
|
|
entry: Dict[str, Any] = {
|
|
"entry_id": guid,
|
|
"url": item.findtext("link"),
|
|
"title": item.findtext("title"),
|
|
"description": item.findtext("description"),
|
|
"priority": item.findtext("category"),
|
|
"date": dt.timestamp(),
|
|
"dismissed": False,
|
|
"date_dismissed": None,
|
|
"dismiss_wake": None,
|
|
"source": "moonlight",
|
|
"feed": self.name
|
|
}
|
|
changed = True
|
|
self.entry_mgr.add_entry(entry)
|
|
logging.debug(f"Feed {self.name}: found entries {valid_ids}")
|
|
if prefix:
|
|
pruned = self.entry_mgr.prune_by_prefix(prefix, valid_ids)
|
|
changed = changed or pruned
|
|
return changed
|
|
|
|
|
|
def load_component(config: ConfigHelper) -> Announcements:
|
|
return Announcements(config)
|