diff --git a/scripts/motan/data_logger.py b/scripts/motan/data_logger.py index 05dcc1986..3f23b2e8c 100755 --- a/scripts/motan/data_logger.py +++ b/scripts/motan/data_logger.py @@ -63,19 +63,14 @@ class DataLogger: self.logger = LogWriter(log_prefix + ".json.gz") self.index = LogWriter(log_prefix + ".index.gz") # Handlers - self.query_handlers = { - "info": self.handle_info, "list": self.handle_list, - "status": self.handle_subscribe, - } - self.async_handlers = { - "status": self.handle_async_db, - } + self.query_handlers = {} + self.async_handlers = {} # get_status databasing - self.db = {"status": {}} + self.db = {} self.next_index_time = 0. # Start login process - self.send_msg({"id": "info", "method": "info", - "params": { "client_info": ClientInfo }}) + self.send_query("info", "info", {"client_info": ClientInfo}, + self.handle_info) def error(self, msg): sys.stderr.write(msg + "\n") def finish(self, msg): @@ -84,7 +79,9 @@ class DataLogger: self.index.close() sys.exit(0) # Unix Domain Socket IO - def send_msg(self, msg): + def send_query(self, msg_id, method, params, cb): + self.query_handlers[msg_id] = cb + msg = {"id": msg_id, "method": method, "params": params} cm = json.dumps(msg, separators=(',', ':')).encode() self.webhook_socket.send(cm + b"\x03") def process_socket(self): @@ -110,7 +107,10 @@ class DataLogger: msg_id = msg.get("id") hdl = self.query_handlers.get(msg_id) if hdl is not None: + del self.query_handlers[msg_id] hdl(msg, part) + if not self.query_handlers: + self.flush_index() continue self.error("ERROR: Message with unknown id") def run(self): @@ -123,52 +123,49 @@ class DataLogger: except KeyboardInterrupt as e: self.finish("Keyboard Interrupt") # Query response handlers + def send_subscribe(self, msg_id, method, params, cb=None, async_cb=None): + if cb is None: + cb = self.handle_dump + if async_cb is not None: + self.async_handlers[msg_id] = async_cb + params["response_template"] = {"q": msg_id} + self.send_query(msg_id, method, params, cb) def handle_info(self, msg, raw_msg): if msg["result"]["state"] != "ready": self.finish("Klipper not in ready state") - self.send_msg({"id": "list", "method": "objects/list"}) + self.send_query("list", "objects/list", {}, self.handle_list) def handle_list(self, msg, raw_msg): subreq = {o: None for o in msg["result"]["objects"]} - self.send_msg({"id": "status", "method": "objects/subscribe", - "params": { "objects": subreq, - "response_template": {"q": "status"}}}) + self.send_subscribe("status", "objects/subscribe", {"objects": subreq}, + self.handle_subscribe, self.handle_async_db) def handle_subscribe(self, msg, raw_msg): result = msg["result"] self.next_index_time = result["eventtime"] + INDEX_UPDATE_TIME - status = result["status"] - self.db["status"].update(status) - motion_report = status.get("motion_report") - if motion_report is not None: - for trapq in motion_report.get("trapq", []): - qname = "trapq:" + trapq - self.query_handlers[qname] = self.handle_dump - self.send_msg({"id": qname, - "method": "motion_report/dump_trapq", - "params": { "name": trapq, - "response_template": {"q": qname}}}) - for stepper in motion_report.get("steppers", []): - qname = "stepq:" + stepper - self.query_handlers[qname] = self.handle_dump - self.send_msg({"id": qname, - "method": "motion_report/dump_stepper", - "params": { "name": stepper, - "response_template": {"q": qname}}}) + self.db["status"] = status = result["status"] + # Subscribe to trapq and stepper queue updates + motion_report = status.get("motion_report", {}) + for trapq in motion_report.get("trapq", []): + self.send_subscribe("trapq:" + trapq, "motion_report/dump_trapq", + {"name": trapq}) + for stepper in motion_report.get("steppers", []): + self.send_subscribe("stepq:" + stepper, + "motion_report/dump_stepper", {"name": stepper}) def handle_dump(self, msg, raw_msg): msg_id = msg["id"] self.db.setdefault("subscriptions", {})[msg_id] = msg["result"] + def flush_index(self): + self.db['file_position'] = self.logger.flush() + self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode()) + self.db = {"status": {}} def handle_async_db(self, msg, raw_msg): params = msg["params"] db_status = self.db['status'] for k, v in params.get("status", {}).items(): db_status.setdefault(k, {}).update(v) eventtime = params['eventtime'] - if eventtime < self.next_index_time: - return - # Update index file - self.next_index_time = eventtime + INDEX_UPDATE_TIME - self.db['file_position'] = self.logger.flush() - self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode()) - self.db = {"status": {}} + if eventtime >= self.next_index_time: + self.next_index_time = eventtime + INDEX_UPDATE_TIME + self.flush_index() def nice(): try: