file_manager: allow concurrent inotify processing

All of the awaitable calls in the inotify loop only block to
postpone notifications.  Only gcode notifications require async
processing due to metadata analysis.  Queue these notifications
while allowing inotify events to be received concurrently.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>

fix
This commit is contained in:
Eric Callahan
2023-02-16 18:11:51 -05:00
parent 9f08aad6f6
commit d39792b84e

View File

@@ -31,6 +31,7 @@ from typing import (
List, List,
Set, Set,
Coroutine, Coroutine,
Awaitable,
Callable, Callable,
TypeVar, TypeVar,
cast, cast,
@@ -1225,11 +1226,12 @@ class InotifyNode:
metadata_events.append(mevt) metadata_events.append(mevt)
return metadata_events return metadata_events
async def move_child_node(self, def move_child_node(
child_name: str, self,
new_name: str, child_name: str,
new_parent: InotifyNode new_name: str,
) -> None: new_parent: InotifyNode
) -> None:
self.flush_delete() self.flush_delete()
child_node = self.pop_child_node(child_name) child_node = self.pop_child_node(child_name)
if child_node is None: if child_node is None:
@@ -1243,17 +1245,25 @@ class InotifyNode:
new_root = child_node.get_root() new_root = child_node.get_root()
logging.debug(f"Moving node from '{prev_path}' to '{new_path}'") logging.debug(f"Moving node from '{prev_path}' to '{new_path}'")
# Attempt to move metadata # Attempt to move metadata
move_success = await self.ihdlr.try_move_metadata( move_res = self.ihdlr.try_move_metadata(
prev_root, new_root, prev_path, new_path, is_dir=True) prev_root, new_root, prev_path, new_path, is_dir=True
if not move_success: )
# Need rescan if new_root == "gcodes":
mevts = child_node.scan_node() async def _notify_move_dir():
if mevts: if move_res is False:
mfuts = [e.wait() for e in mevts] # Need rescan
await asyncio.gather(*mfuts) mevts = child_node.scan_node()
self.ihdlr.notify_filelist_changed( if mevts:
"move_dir", new_root, new_path, mfuts = [e.wait() for e in mevts]
prev_root, prev_path) await asyncio.gather(*mfuts)
self.ihdlr.notify_filelist_changed(
"move_dir", new_root, new_path, prev_root, prev_path
)
self.ihdlr.queue_gcode_notificaton(_notify_move_dir())
else:
self.ihdlr.notify_filelist_changed(
"move_dir", new_root, new_path, prev_root, prev_path
)
def schedule_file_event(self, file_name: str, evt_name: str) -> None: def schedule_file_event(self, file_name: str, evt_name: str) -> None:
if file_name in self.pending_file_events: if file_name in self.pending_file_events:
@@ -1263,7 +1273,7 @@ class InotifyNode:
pending_node.stop_event("create_node") pending_node.stop_event("create_node")
self.pending_file_events[file_name] = evt_name self.pending_file_events[file_name] = evt_name
async def complete_file_write(self, file_name: str) -> None: def complete_file_write(self, file_name: str) -> None:
self.flush_delete() self.flush_delete()
evt_name = self.pending_file_events.pop(file_name, None) evt_name = self.pending_file_events.pop(file_name, None)
if evt_name is None: if evt_name is None:
@@ -1282,12 +1292,16 @@ class InotifyNode:
file_path = os.path.join(self.get_path(), file_name) file_path = os.path.join(self.get_path(), file_name)
root = self.get_root() root = self.get_root()
if root == "gcodes": if root == "gcodes":
mevt = self.ihdlr.parse_gcode_metadata(file_path) async def _notify_file_write():
if os.path.splitext(file_path)[1].lower() == ".ufp": mevt = self.ihdlr.parse_gcode_metadata(file_path)
# don't notify .ufp files if os.path.splitext(file_path)[1].lower() == ".ufp":
return # don't notify .ufp files
await mevt.wait() return
self.ihdlr.notify_filelist_changed(evt_name, root, file_path) await mevt.wait()
self.ihdlr.notify_filelist_changed(evt_name, root, file_path)
self.ihdlr.queue_gcode_notificaton(_notify_file_write())
else:
self.ihdlr.notify_filelist_changed(evt_name, root, file_path)
def add_child_node(self, node: InotifyNode) -> None: def add_child_node(self, node: InotifyNode) -> None:
self.child_nodes[node.name] = node self.child_nodes[node.name] = node
@@ -1438,16 +1452,14 @@ class INotifyHandler:
self.inotify = INotify(nonblocking=True) self.inotify = INotify(nonblocking=True)
self.event_loop.add_reader( self.event_loop.add_reader(
self.inotify.fileno(), self._handle_inotify_read) self.inotify.fileno(), self._handle_inotify_read)
self.node_loop_busy: bool = False
self.pending_inotify_events: List[InotifyEvent] = []
self.watched_roots: Dict[str, InotifyRootNode] = {} self.watched_roots: Dict[str, InotifyRootNode] = {}
self.watched_nodes: Dict[int, InotifyNode] = {} self.watched_nodes: Dict[int, InotifyNode] = {}
self.pending_moves: Dict[ self.pending_moves: Dict[
int, Tuple[InotifyNode, str, asyncio.Handle]] = {} int, Tuple[InotifyNode, str, asyncio.Handle]] = {}
self.create_gcode_notifications: Dict[str, Any] = {} self.create_gcode_notifications: Dict[str, Any] = {}
self.initialized: bool = False self.initialized: bool = False
self.pending_gcode_notificatons: List[Coroutine] = []
self._gcode_queue_busy: bool = False
def add_root_watch(self, root: str, root_path: str) -> None: def add_root_watch(self, root: str, root_path: str) -> None:
# remove all exisiting watches on root # remove all exisiting watches on root
@@ -1545,26 +1557,25 @@ class INotifyHandler:
else: else:
self.gcode_metadata.remove_file_metadata(rel_path) self.gcode_metadata.remove_file_metadata(rel_path)
async def try_move_metadata(self, def try_move_metadata(
prev_root: str, self,
new_root: str, prev_root: str,
prev_path: str, new_root: str,
new_path: str, prev_path: str,
is_dir: bool = False new_path: str,
) -> bool: is_dir: bool = False
) -> Union[bool, Awaitable]:
if new_root == "gcodes": if new_root == "gcodes":
if prev_root == "gcodes": if prev_root == "gcodes":
# moved within the gcodes root, move metadata # moved within the gcodes root, move metadata
prev_rel_path = self.file_manager.get_relative_path( fm = self.file_manager
"gcodes", prev_path) gcm = self.gcode_metadata
new_rel_path = self.file_manager.get_relative_path( prev_rel_path = fm.get_relative_path("gcodes", prev_path)
"gcodes", new_path) new_rel_path = fm.get_relative_path("gcodes", new_path)
if is_dir: if is_dir:
await self.gcode_metadata.move_directory_metadata( gcm.move_directory_metadata(prev_rel_path, new_rel_path)
prev_rel_path, new_rel_path)
else: else:
return await self.gcode_metadata.move_file_metadata( return gcm.move_file_metadata(prev_rel_path, new_rel_path)
prev_rel_path, new_rel_path)
else: else:
# move from a non-gcodes root to gcodes root needs a rescan # move from a non-gcodes root to gcodes root needs a rescan
self.clear_metadata(prev_root, prev_path, is_dir) self.clear_metadata(prev_root, prev_path, is_dir)
@@ -1644,25 +1655,13 @@ class INotifyHandler:
f"not currently tracked: name: {evt.name}, " f"not currently tracked: name: {evt.name}, "
f"flags: {flags}") f"flags: {flags}")
continue continue
self.pending_inotify_events.append(evt)
if not self.node_loop_busy:
self.node_loop_busy = True
self.event_loop.register_callback(self._process_inotify_events)
async def _process_inotify_events(self) -> None:
while self.pending_inotify_events:
evt = self.pending_inotify_events.pop(0)
node = self.watched_nodes[evt.wd] node = self.watched_nodes[evt.wd]
if evt.mask & iFlags.ISDIR: if evt.mask & iFlags.ISDIR:
await self._process_dir_event(evt, node) self._process_dir_event(evt, node)
else: else:
await self._process_file_event(evt, node) self._process_file_event(evt, node)
self.node_loop_busy = False
async def _process_dir_event(self, def _process_dir_event(self, evt: InotifyEvent, node: InotifyNode) -> None:
evt: InotifyEvent,
node: InotifyNode
) -> None:
if evt.name in ['.', ".."]: if evt.name in ['.', ".."]:
# ignore events for self and parent # ignore events for self and parent
return return
@@ -1691,7 +1690,7 @@ class INotifyHandler:
# Moved from a currently watched directory # Moved from a currently watched directory
prev_parent, child_name, hdl = moved_evt prev_parent, child_name, hdl = moved_evt
hdl.cancel() hdl.cancel()
await prev_parent.move_child_node(child_name, evt.name, node) prev_parent.move_child_node(child_name, evt.name, node)
else: else:
# Moved from an unwatched directory, for our # Moved from an unwatched directory, for our
# purposes this is the same as creating a # purposes this is the same as creating a
@@ -1699,10 +1698,7 @@ class INotifyHandler:
self.sync_lock.add_pending_path("create_dir", full_path) self.sync_lock.add_pending_path("create_dir", full_path)
node.create_child_node(evt.name) node.create_child_node(evt.name)
async def _process_file_event(self, def _process_file_event(self, evt: InotifyEvent, node: InotifyNode) -> None:
evt: InotifyEvent,
node: InotifyNode
) -> None:
ext: str = os.path.splitext(evt.name)[-1].lower() ext: str = os.path.splitext(evt.name)[-1].lower()
root = node.get_root() root = node.get_root()
node_path = node.get_path() node_path = node.get_path()
@@ -1714,7 +1710,7 @@ class INotifyHandler:
node.schedule_file_event(evt.name, "create_file") node.schedule_file_event(evt.name, "create_file")
if os.path.islink(file_path): if os.path.islink(file_path):
logging.debug(f"Inotify symlink create: {file_path}") logging.debug(f"Inotify symlink create: {file_path}")
await node.complete_file_write(evt.name) node.complete_file_write(evt.name)
elif evt.mask & iFlags.DELETE: elif evt.mask & iFlags.DELETE:
logging.debug(f"Inotify file delete: {root}, " logging.debug(f"Inotify file delete: {root}, "
f"{node_path}, {evt.name}") f"{node_path}, {evt.name}")
@@ -1741,35 +1737,76 @@ class INotifyHandler:
hdl.cancel() hdl.cancel()
prev_root = prev_parent.get_root() prev_root = prev_parent.get_root()
prev_path = os.path.join(prev_parent.get_path(), prev_name) prev_path = os.path.join(prev_parent.get_path(), prev_name)
move_success = await self.try_move_metadata( move_res = self.try_move_metadata(prev_root, root, prev_path, file_path)
prev_root, root, prev_path, file_path) if root == "gcodes":
if not move_success: coro = self._finish_gcode_move(
# Unable to move, metadata needs parsing root, prev_root, file_path, prev_path, can_notify, move_res
mevt = self.parse_gcode_metadata(file_path) )
await mevt.wait() self.queue_gcode_notificaton(coro)
if can_notify: else:
self.notify_filelist_changed( self.notify_filelist_changed(
"move_file", root, file_path, "move_file", root, file_path, prev_root, prev_path
prev_root, prev_path) )
else: else:
if can_notify: if can_notify:
self.sync_lock.add_pending_path("create_file", file_path) self.sync_lock.add_pending_path("create_file", file_path)
if root == "gcodes": if root == "gcodes":
mevt = self.parse_gcode_metadata(file_path) coro = self._finish_gcode_create_from_move(file_path, can_notify)
await mevt.wait() self.queue_gcode_notificaton(coro)
if can_notify: else:
self.notify_filelist_changed( self.notify_filelist_changed("create_file", root, file_path)
"create_file", root, file_path)
if not can_notify: if not can_notify:
logging.debug("Metadata is processing, suppressing move " logging.debug(
f"notification: {file_path}") "Metadata is processing, suppressing move notification: "
f"{file_path}"
)
elif evt.mask & iFlags.MODIFY: elif evt.mask & iFlags.MODIFY:
self.sync_lock.add_pending_path("modify_file", file_path) self.sync_lock.add_pending_path("modify_file", file_path)
node.schedule_file_event(evt.name, "modify_file") node.schedule_file_event(evt.name, "modify_file")
elif evt.mask & iFlags.CLOSE_WRITE: elif evt.mask & iFlags.CLOSE_WRITE:
logging.debug(f"Inotify writable file closed: {file_path}") logging.debug(f"Inotify writable file closed: {file_path}")
# Only process files that have been created or modified # Only process files that have been created or modified
await node.complete_file_write(evt.name) node.complete_file_write(evt.name)
async def _finish_gcode_move(
self,
root: str,
prev_root: str,
file_path: str,
prev_path: str,
can_notify: bool,
move_result: Union[bool, Awaitable]
) -> None:
if not isinstance(move_result, bool):
await move_result
elif not move_result:
# Unable to move, metadata needs parsing
mevt = self.parse_gcode_metadata(file_path)
await mevt.wait()
if can_notify:
self.notify_filelist_changed(
"move_file", root, file_path, prev_root, prev_path
)
async def _finish_gcode_create_from_move(
self, file_path: str, can_notify: bool
) -> None:
mevt = self.parse_gcode_metadata(file_path)
await mevt.wait()
if can_notify:
self.notify_filelist_changed("create_file", "gcodes", file_path)
def queue_gcode_notificaton(self, coro: Coroutine) -> None:
self.pending_gcode_notificatons.append(coro)
if not self._gcode_queue_busy:
self._gcode_queue_busy = True
self.event_loop.create_task(self._process_gcode_notifications())
async def _process_gcode_notifications(self) -> None:
while self.pending_gcode_notificatons:
coro = self.pending_gcode_notificatons.pop(0)
await coro
self._gcode_queue_busy = False
def notify_filelist_changed(self, def notify_filelist_changed(self,
action: str, action: str,
@@ -1980,10 +2017,7 @@ class MetadataStorage:
except Exception: except Exception:
logging.debug(f"Error removing thumb at {thumb_path}") logging.debug(f"Error removing thumb at {thumb_path}")
async def move_directory_metadata(self, def move_directory_metadata(self, prev_dir: str, new_dir: str) -> None:
prev_dir: str,
new_dir: str
) -> None:
if prev_dir[-1] != "/": if prev_dir[-1] != "/":
prev_dir += "/" prev_dir += "/"
moved: List[Tuple[str, str, Dict[str, Any]]] = [] moved: List[Tuple[str, str, Dict[str, Any]]] = []
@@ -2000,14 +2034,12 @@ class MetadataStorage:
source = [m[0] for m in moved] source = [m[0] for m in moved]
dest = [m[1] for m in moved] dest = [m[1] for m in moved]
self.mddb.move_batch(source, dest) self.mddb.move_batch(source, dest)
eventloop = self.server.get_event_loop() # It shouldn't be necessary to move the thumbnails
await eventloop.run_in_thread(self._move_thumbnails, moved) # as they would be moved with the parent directory
async def move_file_metadata(self, def move_file_metadata(
prev_fname: str, self, prev_fname: str, new_fname: str
new_fname: str, ) -> Union[bool, Awaitable]:
move_thumbs: bool = True
) -> bool:
metadata: Optional[Dict[str, Any]] metadata: Optional[Dict[str, Any]]
metadata = self.metadata.pop(prev_fname, None) metadata = self.metadata.pop(prev_fname, None)
if metadata is None: if metadata is None:
@@ -2017,18 +2049,15 @@ class MetadataStorage:
if self.metadata.pop(new_fname, None) is not None: if self.metadata.pop(new_fname, None) is not None:
self.mddb.pop(new_fname, None) self.mddb.pop(new_fname, None)
return False return False
self.metadata[new_fname] = metadata self.metadata[new_fname] = metadata
self.mddb.move_batch([prev_fname], [new_fname]) self.mddb.move_batch([prev_fname], [new_fname])
if move_thumbs: return self._move_thumbnails([(prev_fname, new_fname, metadata)])
eventloop = self.server.get_event_loop()
await eventloop.run_in_thread(
self._move_thumbnails,
[(prev_fname, new_fname, metadata)])
return True
def _move_thumbnails(self, async def _move_thumbnails(
records: List[Tuple[str, str, Dict[str, Any]]] self, records: List[Tuple[str, str, Dict[str, Any]]]
) -> None: ) -> None:
eventloop = self.server.get_event_loop()
for (prev_fname, new_fname, metadata) in records: for (prev_fname, new_fname, metadata) in records:
prev_dir = os.path.dirname(os.path.join(self.gc_path, prev_fname)) prev_dir = os.path.dirname(os.path.join(self.gc_path, prev_fname))
new_dir = os.path.dirname(os.path.join(self.gc_path, new_fname)) new_dir = os.path.dirname(os.path.join(self.gc_path, new_fname))
@@ -2042,12 +2071,21 @@ class MetadataStorage:
if not os.path.isfile(thumb_path): if not os.path.isfile(thumb_path):
continue continue
new_path = os.path.join(new_dir, path) new_path = os.path.join(new_dir, path)
new_parent = os.path.dirname(new_path)
try: try:
os.makedirs(os.path.dirname(new_path), exist_ok=True) if not os.path.exists(new_parent):
shutil.move(thumb_path, new_path) os.mkdir(new_parent)
# Wait for inotify to register the node before the move
await asyncio.sleep(.2)
await eventloop.run_in_thread(
shutil.move, thumb_path, new_path
)
except asyncio.CancelledError:
raise
except Exception: except Exception:
logging.debug(f"Error moving thumb from {thumb_path}" logging.exception(
f" to {new_path}") f"Error moving thumb from {thumb_path} to {new_path}"
)
def parse_metadata(self, def parse_metadata(self,
fname: str, fname: str,