diff --git a/moonraker/components/update_manager/git_deploy.py b/moonraker/components/update_manager/git_deploy.py index 33f9e3a..66896cd 100644 --- a/moonraker/components/update_manager/git_deploy.py +++ b/moonraker/components/update_manager/git_deploy.py @@ -26,7 +26,6 @@ from typing import ( if TYPE_CHECKING: from ...confighelper import ConfigHelper from ..shell_command import ShellCommand - from ..machine import Machine from .update_manager import CommandHelper from ..http_client import HttpClient @@ -177,9 +176,6 @@ class GitDeploy(AppDeploy): else: self.repo.set_rollback_state(rb_state) - async def close(self) -> None: - await self.repo.unset_current_instance() - GIT_ASYNC_TIMEOUT = 300. GIT_ENV_VARS = { @@ -232,7 +228,6 @@ class GitRepo: self.repo_warnings: List[str] = [] self.repo_anomalies: List[str] = [] - self.managing_instances: List[str] = [] self.init_evt: Optional[asyncio.Event] = None self.initialized: bool = False self.git_operation_lock = asyncio.Lock() @@ -275,8 +270,6 @@ class GitRepo: self.rollback_version = GitVersion(str(rbv)) if not await self._detect_git_dir(): self.valid_git_repo = False - if self.valid_git_repo: - await self.set_current_instance() self._check_warnings() def get_persistent_data(self) -> Dict[str, Any]: @@ -434,7 +427,6 @@ class GitRepo: if ext in SRC_EXTS: self.untracked_files.append(fname) self.valid_git_repo = True - await self.set_current_instance() return True async def _detect_git_dir(self) -> bool: @@ -713,12 +705,6 @@ class GitRepo: "Repo is dirty. Detected the following modifed files: " f"{self.modified_files}" ) - if len(self.managing_instances) > 1: - instances = "\n".join([f" {ins}" for ins in self.managing_instances]) - self.repo_anomalies.append( - f"Multiple instances of Moonraker managing this repo:\n" - f"{instances}" - ) self._generate_warn_msg() def _generate_warn_msg(self) -> str: @@ -990,50 +976,6 @@ class GitRepo: # Return tagged commits as SHA keys mapped to tag values return tagged_commits - async def set_current_instance(self) -> None: - # Check to see if multiple instances of Moonraker are configured - # to manage this repo - full_id = self._get_instance_id() - self.managing_instances.clear() - try: - instances = await self.config_get( - "moonraker.instance", get_all=True, local_only=True - ) - if instances is None: - await self.config_set("moonraker.instance", full_id) - self.managing_instances = [full_id] - else: - det_instances = [ - ins.strip() for ins in instances.split("\n") if ins.strip() - ] - if full_id not in det_instances: - await self.config_add("moonraker.instance", full_id) - det_instances.append(full_id) - self.managing_instances = det_instances - except asyncio.CancelledError: - raise - except Exception as e: - logging.info( - f"Git Repo {self.alias}: Moonraker Instance Validation Error, {e}" - ) - - async def unset_current_instance(self) -> None: - full_id = self._get_instance_id() - if full_id not in self.managing_instances: - return - try: - await self.config_unset("moonraker.instance", pattern=full_id) - except asyncio.CancelledError: - raise - except Exception as e: - logging.info(f"Git repo {self.alias}: Error removing instance, {e}") - - def _get_instance_id(self) -> str: - machine: Machine = self.server.lookup_component("machine") - cur_name = machine.unit_name - cur_uuid: str = self.server.get_app_args()["instance_uuid"] - return f"{cur_name}@{cur_uuid}" - def get_repo_status(self) -> Dict[str, Any]: no_untrk_src = len(self.untracked_files) == 0 return { diff --git a/moonraker/components/update_manager/update_manager.py b/moonraker/components/update_manager/update_manager.py index 652caa7..1181426 100644 --- a/moonraker/components/update_manager/update_manager.py +++ b/moonraker/components/update_manager/update_manager.py @@ -64,6 +64,7 @@ class UpdateManager: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.event_loop = self.server.get_event_loop() + self.instance_tracker = InstanceTracker(self.server) self.kconn: KlippyConnection self.kconn = self.server.lookup_component("klippy_connection") self.app_config = get_base_configuration(config) @@ -178,6 +179,7 @@ class UpdateManager: return self.updaters async def component_init(self) -> None: + self.instance_tracker.set_instance_id() # Prune stale data from the database umdb = self.cmd_helper.get_umdb() db_keys = await umdb.keys() @@ -495,6 +497,7 @@ class UpdateManager: async def close(self) -> None: if self.refresh_timer is not None: self.refresh_timer.stop() + self.instance_tracker.close() for updater in self.updaters.values(): ret = updater.close() if ret is not None: @@ -665,6 +668,92 @@ class CommandHelper: eventloop = self.server.get_event_loop() return await eventloop.run_in_thread(_createdir, suffix, prefix) +class InstanceTracker: + def __init__(self, server: Server) -> None: + self.server = server + self.inst_id = b"" + self.shm = self._try_open_shm() + + def _try_open_shm(self) -> Any: + prev_mask = os.umask(0) + try: + from multiprocessing.shared_memory import SharedMemory + setattr(SharedMemory, "_mode", 438) + try: + return SharedMemory("moonraker_instance_ids", True, 4096) + except FileExistsError: + return SharedMemory("moonraker_instance_ids") + except Exception as e: + self.server.add_log_rollover_item( + "um_multi_instance_msg", + "Failed to open shared memory, update_manager instance tracking " + f"disabled.\n{e.__class__.__name__}: {e}" + ) + return None + finally: + os.umask(prev_mask) + + def get_instance_id(self) -> bytes: + machine: Machine = self.server.lookup_component("machine") + cur_name = "".join(machine.unit_name.split()) + cur_uuid: str = self.server.get_app_args()["instance_uuid"] + pid = os.getpid() + return f"{cur_name}:{cur_uuid}:{pid}".encode(errors="ignore") + + def _read_instance_ids(self) -> List[bytes]: + if self.shm is not None: + try: + data = bytearray(self.shm.buf) + idx = data.find(b"\x00") + if idx > 1: + return bytes(data[:idx]).strip().splitlines() + except Exception: + logging.exception("Failed to Read Shared Memory") + return [] + + def set_instance_id(self) -> None: + if self.shm is None: + return + self.inst_id = self.get_instance_id() + iids = self._read_instance_ids() + if self.inst_id not in iids: + iids.append(self.inst_id) + if len(iids) > 1: + id_str = "\n".join([iid.decode(errors="ignore") for iid in iids]) + self.server.add_log_rollover_item( + "um_multi_instance_msg", + "Multiple instances of Moonraker have the update manager enabled." + f"\n{id_str}" + ) + encoded_ids = b"\n".join(iids) + b"\x00" + if len(encoded_ids) > self.shm.size: + iid = self.inst_id.decode(errors="ignore") + logging.info(f"Not enough storage in shared memory for id {iid}") + return + try: + buf: memoryview = self.shm.buf + buf[:len(encoded_ids)] = encoded_ids + except Exception: + logging.exception("Failed to Write Shared Memory") + + def close(self) -> None: + if self.shm is None: + return + # Remove current id and clean up shared memory + iids = self._read_instance_ids() + if self.inst_id in iids: + iids.remove(self.inst_id) + try: + buf: memoryview = self.shm.buf + null_len = min(self.shm.size, max(len(self.inst_id), 10)) + data = b"\n".join(iids) + b"\x00" if iids else b"\x00" * null_len + buf[:len(data)] = data + self.shm.close() + if not iids: + self.shm.unlink() + except Exception: + logging.exception("Failed to write/close shared memory") + def load_component(config: ConfigHelper) -> UpdateManager: return UpdateManager(config)