From 683d93a894b218d65fb6c0603fd71df9ebfaa5c9 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sat, 27 Apr 2024 10:24:44 -0400 Subject: [PATCH] utils: add file lock utility Uses linux flock to create lock files that can be used to protect access across multiple processes. Signed-off-by: Eric Callahan --- moonraker/utils/filelock.py | 110 ++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 moonraker/utils/filelock.py diff --git a/moonraker/utils/filelock.py b/moonraker/utils/filelock.py new file mode 100644 index 0000000..cdab0d0 --- /dev/null +++ b/moonraker/utils/filelock.py @@ -0,0 +1,110 @@ +# Async file locking using flock +# +# Copyright (C) 2024 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license + +from __future__ import annotations +import os +import fcntl +import errno +import logging +import pathlib +import contextlib +import asyncio +from . import ServerError +from typing import Optional, Type +from types import TracebackType + +class LockTimeout(ServerError): + pass + +class AsyncExclusiveFileLock(contextlib.AbstractAsyncContextManager): + def __init__( + self, file_path: pathlib.Path, timeout: int = 0 + ) -> None: + self.lock_path = file_path.parent.joinpath(f".{file_path.name}.lock") + self.timeout = timeout + self.fd: int = -1 + self.locked: bool = False + self.required_wait: bool = False + + async def __aenter__(self) -> AsyncExclusiveFileLock: + await self.acquire() + return self + + async def __aexit__( + self, + __exc_type: Optional[Type[BaseException]], + __exc_value: Optional[BaseException], + __traceback: Optional[TracebackType] + ) -> None: + await self.release() + + def _get_lock(self) -> bool: + flags = os.O_RDWR | os.O_CREAT | os.O_TRUNC + fd = os.open(str(self.lock_path), flags, 0o644) + with contextlib.suppress(PermissionError): + os.chmod(fd, 0o644) + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError as err: + os.close(fd) + if err.errno == errno.ENOSYS: + raise + return False + stat = os.fstat(fd) + if stat.st_nlink == 0: + # File was deleted before opening and after acquiring + # lock, create a new one + os.close(fd) + return False + self.fd = fd + return True + + async def acquire(self) -> None: + self.required_wait = False + if self.timeout < 0: + return + loop = asyncio.get_running_loop() + endtime = loop.time() + self.timeout + logged: bool = False + while True: + try: + self.locked = await loop.run_in_executor(None, self._get_lock) + except OSError as err: + logging.info( + "Failed to aquire advisory lock, allowing unlocked entry." + f"Error: {err}" + ) + self.locked = False + return + if self.locked: + return + self.required_wait = True + await asyncio.sleep(.25) + if not logged: + logged = True + logging.info( + f"File lock {self.lock_path} is currently acquired by another " + "process, waiting for release." + ) + if self.timeout > 0 and endtime >= loop.time(): + raise LockTimeout( + f"Attempt to acquire lock '{self.lock_path}' timed out" + ) + + def _release_file(self) -> None: + with contextlib.suppress(OSError, PermissionError): + self.lock_path.unlink(missing_ok=True) + with contextlib.suppress(OSError, PermissionError): + fcntl.flock(self.fd, fcntl.LOCK_UN) + with contextlib.suppress(OSError, PermissionError): + os.close(self.fd) + + async def release(self) -> None: + if not self.locked: + return + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self._release_file) + self.locked = False \ No newline at end of file