file_manager: implement configurable fs observer

Currently the choices are "none" and "inotify".

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2023-02-18 14:04:21 -05:00
parent 1e97571aa8
commit 2fd668bf0d
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B

View File

@ -34,6 +34,7 @@ from typing import (
Awaitable,
Callable,
TypeVar,
Type,
cast,
)
@ -79,7 +80,21 @@ class FileManager:
)
self.gcode_metadata = MetadataStorage(config, db)
self.sync_lock = NotifySyncLock(config)
self.inotify_handler = INotifyHandler(
avail_observers: Dict[str, Type[BaseFileSystemObserver]] = {
"none": BaseFileSystemObserver,
"inotify": InotifyObserver
}
observer = config.get("file_system_observer", "inotify").lower()
obs_class = avail_observers.get(observer)
if obs_class is None:
self.server.add_warning(
f"[file_manager]: Invalid value '{observer}' for option "
"'file_system_observer'. Falling back to no observer."
)
obs_class = BaseFileSystemObserver
logging.info(f"Using File System Observer: {observer}")
self.no_observe = observer == "none"
self.fs_observer = obs_class(
config, self, self.gcode_metadata, self.sync_lock
)
self.scheduled_notifications: Dict[str, asyncio.TimerHandle] = {}
@ -141,7 +156,7 @@ class FileManager:
self.gcode_metadata.prune_storage()
async def component_init(self):
self.inotify_handler.initalize_roots()
self.fs_observer.initialize()
def _update_fixed_paths(self) -> None:
kinfo = self.server.get_klippy_info()
@ -264,11 +279,9 @@ class FileManager:
self.gcode_metadata.update_gcode_path(path)
if full_access:
# Refresh the file list and add watches
self.inotify_handler.add_root_watch(root, path)
self.fs_observer.add_root_watch(root, path)
elif self.server.is_running():
self.event_loop.register_callback(
self.inotify_handler.notify_filelist_changed,
"root_update", root, path)
self._sched_changed_event("root_update", root, path, immediate=True)
return True
def check_reserved_path(
@ -983,7 +996,8 @@ class FileManager:
root: str,
full_path: str,
source_root: Optional[str] = None,
source_path: Optional[str] = None
source_path: Optional[str] = None,
immediate: bool = False
) -> Dict[str, Any]:
rel_path = self.get_relative_path(root, full_path)
path_info = self.get_path_info(full_path, root, raise_error=False)
@ -995,9 +1009,14 @@ class FileManager:
if source_path is not None and source_root is not None:
src_rel_path = self.get_relative_path(source_root, source_path)
notify_info['source_item'] = {'path': src_rel_path, 'root': source_root}
immediate |= self.no_observe
delay = .005 if immediate else 1.
key = f"{action}-{root}-{rel_path}"
handle = self.event_loop.delay_callback(1., self._do_notify, key, notify_info)
self.scheduled_notifications[key] = handle
handle = self.event_loop.delay_callback(
delay, self._do_notify, key, notify_info
)
if not immediate:
self.scheduled_notifications[key] = handle
return notify_info
def _do_notify(self, key: str, notify_info: Dict[str, Any]) -> None:
@ -1013,7 +1032,7 @@ class FileManager:
for hdl in self.scheduled_notifications.values():
hdl.cancel()
self.scheduled_notifications.clear()
self.inotify_handler.close()
self.fs_observer.close()
class NotifySyncLock(asyncio.Lock):
@ -1036,7 +1055,7 @@ class NotifySyncLock(asyncio.Lock):
"Cannot call setup unless the lock has been acquired"
)
# Called by a file manager request. Sets the destination path to sync
# with the inotify handler.
# with the file system observer (inotify).
if self.dest_path is not None:
logging.debug(
"NotifySync Error: Setup requested while a path is still pending"
@ -1081,7 +1100,7 @@ class NotifySyncLock(asyncio.Lock):
self.move_copy_fut = None
def finish(self) -> None:
# Called by a file manager request upon completion. The inotify handler
# Called by a file manager request upon completion. The inotify observer
# can now emit the websocket notification
for waiter in self.sync_waiters:
if not waiter.done():
@ -1099,7 +1118,7 @@ class NotifySyncLock(asyncio.Lock):
self.check_pending = False
def add_pending_path(self, action: str, pending_path: StrOrPath) -> None:
# Called by the inotify handler whenever a create or move event
# Called by the inotify observer whenever a create or move event
# is detected. This is only necessary to track for move/copy actions,
# since we don't get the final destination until the request is complete.
if (
@ -1116,9 +1135,9 @@ class NotifySyncLock(asyncio.Lock):
def check_in_request(
self, action: str, inotify_path: StrOrPath
) -> Optional[asyncio.Future]:
# Called by the inotify handler to check if request synchronization
# Called by the inotify observer to check if request synchronization
# is necessary. If so, this method will return a future the inotify
# handler can await.
# observer can await.
if self.dest_path is None:
return None
if isinstance(inotify_path, str):
@ -1166,21 +1185,49 @@ class NotifySyncLock(asyncio.Lock):
self.finish()
class BaseFileSystemObserver:
def __init__(
self,
config: ConfigHelper,
file_manager: FileManager,
gcode_metadata: MetadataStorage,
sync_lock: NotifySyncLock
) -> None:
self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.enable_warn = config.getboolean("enable_observer_warnings", True)
self.file_manager = file_manager
self.gcode_metadata = gcode_metadata
self.sync_lock = sync_lock
def initialize(self) -> None:
pass
def add_root_watch(self, root: str, root_path: str) -> None:
# Just emit the notification
if self.server.is_running():
fm = self.file_manager
fm._sched_changed_event("root_update", root, root_path, immediate=True)
def close(self) -> None:
pass
INOTIFY_BUNDLE_TIME = .25
INOTIFY_MOVE_TIME = 1.
class InotifyNode:
def __init__(self,
ihdlr: INotifyHandler,
iobsvr: InotifyObserver,
parent: InotifyNode,
name: str
) -> None:
self.ihdlr = ihdlr
self.event_loop = ihdlr.event_loop
self.iobsvr = iobsvr
self.event_loop = iobsvr.event_loop
self.name = name
self.parent_node = parent
self.child_nodes: Dict[str, InotifyNode] = {}
self.watch_desc = self.ihdlr.add_watch(self)
self.watch_desc = self.iobsvr.add_watch(self)
self.pending_node_events: Dict[str, asyncio.Handle] = {}
self.pending_deleted_children: Set[Tuple[str, bool]] = set()
self.pending_file_events: Dict[str, str] = {}
@ -1204,11 +1251,11 @@ class InotifyNode:
mfuts = [e.wait() for e in mevts]
await asyncio.gather(*mfuts)
self.is_processing_metadata = False
self.ihdlr.log_nodes()
self.ihdlr.notify_filelist_changed(
self.iobsvr.log_nodes()
self.iobsvr.notify_filelist_changed(
"create_dir", root, node_path)
for args in self.queued_move_notificatons:
self.ihdlr.notify_filelist_changed(*args)
self.iobsvr.notify_filelist_changed(*args)
self.queued_move_notificatons.clear()
def _finish_delete_child(self) -> None:
@ -1225,8 +1272,8 @@ class InotifyNode:
for (name, is_node) in self.pending_deleted_children:
item_path = os.path.join(node_path, name)
item_type = "dir" if is_node else "file"
self.ihdlr.clear_metadata(root, item_path, is_node)
self.ihdlr.notify_filelist_changed(
self.iobsvr.clear_metadata(root, item_path, is_node)
self.iobsvr.notify_filelist_changed(
f"delete_{item_type}", root, item_path)
self.pending_deleted_children.clear()
@ -1242,14 +1289,14 @@ class InotifyNode:
for fname in os.listdir(dir_path):
item_path = os.path.join(dir_path, fname)
if os.path.isdir(item_path):
fm = self.ihdlr.file_manager
fm = self.iobsvr.file_manager
if fm.check_reserved_path(item_path, True, False):
continue
new_child = self.create_child_node(fname, False)
if new_child is not None:
metadata_events.extend(new_child.scan_node(visited_dirs))
elif os.path.isfile(item_path) and self.get_root() == "gcodes":
mevt = self.ihdlr.parse_gcode_metadata(item_path)
mevt = self.iobsvr.parse_gcode_metadata(item_path)
metadata_events.append(mevt)
return metadata_events
@ -1272,7 +1319,7 @@ class InotifyNode:
new_root = child_node.get_root()
logging.debug(f"Moving node from '{prev_path}' to '{new_path}'")
# Attempt to move metadata
move_res = self.ihdlr.try_move_metadata(
move_res = self.iobsvr.try_move_metadata(
prev_root, new_root, prev_path, new_path, is_dir=True
)
if new_root == "gcodes":
@ -1283,12 +1330,12 @@ class InotifyNode:
if mevts:
mfuts = [e.wait() for e in mevts]
await asyncio.gather(*mfuts)
self.ihdlr.notify_filelist_changed(
self.iobsvr.notify_filelist_changed(
"move_dir", new_root, new_path, prev_root, prev_path
)
self.ihdlr.queue_gcode_notificaton(_notify_move_dir())
self.iobsvr.queue_gcode_notificaton(_notify_move_dir())
else:
self.ihdlr.notify_filelist_changed(
self.iobsvr.notify_filelist_changed(
"move_dir", new_root, new_path, prev_root, prev_path
)
@ -1320,15 +1367,15 @@ class InotifyNode:
root = self.get_root()
if root == "gcodes":
async def _notify_file_write():
mevt = self.ihdlr.parse_gcode_metadata(file_path)
mevt = self.iobsvr.parse_gcode_metadata(file_path)
if os.path.splitext(file_path)[1].lower() == ".ufp":
# don't notify .ufp files
return
await mevt.wait()
self.ihdlr.notify_filelist_changed(evt_name, root, file_path)
self.ihdlr.queue_gcode_notificaton(_notify_file_write())
self.iobsvr.notify_filelist_changed(evt_name, root, file_path)
self.iobsvr.queue_gcode_notificaton(_notify_file_write())
else:
self.ihdlr.notify_filelist_changed(evt_name, root, file_path)
self.iobsvr.notify_filelist_changed(evt_name, root, file_path)
def add_child_node(self, node: InotifyNode) -> None:
self.child_nodes[node.name] = node
@ -1348,7 +1395,7 @@ class InotifyNode:
if name in self.child_nodes:
return self.child_nodes[name]
try:
new_child = InotifyNode(self.ihdlr, self, name)
new_child = InotifyNode(self.iobsvr, self, name)
except Exception:
# This node is already watched under another root,
# bypass creation
@ -1368,7 +1415,7 @@ class InotifyNode:
child_node = self.child_nodes.pop(child_name, None)
if child_node is None:
return
self.ihdlr.remove_watch(
self.iobsvr.remove_watch(
child_node.watch_desc, need_low_level_rm=False)
child_node.remove_event("delete_child")
self.pending_deleted_children.add((child_name, is_node))
@ -1378,7 +1425,7 @@ class InotifyNode:
for cnode in self.child_nodes.values():
# Delete all of the children's children
cnode.clear_watches()
self.ihdlr.remove_watch(self.watch_desc)
self.iobsvr.remove_watch(self.watch_desc)
def get_path(self) -> str:
return os.path.join(self.parent_node.get_path(), self.name)
@ -1454,22 +1501,22 @@ class InotifyNode:
):
self.queued_move_notificatons.append(args)
else:
if self.ihdlr.server.is_verbose_enabled():
if self.iobsvr.server.is_verbose_enabled():
path = self.get_path()
logging.debug(
f"Node {path} received a move notification queue request, "
f"however node is not pending: {args}"
)
self.ihdlr.notify_filelist_changed(*args)
self.iobsvr.notify_filelist_changed(*args)
class InotifyRootNode(InotifyNode):
def __init__(self,
ihdlr: INotifyHandler,
iobsvr: InotifyObserver,
root_name: str,
root_path: str
) -> None:
self.root_name = root_name
super().__init__(ihdlr, self, root_path)
super().__init__(iobsvr, self, root_path)
def get_path(self) -> str:
return self.name
@ -1493,7 +1540,7 @@ class InotifyRootNode(InotifyNode):
return self
return None
class INotifyHandler:
class InotifyObserver(BaseFileSystemObserver):
def __init__(
self,
config: ConfigHelper,
@ -1501,12 +1548,10 @@ class INotifyHandler:
gcode_metadata: MetadataStorage,
sync_lock: NotifySyncLock
) -> None:
self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.enable_warn = config.getboolean("enable_inotify_warnings", True)
self.file_manager = file_manager
self.gcode_metadata = gcode_metadata
self.sync_lock = sync_lock
super().__init__(config, file_manager, gcode_metadata, sync_lock)
self.enable_warn = config.getboolean(
"enable_inotify_warnings", self.enable_warn, deprecate=True
)
self.inotify = INotify(nonblocking=True)
self.event_loop.add_reader(
self.inotify.fileno(), self._handle_inotify_read)
@ -1536,7 +1581,7 @@ class INotifyHandler:
self.event_loop.register_callback(
self._notify_root_updated, mevts, root, root_path)
def initalize_roots(self):
def initialize(self) -> None:
for root, node in self.watched_roots.items():
evts = node.scan_node()
if not evts: