update_manager: improve instance detection

Implement instance package wide for the update_manager.  This
only requires detecting multiple instances once and eliminates
costly calls to git.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2024-03-10 12:10:41 -04:00
parent 6b1b8c5102
commit 10dfb0d477
No known key found for this signature in database
GPG Key ID: 5A1EB336DFB4C71B
2 changed files with 89 additions and 58 deletions

View File

@ -26,7 +26,6 @@ from typing import (
if TYPE_CHECKING:
from ...confighelper import ConfigHelper
from ..shell_command import ShellCommand
from ..machine import Machine
from .update_manager import CommandHelper
from ..http_client import HttpClient
@ -177,9 +176,6 @@ class GitDeploy(AppDeploy):
else:
self.repo.set_rollback_state(rb_state)
async def close(self) -> None:
await self.repo.unset_current_instance()
GIT_ASYNC_TIMEOUT = 300.
GIT_ENV_VARS = {
@ -232,7 +228,6 @@ class GitRepo:
self.repo_warnings: List[str] = []
self.repo_anomalies: List[str] = []
self.managing_instances: List[str] = []
self.init_evt: Optional[asyncio.Event] = None
self.initialized: bool = False
self.git_operation_lock = asyncio.Lock()
@ -275,8 +270,6 @@ class GitRepo:
self.rollback_version = GitVersion(str(rbv))
if not await self._detect_git_dir():
self.valid_git_repo = False
if self.valid_git_repo:
await self.set_current_instance()
self._check_warnings()
def get_persistent_data(self) -> Dict[str, Any]:
@ -434,7 +427,6 @@ class GitRepo:
if ext in SRC_EXTS:
self.untracked_files.append(fname)
self.valid_git_repo = True
await self.set_current_instance()
return True
async def _detect_git_dir(self) -> bool:
@ -713,12 +705,6 @@ class GitRepo:
"Repo is dirty. Detected the following modifed files: "
f"{self.modified_files}"
)
if len(self.managing_instances) > 1:
instances = "\n".join([f" {ins}" for ins in self.managing_instances])
self.repo_anomalies.append(
f"Multiple instances of Moonraker managing this repo:\n"
f"{instances}"
)
self._generate_warn_msg()
def _generate_warn_msg(self) -> str:
@ -990,50 +976,6 @@ class GitRepo:
# Return tagged commits as SHA keys mapped to tag values
return tagged_commits
async def set_current_instance(self) -> None:
# Check to see if multiple instances of Moonraker are configured
# to manage this repo
full_id = self._get_instance_id()
self.managing_instances.clear()
try:
instances = await self.config_get(
"moonraker.instance", get_all=True, local_only=True
)
if instances is None:
await self.config_set("moonraker.instance", full_id)
self.managing_instances = [full_id]
else:
det_instances = [
ins.strip() for ins in instances.split("\n") if ins.strip()
]
if full_id not in det_instances:
await self.config_add("moonraker.instance", full_id)
det_instances.append(full_id)
self.managing_instances = det_instances
except asyncio.CancelledError:
raise
except Exception as e:
logging.info(
f"Git Repo {self.alias}: Moonraker Instance Validation Error, {e}"
)
async def unset_current_instance(self) -> None:
full_id = self._get_instance_id()
if full_id not in self.managing_instances:
return
try:
await self.config_unset("moonraker.instance", pattern=full_id)
except asyncio.CancelledError:
raise
except Exception as e:
logging.info(f"Git repo {self.alias}: Error removing instance, {e}")
def _get_instance_id(self) -> str:
machine: Machine = self.server.lookup_component("machine")
cur_name = machine.unit_name
cur_uuid: str = self.server.get_app_args()["instance_uuid"]
return f"{cur_name}@{cur_uuid}"
def get_repo_status(self) -> Dict[str, Any]:
no_untrk_src = len(self.untracked_files) == 0
return {

View File

@ -64,6 +64,7 @@ class UpdateManager:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.event_loop = self.server.get_event_loop()
self.instance_tracker = InstanceTracker(self.server)
self.kconn: KlippyConnection
self.kconn = self.server.lookup_component("klippy_connection")
self.app_config = get_base_configuration(config)
@ -178,6 +179,7 @@ class UpdateManager:
return self.updaters
async def component_init(self) -> None:
self.instance_tracker.set_instance_id()
# Prune stale data from the database
umdb = self.cmd_helper.get_umdb()
db_keys = await umdb.keys()
@ -495,6 +497,7 @@ class UpdateManager:
async def close(self) -> None:
if self.refresh_timer is not None:
self.refresh_timer.stop()
self.instance_tracker.close()
for updater in self.updaters.values():
ret = updater.close()
if ret is not None:
@ -665,6 +668,92 @@ class CommandHelper:
eventloop = self.server.get_event_loop()
return await eventloop.run_in_thread(_createdir, suffix, prefix)
class InstanceTracker:
def __init__(self, server: Server) -> None:
self.server = server
self.inst_id = b""
self.shm = self._try_open_shm()
def _try_open_shm(self) -> Any:
prev_mask = os.umask(0)
try:
from multiprocessing.shared_memory import SharedMemory
setattr(SharedMemory, "_mode", 438)
try:
return SharedMemory("moonraker_instance_ids", True, 4096)
except FileExistsError:
return SharedMemory("moonraker_instance_ids")
except Exception as e:
self.server.add_log_rollover_item(
"um_multi_instance_msg",
"Failed to open shared memory, update_manager instance tracking "
f"disabled.\n{e.__class__.__name__}: {e}"
)
return None
finally:
os.umask(prev_mask)
def get_instance_id(self) -> bytes:
machine: Machine = self.server.lookup_component("machine")
cur_name = "".join(machine.unit_name.split())
cur_uuid: str = self.server.get_app_args()["instance_uuid"]
pid = os.getpid()
return f"{cur_name}:{cur_uuid}:{pid}".encode(errors="ignore")
def _read_instance_ids(self) -> List[bytes]:
if self.shm is not None:
try:
data = bytearray(self.shm.buf)
idx = data.find(b"\x00")
if idx > 1:
return bytes(data[:idx]).strip().splitlines()
except Exception:
logging.exception("Failed to Read Shared Memory")
return []
def set_instance_id(self) -> None:
if self.shm is None:
return
self.inst_id = self.get_instance_id()
iids = self._read_instance_ids()
if self.inst_id not in iids:
iids.append(self.inst_id)
if len(iids) > 1:
id_str = "\n".join([iid.decode(errors="ignore") for iid in iids])
self.server.add_log_rollover_item(
"um_multi_instance_msg",
"Multiple instances of Moonraker have the update manager enabled."
f"\n{id_str}"
)
encoded_ids = b"\n".join(iids) + b"\x00"
if len(encoded_ids) > self.shm.size:
iid = self.inst_id.decode(errors="ignore")
logging.info(f"Not enough storage in shared memory for id {iid}")
return
try:
buf: memoryview = self.shm.buf
buf[:len(encoded_ids)] = encoded_ids
except Exception:
logging.exception("Failed to Write Shared Memory")
def close(self) -> None:
if self.shm is None:
return
# Remove current id and clean up shared memory
iids = self._read_instance_ids()
if self.inst_id in iids:
iids.remove(self.inst_id)
try:
buf: memoryview = self.shm.buf
null_len = min(self.shm.size, max(len(self.inst_id), 10))
data = b"\n".join(iids) + b"\x00" if iids else b"\x00" * null_len
buf[:len(data)] = data
self.shm.close()
if not iids:
self.shm.unlink()
except Exception:
logging.exception("Failed to write/close shared memory")
def load_component(config: ConfigHelper) -> UpdateManager:
return UpdateManager(config)