From 13a85fe9e002f736c76d00f937fc21d2c58d6615 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sat, 10 Jul 2021 17:17:10 -0400 Subject: [PATCH] file_manager: replace references to ioloop with eventloop Signed-off-by: Eric Callahan --- .../components/file_manager/file_manager.py | 111 ++++++++---------- 1 file changed, 52 insertions(+), 59 deletions(-) diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index 96a1d40..a6fbeff 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -12,9 +12,6 @@ import logging import json import tempfile import asyncio -from concurrent.futures import ThreadPoolExecutor -from tornado.ioloop import IOLoop -from tornado.locks import Event, Lock, Condition from inotify_simple import INotify from inotify_simple import flags as iFlags @@ -57,6 +54,7 @@ WATCH_FLAGS = iFlags.CREATE | iFlags.DELETE | iFlags.MODIFY \ class FileManager: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() + self.event_loop = self.server.get_event_loop() self.file_paths: Dict[str, str] = {} db: DBComp = self.server.load_component(config, "database") gc_path: str = db.get_item( @@ -64,7 +62,7 @@ class FileManager: self.gcode_metadata = MetadataStorage(self.server, gc_path, db) self.inotify_handler = INotifyHandler(config, self, self.gcode_metadata) - self.write_mutex = Lock() + self.write_mutex = asyncio.Lock() self.notify_sync_lock: Optional[NotifySyncLock] = None self.fixed_path_args: Dict[str, Any] = {} @@ -148,7 +146,7 @@ class FileManager: path = os.path.realpath(path) if not os.path.isdir(path) or path == "/": logging.info( - f"\nSupplied path ({path}) for ({root}) a valid. Make sure\n" + f"\nSupplied path ({path}) for ({root}) is invalid. Make sure\n" "that the path exists and is not the file system root.") return False permissions = os.R_OK @@ -172,7 +170,7 @@ class FileManager: # Refresh the file list and add watches self.inotify_handler.add_root_watch(root, path) else: - IOLoop.current().spawn_callback( + self.event_loop.register_callback( self.inotify_handler.notify_filelist_changed, "root_update", root, path) return True @@ -257,12 +255,10 @@ class FileManager: # Make sure that the directory does not contain a file # loaded by the virtual_sdcard await self._handle_operation_check(dir_path) - ioloop = IOLoop.current() self.notify_sync_lock = NotifySyncLock(dir_path) try: - with ThreadPoolExecutor(max_workers=1) as tpe: - await ioloop.run_in_executor( - tpe, shutil.rmtree, dir_path) + await self.event_loop.run_in_thread( + shutil.rmtree, dir_path) except Exception: self.notify_sync_lock.cancel() self.notify_sync_lock = None @@ -356,12 +352,10 @@ class FileManager: else: result['action'] = "create_file" op_func = shutil.copy2 - ioloop = IOLoop.current() self.notify_sync_lock = NotifySyncLock(dest_path) try: - with ThreadPoolExecutor(max_workers=1) as tpe: - full_dest = await ioloop.run_in_executor( - tpe, op_func, source_path, dest_path) + full_dest = await self.event_loop.run_in_thread( + op_func, source_path, dest_path) except Exception as e: self.notify_sync_lock.cancel() self.notify_sync_lock = None @@ -411,10 +405,10 @@ class FileManager: return path_info def gen_temp_upload_path(self) -> str: - ioloop = IOLoop.current() + loop_time = int(self.event_loop.get_loop_time()) return os.path.join( tempfile.gettempdir(), - f"moonraker.upload-{int(ioloop.time())}.mru") + f"moonraker.upload-{loop_time}.mru") async def finalize_upload(self, form_args: Dict[str, Any] @@ -705,11 +699,12 @@ class InotifyNode: name: str ) -> None: self.ihdlr = ihdlr + self.event_loop = ihdlr.event_loop self.name = name self.parent_node = parent self.child_nodes: Dict[str, InotifyNode] = {} self.watch_desc = self.ihdlr.add_watch(self) - self.pending_node_events: Dict[str, object] = {} + self.pending_node_events: Dict[str, asyncio.Handle] = {} self.pending_deleted_children: Set[Tuple[str, bool]] = set() self.pending_file_events: Dict[str, str] = {} @@ -724,7 +719,7 @@ class InotifyNode: node_path = self.get_path() root = self.get_root() # Scan child nodes for unwatched directories and metadata - mevts: List[Event] = self.scan_node() + mevts: List[asyncio.Event] = self.scan_node() if mevts: mfuts = [e.wait() for e in mevts] await asyncio.gather(*mfuts) @@ -753,12 +748,12 @@ class InotifyNode: def scan_node(self, visited_dirs: Set[Tuple[int, int]] = set() - ) -> List[Event]: + ) -> List[asyncio.Event]: dir_path = self.get_path() st = os.stat(dir_path) if st in visited_dirs: return [] - metadata_events: List[Event] = [] + metadata_events: List[asyncio.Event] = [] visited_dirs.add((st.st_dev, st.st_ino)) for fname in os.listdir(dir_path): if fname[0] == ".": @@ -885,34 +880,33 @@ class InotifyNode: self.reset_event(evt_name, timeout) return callback = getattr(self, f"_finish_{evt_name}") - hdl = IOLoop.current().call_later(timeout, callback) + hdl = self.event_loop.delay_callback(timeout, callback) self.pending_node_events[evt_name] = hdl def reset_event(self, evt_name: str, timeout: float) -> None: if evt_name in self.pending_node_events: - ioloop = IOLoop.current() hdl = self.pending_node_events[evt_name] - ioloop.remove_timeout(hdl) + hdl.cancel() callback = getattr(self, f"_finish_{evt_name}") - hdl = ioloop.call_later(timeout, callback) + hdl = self.event_loop.delay_callback(timeout, callback) self.pending_node_events[evt_name] = hdl def stop_event(self, evt_name: str) -> None: if evt_name in self.pending_node_events: hdl = self.pending_node_events[evt_name] - IOLoop.current().remove_timeout(hdl) + hdl.cancel() def remove_event(self, evt_name: str) -> None: hdl = self.pending_node_events.pop(evt_name, None) if hdl is not None: - IOLoop.current().remove_timeout(hdl) + hdl.cancel() def clear_events(self, include_children: bool = True) -> None: if include_children: for child in self.child_nodes.values(): child.clear_events(include_children) for hdl in self.pending_node_events.values(): - IOLoop.current().remove_timeout(hdl) + hdl.cancel() self.pending_node_events.clear() self.pending_deleted_children.clear() self.pending_file_events.clear() @@ -945,7 +939,7 @@ class InotifyRootNode(InotifyNode): class NotifySyncLock: def __init__(self, dest_path: str) -> None: self.wait_fut: Optional[asyncio.Future] = None - self.sync_condition = Condition() + self.sync_event = asyncio.Event() self.dest_path = dest_path self.notified_paths: Set[str] = set() self.finished: bool = False @@ -970,7 +964,7 @@ class NotifySyncLock: await asyncio.wait_for(self.wait_fut, timeout) except asyncio.TimeoutError: pass - self.sync_condition.notify_all() + self.sync_event.set() self.finished = True async def sync(self, path, timeout: Optional[float] = None) -> None: @@ -980,10 +974,8 @@ class NotifySyncLock: if self.wait_fut is not None and self.dest_path == path: self.wait_fut.set_result(None) # Transfer control to waiter - if timeout is not None: - timeout = IOLoop.current().time() + timeout try: - await self.sync_condition.wait(timeout) + await asyncio.wait_for(self.sync_event.wait(), timeout) except Exception: pass else: @@ -996,7 +988,7 @@ class NotifySyncLock: return if self.wait_fut is not None and not self.wait_fut.done(): self.wait_fut.set_result(None) - self.sync_condition.notify_all() + self.sync_event.set() self.finished = True class INotifyHandler: @@ -1006,21 +998,23 @@ class INotifyHandler: gcode_metadata: MetadataStorage ) -> None: self.server = config.get_server() + self.event_loop = self.server.get_event_loop() self.debug_enabled = config['server'].getboolean( 'enable_debug_logging', False) self.file_manager = file_manager self.gcode_metadata = gcode_metadata self.inotify = INotify(nonblocking=True) - IOLoop.current().add_handler( - self.inotify.fileno(), self._handle_inotify_read, - IOLoop.READ | IOLoop.ERROR) + self.event_loop.add_reader( + self.inotify.fileno(), self._handle_inotify_read) - self.event_loop_busy: bool = False + self.node_loop_busy: bool = False self.pending_inotify_events: List[InotifyEvent] = [] self.watched_roots: Dict[str, InotifyRootNode] = {} self.watched_nodes: Dict[int, InotifyNode] = {} - self.pending_moves: Dict[int, Tuple[InotifyNode, str, object]] = {} + self.pending_moves: Dict[ + int, Tuple[InotifyNode, str, asyncio.Handle]] = {} + def add_root_watch(self, root: str, root_path: str) -> None: if root not in FULL_ACCESS_ROOTS: @@ -1034,11 +1028,11 @@ class INotifyHandler: self.watched_roots[root] = root_node mevts = root_node.scan_node() self.log_nodes() - IOLoop.current().spawn_callback( + self.event_loop.register_callback( self._notify_root_updated, mevts, root, root_path) async def _notify_root_updated(self, - mevts: List[Event], + mevts: List[asyncio.Event], root: str, root_path: str ) -> None: @@ -1119,14 +1113,14 @@ class INotifyHandler: f"Watch: {wdesc}" logging.debug(debug_msg) - def parse_gcode_metadata(self, file_path: str) -> Event: + def parse_gcode_metadata(self, file_path: str) -> asyncio.Event: rel_path = self.file_manager.get_relative_path("gcodes", file_path) try: path_info = self.file_manager.get_path_info(file_path) except Exception: logging.exception( f"Error retreiving path info for file {file_path}") - evt = Event() + evt = asyncio.Event() evt.set() return evt ext = os.path.splitext(file_path)[-1].lower() @@ -1159,15 +1153,12 @@ class INotifyHandler: parent_node: InotifyNode, is_dir: bool ) -> None: - hdl = IOLoop.current().call_later( + hdl = self.event_loop.delay_callback( INOTIFY_MOVE_TIME, self._handle_move_timeout, evt.cookie, is_dir) self.pending_moves[evt.cookie] = (parent_node, evt.name, hdl) - def _handle_inotify_read(self, fd: int, events: int) -> None: - if events & IOLoop.ERROR: - logging.info("INotify Read Error") - return + def _handle_inotify_read(self) -> None: evt: InotifyEvent for evt in self.inotify.read(timeout=0): if evt.mask & iFlags.IGNORED: @@ -1180,9 +1171,9 @@ class INotifyHandler: f"flags: {flags}") continue self.pending_inotify_events.append(evt) - if not self.event_loop_busy: - self.event_loop_busy = True - IOLoop.current().spawn_callback(self._process_inotify_events) + if not self.node_loop_busy: + self.node_loop_busy = True + self.event_loop.register_callback(self._process_inotify_events) async def _process_inotify_events(self) -> None: while self.pending_inotify_events: @@ -1192,7 +1183,7 @@ class INotifyHandler: await self._process_dir_event(evt, node) else: await self._process_file_event(evt, node) - self.event_loop_busy = False + self.node_loop_busy = False async def _process_dir_event(self, evt: InotifyEvent, @@ -1222,7 +1213,7 @@ class INotifyHandler: if moved_evt is not None: # Moved from a currently watched directory prev_parent, child_name, hdl = moved_evt - IOLoop.current().remove_timeout(hdl) + hdl.cancel() await prev_parent.move_child_node(child_name, evt.name, node) else: # Moved from an unwatched directory, for our @@ -1260,7 +1251,7 @@ class INotifyHandler: if moved_evt is not None: # Moved from a currently watched directory prev_parent, prev_name, hdl = moved_evt - IOLoop.current().remove_timeout(hdl) + hdl.cancel() prev_root = prev_parent.get_root() prev_path = os.path.join(prev_parent.get_path(), prev_name) move_success = await self.try_move_metadata( @@ -1314,7 +1305,7 @@ class INotifyHandler: if sync_lock is not None: # Delay this notification so that it occurs after an item logging.debug(f"Syncing notification: {full_path}") - IOLoop.current().spawn_callback( + self.event_loop.register_callback( self._sync_with_request, result, sync_lock.sync(full_path), is_valid) elif is_valid and self._check_need_notify(file_info): @@ -1337,7 +1328,7 @@ class INotifyHandler: return True def close(self) -> None: - IOLoop.current().remove_handler(self.inotify.fileno()) + self.event_loop.remove_reader(self.inotify.fileno()) for watch in self.watched_nodes.keys(): try: self.inotify.rm_watch(watch) @@ -1368,7 +1359,8 @@ class MetadataStorage: db.insert_item( "moonraker", "file_manager.metadata_version", METADATA_VERSION) - self.pending_requests: Dict[str, Tuple[Dict[str, Any], Event]] = {} + self.pending_requests: Dict[ + str, Tuple[Dict[str, Any], asyncio.Event]] = {} self.busy: bool = False if self.gc_path: # Check for removed gcode files while moonraker was shutdown @@ -1477,8 +1469,8 @@ class MetadataStorage: def parse_metadata(self, fname: str, path_info: Dict[str, Any] - ) -> Event: - mevt = Event() + ) -> asyncio.Event: + mevt = asyncio.Event() ext = os.path.splitext(fname)[1] if fname in self.pending_requests or \ ext not in VALID_GCODE_EXTS or \ @@ -1490,7 +1482,8 @@ class MetadataStorage: if self.busy: return mevt self.busy = True - IOLoop.current().spawn_callback(self._process_metadata_update) + event_loop = self.server.get_event_loop() + event_loop.register_callback(self._process_metadata_update) return mevt async def _process_metadata_update(self) -> None: