From c081fa49a1c2fdefb14480914c75ba4804fab229 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 30 Jan 2022 20:38:54 -0500 Subject: [PATCH] history: update for database changes Signed-off-by: Eric Callahan --- moonraker/components/history.py | 158 ++++++++++++++++---------------- 1 file changed, 81 insertions(+), 77 deletions(-) diff --git a/moonraker/components/history.py b/moonraker/components/history.py index a8fe927..57205b6 100644 --- a/moonraker/components/history.py +++ b/moonraker/components/history.py @@ -4,6 +4,7 @@ from __future__ import annotations import time +from asyncio import Lock # Annotation imports from typing import ( @@ -17,11 +18,9 @@ from typing import ( if TYPE_CHECKING: from confighelper import ConfigHelper from websockets import WebRequest - from . import database + from .database import MoonrakerDatabase as DBComp from .job_state import JobState - from .file_manager import file_manager - DBComp = database.MoonrakerDatabase - FMComp = file_manager.FileManager + from .file_manager.file_manager import FileManager HIST_NAMESPACE = "history" MAX_JOBS = 10000 @@ -29,10 +28,10 @@ MAX_JOBS = 10000 class History: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - self.file_manager: FMComp = self.server.lookup_component( + self.file_manager: FileManager = self.server.lookup_component( 'file_manager') + self.request_lock = Lock() database: DBComp = self.server.lookup_component("database") - self.gcdb = database.wrap_namespace("gcode_metadata", parse_keys=False) self.job_totals: Dict[str, float] = database.get_item( "moonraker", "history.job_totals", { @@ -42,7 +41,7 @@ class History: 'total_filament_used': 0., 'longest_job': 0., 'longest_print': 0. - }) + }).result() self.server.register_event_handler( "server:klippy_disconnect", self._handle_disconnect) @@ -77,93 +76,95 @@ class History: self.current_job: Optional[PrinterJob] = None self.current_job_id: Optional[str] = None self.next_job_id: int = 0 - self.cached_job_ids = self.history_ns.keys() + self.cached_job_ids = self.history_ns.keys().result() if self.cached_job_ids: self.next_job_id = int(self.cached_job_ids[-1], 16) + 1 async def _handle_job_request(self, web_request: WebRequest ) -> Dict[str, Any]: - action = web_request.get_action() - if action == "GET": - job_id = web_request.get_str("uid") - if job_id not in self.cached_job_ids: - raise self.server.error(f"Invalid job uid: {job_id}", 404) - job = self.history_ns[job_id] - return {"job": self._prep_requested_job(job, job_id)} - if action == "DELETE": - all = web_request.get_boolean("all", False) - if all: - deljobs = self.cached_job_ids - self.history_ns.clear() - self.cached_job_ids = [] - self.next_job_id = 0 - return {'deleted_jobs': deljobs} + async with self.request_lock: + action = web_request.get_action() + if action == "GET": + job_id = web_request.get_str("uid") + if job_id not in self.cached_job_ids: + raise self.server.error(f"Invalid job uid: {job_id}", 404) + job = await self.history_ns[job_id] + return {"job": self._prep_requested_job(job, job_id)} + if action == "DELETE": + all = web_request.get_boolean("all", False) + if all: + deljobs = self.cached_job_ids + self.history_ns.clear() + self.cached_job_ids = [] + self.next_job_id = 0 + return {'deleted_jobs': deljobs} - job_id = web_request.get_str("uid") - if job_id not in self.cached_job_ids: - raise self.server.error(f"Invalid job uid: {job_id}", 404) + job_id = web_request.get_str("uid") + if job_id not in self.cached_job_ids: + raise self.server.error(f"Invalid job uid: {job_id}", 404) - self.delete_job(job_id) - return {'deleted_jobs': [job_id]} - raise self.server.error("Invalid Request Method") + self.delete_job(job_id) + return {'deleted_jobs': [job_id]} + raise self.server.error("Invalid Request Method") async def _handle_jobs_list(self, web_request: WebRequest ) -> Dict[str, Any]: - i = 0 - count = 0 - end_num = len(self.cached_job_ids) - jobs: List[Dict[str, Any]] = [] - start_num = 0 + async with self.request_lock: + i = 0 + count = 0 + end_num = len(self.cached_job_ids) + jobs: List[Dict[str, Any]] = [] + start_num = 0 - before = web_request.get_float("before", -1) - since = web_request.get_float("since", -1) - limit = web_request.get_int("limit", 50) - start = web_request.get_int("start", 0) - order = web_request.get_str("order", "desc") + before = web_request.get_float("before", -1) + since = web_request.get_float("since", -1) + limit = web_request.get_int("limit", 50) + start = web_request.get_int("start", 0) + order = web_request.get_str("order", "desc") - if order not in ["asc", "desc"]: - raise self.server.error(f"Invalid `order` value: {order}", 400) + if order not in ["asc", "desc"]: + raise self.server.error(f"Invalid `order` value: {order}", 400) - reverse_order = (order == "desc") + reverse_order = (order == "desc") - # cached jobs is asc order, find lower and upper boundary - if since != -1: - while start_num < end_num: - job_id = self.cached_job_ids[start_num] - job: Dict[str, Any] = self.history_ns[job_id] - if job['start_time'] > since: - break - start_num += 1 + # cached jobs is asc order, find lower and upper boundary + if since != -1: + while start_num < end_num: + job_id = self.cached_job_ids[start_num] + job: Dict[str, Any] = await self.history_ns[job_id] + if job['start_time'] > since: + break + start_num += 1 - if before != -1: - while end_num > 0: - job_id = self.cached_job_ids[end_num-1] - job = self.history_ns[job_id] - if job['end_time'] < before: - break - end_num -= 1 + if before != -1: + while end_num > 0: + job_id = self.cached_job_ids[end_num-1] + job = await self.history_ns[job_id] + if job['end_time'] < before: + break + end_num -= 1 - if start_num >= end_num or end_num == 0: - return {"count": 0, "jobs": []} + if start_num >= end_num or end_num == 0: + return {"count": 0, "jobs": []} - i = start - count = end_num - start_num + i = start + count = end_num - start_num - if limit == 0: - limit = MAX_JOBS + if limit == 0: + limit = MAX_JOBS - while i < count and len(jobs) < limit: - if reverse_order: - job_id = self.cached_job_ids[end_num - i - 1] - else: - job_id = self.cached_job_ids[start_num + i] - job = self.history_ns[job_id] - jobs.append(self._prep_requested_job(job, job_id)) - i += 1 + while i < count and len(jobs) < limit: + if reverse_order: + job_id = self.cached_job_ids[end_num - i - 1] + else: + job_id = self.cached_job_ids[start_num + i] + job = await self.history_ns[job_id] + jobs.append(self._prep_requested_job(job, job_id)) + i += 1 - return {"count": count, "jobs": jobs} + return {"count": count, "jobs": jobs} async def _handle_job_totals(self, web_request: WebRequest @@ -186,7 +187,7 @@ class History: 'longest_print': 0. } database: DBComp = self.server.lookup_component("database") - database.insert_item( + await database.insert_item( "moonraker", "history.job_totals", self.job_totals) return {'last_totals': last_totals} @@ -275,16 +276,19 @@ class History: self.current_job = None self.current_job_id = None - def get_job(self, job_id: Union[int, str]) -> Optional[Dict[str, Any]]: + async def get_job(self, + job_id: Union[int, str] + ) -> Optional[Dict[str, Any]]: if isinstance(job_id, int): job_id = f"{job_id:06X}" - return self.history_ns.get(job_id, None) + return await self.history_ns.get(job_id, None) def grab_job_metadata(self) -> None: if self.current_job is None: return filename: str = self.current_job.get("filename") - metadata: Dict[str, Any] = self.gcdb.get(filename, {}) + mdst = self.file_manager.get_metadata_storage() + metadata: Dict[str, Any] = mdst.get(filename, {}) if metadata: # Add the start time and job id to the # persistent metadata storage @@ -292,7 +296,7 @@ class History: 'print_start_time': self.current_job.get('start_time'), 'job_id': self.current_job_id }) - self.gcdb[filename] = metadata + mdst.insert(filename, metadata.copy()) # We don't need to store these fields in the # job metadata, as they are redundant metadata.pop('print_start_time', None)