Eric Callahan 51e3568aac
update_manager: add git repo instance detection
Use git-config to track the current Moonraker instance managing a
git repo.  If multiple instances are detected, log and create a repo
warning.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
2023-07-13 07:58:40 -04:00

1203 lines
49 KiB
Python

# Git Deployment implementation
#
# Copyright (C) 2021 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import asyncio
import os
import pathlib
import shutil
import re
import logging
from .app_deploy import AppDeploy
from .common import Channel
from ...utils.versions import GitVersion
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Optional,
Dict,
List,
)
if TYPE_CHECKING:
from ...confighelper import ConfigHelper
from ..shell_command import ShellCommand
from ..machine import Machine
from .update_manager import CommandHelper
from ..http_client import HttpClient
class GitDeploy(AppDeploy):
def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper) -> None:
    """Read the git specific options and create the managed GitRepo.

    Args:
        config: Configuration section describing this application.
        cmd_helper: Command helper forwarded to the base class and to
            the GitRepo instance.
    """
    super().__init__(config, cmd_helper, "Git Repo")
    # Path, virtualenv and dependency options are configured in this order
    self._configure_path(config)
    self._configure_virtualenv(config)
    self._configure_dependencies(config)
    # 'origin' has no default, 'moved_origin' and 'primary_branch' do
    self.origin: str = config.get('origin')
    self.moved_origin: Optional[str] = config.get('moved_origin', None)
    self.primary_branch = config.get("primary_branch", "master")
    self.repo = GitRepo(
        cmd_helper=cmd_helper,
        src_path=self.path,
        alias=self.name,
        origin_url=self.origin,
        moved_origin_url=self.moved_origin,
        primary_branch=self.primary_branch,
        channel=self.channel,
    )
async def initialize(self) -> Dict[str, Any]:
    """Restore the persisted repo state.

    Returns:
        The storage dict returned by the base class ``initialize()``.
    """
    storage = await super().initialize()
    await self.repo.restore_state(storage)
    if self.needs_refresh():
        # A refresh is pending, skip logging the restored state
        return storage
    self.repo.log_repo_info()
    return storage
async def refresh(self) -> None:
    """Refresh the repo state; failures are logged and not re-raised."""
    try:
        await self._update_repo_state()
    except Exception:
        # logging.exception() records the traceback of the failure
        logging.exception("Error Refreshing git state")
async def _update_repo_state(self, need_fetch: bool = True) -> None:
    """Refresh the GitRepo, re-evaluate ``_is_valid`` and save the state.

    Args:
        need_fetch: Forwarded to ``GitRepo.refresh_repo_state()``.
    """
    # Treat the repo as invalid until the refresh below has completed
    self._is_valid = False
    await self.repo.refresh_repo_state(need_fetch=need_fetch)
    self.log_info(f"Channel: {self.channel}")
    if self.repo.check_is_valid():
        self._is_valid = True
        self.log_info("Validity check for git repo passed")
    else:
        self.log_info("Repo validation check failed")
        if self.server.is_debug_enabled():
            # Debug mode allows updates on repos that fail validation
            self._is_valid = True
            self.log_info(
                "Repo debug enabled, overriding validity checks")
        else:
            self.log_info("Updates on repo disabled")
    self._save_state()
async def update(self) -> bool:
    """Update the application to its upstream state.

    Returns:
        ``True`` when an update was performed, ``False`` when the repo
        is already current.

    Raises:
        The exception returned by ``log_exc()`` when the repo is not
        valid or has been modified.
    """
    await self.repo.wait_for_init()
    # Validity is checked before the dirty state, only one reason is reported
    abort_reason: Optional[str] = None
    if not self._is_valid:
        abort_reason = "Update aborted, repo not valid"
    elif self.repo.is_dirty():
        abort_reason = "Update aborted, repo has been modified"
    if abort_reason is not None:
        raise self.log_exc(abort_reason, False)
    if self.repo.is_current():
        # No need to update
        return False
    self.cmd_helper.notify_update_response(
        f"Updating Application {self.name}...")
    # Capture dependencies before pulling so new ones can be detected after
    deps_before = await self._collect_dependency_info()
    await self._pull_repo()
    # Check Semantic Versions
    await self._update_dependencies(deps_before)
    # Refresh local repo state
    await self._update_repo_state(need_fetch=False)
    await self.restart_service()
    self.notify_status("Update Finished...", is_complete=True)
    return True
async def recover(self,
                  hard: bool = False,
                  force_dep_update: bool = False
                  ) -> None:
    """Attempt to restore the repo to a pristine state.

    Args:
        hard: When set the repo is re-cloned, otherwise it is reset.
        force_dep_update: Forwarded as ``force`` to
            ``_update_dependencies()``.

    Raises:
        A server error (code 500) when the repo is still dirty or
        invalid after the recovery attempt.
    """
    self.notify_status("Attempting Repo Recovery...")
    deps_before = await self._collect_dependency_info()
    if not hard:
        self.notify_status("Resetting Git Repo...")
        await self.repo.reset()
    else:
        await self.repo.clone()
        if self.channel != Channel.DEV:
            if self.repo.upstream_commit == "?":
                self.notify_status(
                    f"No upstream commit for repo on {self.channel} channel, "
                    "skipping reset."
                )
            else:
                # If on beta or stable reset to the latest tagged
                # upstream commit
                await self.repo.reset()
    # Both recovery paths finish with a full state refresh
    await self._update_repo_state()
    self.repo.set_rollback_state(None)
    recovered = self._is_valid and not self.repo.is_dirty()
    if not recovered:
        raise self.server.error(
            "Recovery attempt failed, repo state not pristine", 500)
    await self._update_dependencies(deps_before, force=force_dep_update)
    await self.restart_service()
    self.notify_status("Reinstall Complete", is_complete=True)
async def rollback(self) -> bool:
    """Roll the repo back to its stored rollback state.

    Returns:
        The result of ``GitRepo.rollback()``: ``True`` when a rollback
        was performed.
    """
    deps_before = await self._collect_dependency_info()
    rolled_back = await self.repo.rollback()
    if not rolled_back:
        msg = "Rollback not performed"
    else:
        await self._update_dependencies(deps_before)
        await self._update_repo_state(need_fetch=False)
        await self.restart_service()
        msg = "Rollback Complete"
    self.notify_status(msg, is_complete=True)
    return rolled_back
def get_update_status(self) -> Dict[str, Any]:
    """Return the base class status merged with the repo status."""
    status = super().get_update_status()
    repo_status = self.repo.get_repo_status()
    # Repo values take precedence over base class keys of the same name
    status.update(repo_status)
    return status
def get_persistent_data(self) -> Dict[str, Any]:
    """Return the base class persistent data merged with the repo data."""
    storage = super().get_persistent_data()
    repo_data = self.repo.get_persistent_data()
    # Repo values take precedence over base class keys of the same name
    storage.update(repo_data)
    return storage
async def _pull_repo(self) -> None:
    """Fetch and apply upstream changes, then store the rollback state.

    The state captured before the update only becomes the rollback
    state when every git operation succeeds.

    Raises:
        The exception returned by ``log_exc()`` when a git operation
        fails.
    """
    self.notify_status("Updating Repo...")
    previous_state = self.repo.capture_state_for_rollback()
    try:
        await self.repo.fetch()
        if self.repo.is_detached():
            await self.repo.checkout()
        elif await self.repo.check_diverged():
            self.notify_status(
                "Repo has diverged, attempting git reset"
            )
            await self.repo.reset()
        else:
            await self.repo.pull()
    except Exception as e:
        if self.repo.repo_corrupt:
            # Persist the invalid state and schedule a refresh notification
            self._is_valid = False
            self._save_state()
            event_loop = self.server.get_event_loop()
            event_loop.delay_callback(
                .2, self.cmd_helper.notify_update_refreshed
            )
        raise self.log_exc(str(e))
    # Only reached on success, the handler above always raises
    self.repo.set_rollback_state(previous_state)
async def _collect_dependency_info(self) -> Dict[str, Any]:
    """Snapshot the current system packages, python reqs and npm hash.

    Returns:
        A dict with the keys ``system_packages``, ``python_modules``
        and ``npm_hash``.
    """
    system_packages = await self._read_system_dependencies()
    python_modules = await self._read_python_reqs()
    npm_hash = await self._get_file_hash(self.npm_pkg_json)
    logging.debug(
        f"\nApplication {self.name}: Pre-update dependencies:\n"
        f"Packages: {system_packages}\n"
        f"Python Requirements: {python_modules}"
    )
    snapshot: Dict[str, Any] = {}
    snapshot["system_packages"] = system_packages
    snapshot["python_modules"] = python_modules
    snapshot["npm_hash"] = npm_hash
    return snapshot
async def _update_dependencies(
    self, dep_info: Dict[str, Any], force: bool = False
) -> None:
    """Install dependencies that were added since ``dep_info`` was taken.

    Args:
        dep_info: Snapshot with the keys ``system_packages``,
            ``python_modules`` and ``npm_hash``.
        force: When set all current dependencies are installed instead
            of only the ones missing from the snapshot.
    """
    packages = await self._read_system_dependencies()
    modules = await self._read_python_reqs()
    logging.debug(
        f"\nApplication {self.name}: Post-update dependencies:\n"
        f"Packages: {packages}\n"
        f"Python Requirements: {modules}"
    )
    if not force:
        # Only keep the entries that are not part of the snapshot
        known_packages = set(dep_info["system_packages"])
        known_modules = set(dep_info["python_modules"])
        packages = list(set(packages) - known_packages)
        modules = list(set(modules) - known_modules)
    logging.debug(
        f"\nApplication {self.name}: Dependencies to install:\n"
        f"Packages: {packages}\n"
        f"Python Requirements: {modules}\n"
        f"Force All: {force}"
    )
    if packages:
        await self._install_packages(packages)
    if modules:
        await self._update_python_requirements(self.python_reqs or modules)
    npm_hash: Optional[str] = dep_info["npm_hash"]
    npm_changed = await self._check_need_update(npm_hash, self.npm_pkg_json)
    if not (force or npm_changed):
        return
    if self.npm_pkg_json is None:
        return
    self.notify_status("Updating Node Packages...")
    try:
        await self.cmd_helper.run_cmd(
            "npm ci --only=prod", notify=True, timeout=600.,
            cwd=str(self.path))
    except Exception:
        # A failed npm install is reported but does not abort the update
        self.notify_status("Node Package Update failed")
async def close(self) -> None:
    """Remove this instance from the repo's list of managing instances."""
    repo = self.repo
    await repo.unset_current_instance()
# Timeout for asynchronous git commands.
# NOTE(review): presumably seconds; confirm where the constant is consumed.
GIT_ASYNC_TIMEOUT = 300.
# Environment for git's HTTP transport.  The key names must match git's
# variable names exactly; a stray trailing space in the key would define a
# different variable that git never reads.
GIT_ENV_VARS = {
    'GIT_HTTP_LOW_SPEED_LIMIT': "1000",
    'GIT_HTTP_LOW_SPEED_TIME': "20"
}
# Maximum number of entries requested from "git log"
GIT_MAX_LOG_CNT = 100
# "git log" format.  Fields are separated by 0x1D and each entry is
# terminated by 0x1E so multi-line commit messages can be split reliably.
GIT_LOG_FMT = (
    "\"sha:%H%x1Dauthor:%an%x1Ddate:%ct%x1Dsubject:%s%x1Dmessage:%b%x1E\""
)
# "git for-each-ref" format producing "<type> <sha> <refname>".  The
# dereferenced ("*") fields are used when they are available.
GIT_REF_FMT = (
    "'%(if)%(*objecttype)%(then)%(*objecttype) %(*objectname)"
    "%(else)%(objecttype) %(objectname)%(end) %(refname)'"
)
class GitRepo:
def __init__(
self,
cmd_helper: CommandHelper,
src_path: pathlib.Path,
alias: str,
origin_url: str,
moved_origin_url: Optional[str],
primary_branch: str,
channel: Channel
) -> None:
self.server = cmd_helper.get_server()
self.cmd_helper = cmd_helper
self.alias = alias
self.src_path = src_path
git_dir = src_path.parent
git_base = src_path.name
self.backup_path = git_dir.joinpath(f".{git_base}_repo_backup")
self.git_folder_path = src_path.joinpath(".git")
self.origin_url = origin_url
if not self.origin_url.endswith(".git"):
self.origin_url += ".git"
self.moved_origin_url = moved_origin_url
self.primary_branch = primary_branch
self.recovery_message = \
f"""
Manually restore via SSH with the following commands:
sudo service {self.alias} stop
cd {git_dir}
rm -rf {git_base}
git clone {self.origin_url}
sudo service {self.alias} start
"""
self.repo_warnings: List[str] = []
self.managing_instances: List[str] = []
self.init_evt: Optional[asyncio.Event] = None
self.initialized: bool = False
self.git_operation_lock = asyncio.Lock()
self.fetch_timeout_handle: Optional[asyncio.Handle] = None
self.fetch_input_recd: bool = False
self.channel = channel
async def restore_state(self, storage: Dict[str, Any]) -> None:
    """Populate the repo state from previously persisted ``storage``.

    Keys missing from ``storage`` fall back to "?" for strings,
    ``False`` for flags and empty lists for collections.
    """
    stored = storage.get
    self.valid_git_repo: bool = stored('repo_valid', False)
    self.git_owner: str = stored('git_owner', "?")
    self.git_repo_name: str = stored('git_repo_name', "?")
    self.git_remote: str = stored('git_remote', "?")
    self.git_branch: str = stored('git_branch', "?")
    # "full_version_string" takes precedence over "current_version"
    if "full_version_string" in storage:
        current_str = storage["full_version_string"]
    else:
        current_str = stored('current_version', "?")
    self.current_version = GitVersion(current_str)
    self.upstream_version = GitVersion(stored('upstream_version', "?"))
    self.current_commit: str = stored('current_commit', "?")
    self.upstream_commit: str = stored('upstream_commit', "?")
    self.upstream_url: str = stored('upstream_url', "?")
    # Without a stored recovery url only an "origin" remote can supply one
    fallback_recovery = self.upstream_url if self.git_remote == "origin" else "?"
    self.recovery_url: str = stored('recovery_url', fallback_recovery)
    self.branches: List[str] = stored('branches', [])
    self.head_detached: bool = stored('head_detached', False)
    self.git_messages: List[str] = stored('git_messages', [])
    self.commits_behind: List[Dict[str, Any]] = stored('commits_behind', [])
    self.diverged: bool = stored("diverged", False)
    self.repo_corrupt: bool = stored('corrupt', False)
    # Rollback values default to the state that was just restored
    default_rollback = self.capture_state_for_rollback()
    self.rollback_commit: str = stored('rollback_commit', self.current_commit)
    self.rollback_branch: str = stored('rollback_branch', default_rollback["branch"])
    stored_rb_version = stored('rollback_version', self.current_version)
    self.rollback_version = GitVersion(str(stored_rb_version))
    if self.valid_git_repo:
        await self.set_current_instance()
    self._check_warnings()
def get_persistent_data(self) -> Dict[str, Any]:
return {
'repo_valid': self.valid_git_repo,
'git_owner': self.git_owner,
'git_repo_name': self.git_repo_name,
'git_remote': self.git_remote,
'git_branch': self.git_branch,
'current_version': self.current_version.full_version,
'upstream_version': self.upstream_version.full_version,
'current_commit': self.current_commit,
'upstream_commit': self.upstream_commit,
'rollback_commit': self.rollback_commit,
'rollback_branch': self.rollback_branch,
'rollback_version': self.rollback_version.full_version,
'upstream_url': self.upstream_url,
'recovery_url': self.recovery_url,
'branches': self.branches,
'head_detached': self.head_detached,
'git_messages': self.git_messages,
'commits_behind': self.commits_behind,
'diverged': self.diverged,
'corrupt': self.repo_corrupt
}
async def refresh_repo_state(self, need_fetch: bool = True) -> None:
    # Re-detect the complete repo state: validity, branch/remote, urls,
    # divergence, versions and the list of commits behind upstream.
    #
    # need_fetch: when set a "git fetch" is performed before the upstream
    #             state is evaluated.  It is forced on when a moved origin
    #             is detected.
    #
    # Any exception is logged and re-raised.  "initialized" is only set
    # when the whole sequence completes.
    if self.init_evt is not None:
        # No need to initialize multiple requests
        await self.init_evt.wait()
        if self.initialized:
            return
    self.initialized = False
    # Concurrent callers wait on this event, see the check above
    self.init_evt = asyncio.Event()
    self.git_messages.clear()
    try:
        await self._check_repo_status()
        # Raises if the status check did not mark the repo as valid
        self._verify_repo()
        await self._find_current_branch()
        # Fetch the upstream url. If the repo has been moved,
        # set the new url
        self.upstream_url = await self.remote(f"get-url {self.git_remote}")
        if await self._check_moved_origin():
            need_fetch = True
        if self.git_remote == "origin":
            self.recovery_url = self.upstream_url
        else:
            # Not tracking "origin", look for an "origin" remote to use
            # as the recovery url
            remote_list = (await self.remote("")).splitlines()
            logging.debug(
                f"Git Repo {self.alias}: Detected Remotes - {remote_list}"
            )
            if "origin" in remote_list:
                self.recovery_url = await self.remote("get-url origin")
            else:
                logging.info(
                    f"Git Repo {self.alias}: Unable to detect recovery URL, "
                    "Hard Recovery not available"
                )
                self.recovery_url = "?"
        if need_fetch:
            await self.fetch()
        self.diverged = await self.check_diverged()
        # Parse GitHub Owner from URL
        owner_match = re.match(r"https?://[^/]+/([^/]+)", self.upstream_url)
        self.git_owner = "?"
        if owner_match is not None:
            self.git_owner = owner_match.group(1)
        # Parse GitHub Repository Name from URL
        repo_match = re.match(r".*\/([^\.]*).*", self.upstream_url)
        self.git_repo_name = "?"
        if repo_match is not None:
            self.git_repo_name = repo_match.group(1)
        self.current_commit = await self.rev_parse("HEAD")
        git_desc = await self.describe("--always --tags --long --dirty --abbrev=8")
        cur_ver = GitVersion(git_desc.strip())
        upstream_ver = await self._get_upstream_version()
        await self._set_versions(cur_ver, upstream_ver)
        # Get Commits Behind
        self.commits_behind = []
        cbh = await self.get_commits_behind()
        if cbh:
            tagged_commits = await self.get_tagged_commits()
            debug_msg = '\n'.join([f"{k}: {v}" for k, v in tagged_commits.items()])
            logging.debug(f"Git Repo {self.alias}: Tagged Commits\n{debug_msg}")
            for i, commit in enumerate(cbh):
                tag = tagged_commits.get(commit['sha'], None)
                # Keep the first 30 commits, beyond that only tagged ones
                if i < 30 or tag is not None:
                    commit['tag'] = tag
                    self.commits_behind.append(commit)
        self._check_warnings()
    except Exception:
        logging.exception(f"Git Repo {self.alias}: Initialization failure")
        raise
    else:
        self.initialized = True
        # If no exception was raised assume the repo is not corrupt
        self.repo_corrupt = False
        if self.rollback_commit == "?" or self.rollback_branch == "?":
            # Reset Rollback State
            self.set_rollback_state(None)
        self.log_repo_info()
    finally:
        # Always release waiters, regardless of the outcome
        self.init_evt.set()
        self.init_evt = None
async def _check_repo_status(self) -> bool:
async with self.git_operation_lock:
self.valid_git_repo = False
if self.git_folder_path.is_file():
# Submodules have a file that contain the path to
# the .git folder
eventloop = self.server.get_event_loop()
data = await eventloop.run_in_thread(self.git_folder_path.read_text)
ident, _, gitdir = data.partition(":")
if ident.strip() != "gitdir" or not gitdir.strip():
return False
self.git_folder_path = pathlib.Path(gitdir).expanduser().resolve()
if not self.git_folder_path.is_dir():
logging.info(
f"Git Repo {self.alias}: path '{self.src_path}'"
" is not a valid git repo")
return False
await self._wait_for_lock_release()
retries = 3
while retries:
self.git_messages.clear()
try:
cmd = "status --porcelain -b"
resp: Optional[str] = await self._run_git_cmd(cmd, retries=1)
except Exception:
retries -= 1
resp = None
# Attempt to recover from "loose object" error
if retries and self.repo_corrupt:
if not await self._repair_loose_objects():
# Since we are unable to recover, immediately
# return
return False
else:
break
if resp is None:
return False
self.valid_git_repo = True
await self.set_current_instance()
return True
async def _find_current_branch(self) -> None:
# Populate list of current branches
blist = await self.list_branches()
current_branch = ""
self.branches = []
for branch in blist:
branch = branch.strip()
if not branch:
continue
if branch[0] == "*":
branch = branch[2:].strip()
current_branch = branch
if branch[0] == "(":
continue
self.branches.append(branch)
if current_branch.startswith("(HEAD detached"):
self.head_detached = True
ref_name = current_branch.split()[-1][:-1]
remote_list = (await self.remote("")).splitlines()
for remote in remote_list:
remote = remote.strip()
if not remote:
continue
if ref_name.startswith(remote):
self.git_branch = ref_name[len(remote)+1:]
self.git_remote = remote
break
else:
if self.git_remote == "?":
msg = "Resolve by manually checking out a branch via SSH."
else:
prev = f"{self.git_remote}/{self.git_branch}"
msg = f"Defaulting to previously tracked {prev}."
logging.info(f"Git Repo {self.alias}: {current_branch} {msg}")
else:
self.head_detached = False
self.git_branch = current_branch
rkey = f"branch.{self.git_branch}.remote"
self.git_remote = (await self.config_get(rkey)) or "?"
async def _check_moved_origin(self) -> bool:
detected_origin = self.upstream_url.lower().strip()
if not detected_origin.endswith(".git"):
detected_origin += ".git"
if (
self.server.is_debug_enabled() or
not detected_origin.startswith("http") or
detected_origin == self.origin_url.lower()
):
# Skip the moved origin check if:
# Repo Debug is enabled
# The detected origin url is not http(s)
# The detected origin matches the expected origin url
return False
moved = False
client: HttpClient = self.server.lookup_component("http_client")
check_url = detected_origin[:-4]
logging.info(
f"Git repo {self.alias}: Performing moved origin check - "
f"{check_url}"
)
resp = await client.get(check_url, enable_cache=False)
if not resp.has_error():
final_url = resp.final_url.lower()
if not final_url.endswith(".git"):
final_url += ".git"
logging.debug(f"Git repo {self.alias}: Resolved url - {final_url}")
if final_url == self.origin_url.lower():
logging.info(
f"Git Repo {self.alias}: Moved Repo Detected, Moving "
f"from {self.upstream_url} to {self.origin_url}")
moved = True
await self.remote(
f"set-url {self.git_remote} {self.origin_url}")
self.upstream_url = self.origin_url
if self.moved_origin_url is not None:
moved_origin = self.moved_origin_url.lower().strip()
if not moved_origin.endswith(".git"):
moved_origin += ".git"
if moved_origin != detected_origin:
self.server.add_warning(
f"Git Repo {self.alias}: Origin URL does not "
"not match configured 'moved_origin'option. "
f"Expected: {detected_origin}"
)
else:
logging.debug(f"Move Request Failed: {resp.error}")
return moved
async def _get_upstream_version(self) -> GitVersion:
    """Determine the upstream commit and version for the channel.

    On the dev channel the tracked remote branch is used.  On the other
    channels the tagged commits are scanned and the first tag accepted
    by the channel is selected; "?" is used when none qualifies.

    Returns:
        The upstream version.  ``upstream_commit`` is updated as a side
        effect.
    """
    if self.channel == Channel.DEV:
        self.upstream_commit = await self.rev_parse(
            f"{self.git_remote}/{self.git_branch}"
        )
        described = await self.describe(
            f"{self.git_remote}/{self.git_branch} --always --tags --long --abbrev=8"
        )
        return GitVersion(described)
    found_commit = found_tag = "?"
    tagged_commits = await self.get_tagged_commits()
    for sha, tag in tagged_commits.items():
        candidate = GitVersion(tag)
        if not candidate.is_valid_version():
            continue
        # Stable accepts final releases, beta anything but alpha releases
        if (
            (self.channel == Channel.STABLE and candidate.is_final_release()) or
            (self.channel == Channel.BETA and not candidate.is_alpha_release())
        ):
            found_commit, found_tag = sha, tag
            break
    self.upstream_commit = found_commit
    return GitVersion(found_tag)
async def _set_versions(
    self, current_version: GitVersion, upstream_version: GitVersion
) -> None:
    # Store the current and upstream versions, inferring a replacement
    # for any version that could not be detected.
    #
    # current_version:  version created from "git describe" on HEAD
    # upstream_version: version returned by _get_upstream_version()
    if not current_version.is_valid_version():
        log_msg = (
            f"Git repo {self.alias}: Failed to detect current version, got "
            f"'{current_version}'. "
        )
        # Build "<tag>-<commit count>[-g<sha>]-inferred" from the
        # upstream version's tag and the number of commits on HEAD
        tag = upstream_version.infer_last_tag()
        count = await self.rev_list("HEAD --count")
        sha_part = ""
        if current_version.is_fallback():
            sha_part = f"-g{current_version}"
        elif self.current_commit not in ("?", ""):
            sha_part = f"-g{self.current_commit[:8]}"
        current_version = GitVersion(f"{tag}-{count}{sha_part}-inferred")
        log_msg += f"Falling back to inferred version: {current_version}"
        logging.info(log_msg)
    if self.channel == Channel.DEV:
        if not upstream_version.is_valid_version():
            log_msg = (
                f"Git repo {self.alias}: Failed to detect upstream version, got "
                f"'{upstream_version}'. "
            )
            tag = current_version.tag
            if current_version.inferred:
                # Current version is inferred as well, count every commit
                # reachable from the upstream commit
                count = await self.rev_list(f"{self.upstream_commit} --count")
            else:
                log_msg += "\nRemote has diverged, approximating dev count. "
                count = await self.rev_list(f"{self.upstream_commit}..HEAD --count")
                count = str(int(count) + current_version.dev_count)
            upstream_version = GitVersion(f"{tag}-{count}-inferred")
            log_msg += f"Falling back to inferred version: {upstream_version}"
            logging.info(log_msg)
    else:
        # Non-dev channels: without a newer valid upstream version the
        # current commit is used as the upstream commit
        if not upstream_version.is_valid_version():
            self.upstream_commit = self.current_commit
            upstream_version = current_version
        elif upstream_version <= current_version:
            self.upstream_commit = self.current_commit
    self.current_version = current_version
    self.upstream_version = upstream_version
async def wait_for_init(self) -> None:
if self.init_evt is not None:
await self.init_evt.wait()
if not self.initialized:
raise self.server.error(
f"Git Repo {self.alias}: Initialization failure")
async def check_diverged(self) -> bool:
self._verify_repo(check_remote=True)
if self.head_detached:
return False
async with self.git_operation_lock:
cmd = f"merge-base --is-ancestor HEAD {self.git_remote}/{self.git_branch}"
for _ in range(3):
try:
await self._run_git_cmd(cmd, retries=1, corrupt_msg="error: ")
except self.cmd_helper.scmd_error as err:
if err.return_code == 1:
return True
if self.repo_corrupt:
raise
else:
break
await asyncio.sleep(.5)
return False
def log_repo_info(self) -> None:
warnings = ""
if self.repo_warnings:
warnings = "\nRepo Warnings:\n"
warnings += '\n'.join([f" {warn}" for warn in self.repo_warnings])
logging.info(
f"Git Repo {self.alias} Detected:\n"
f"Owner: {self.git_owner}\n"
f"Repository Name: {self.git_repo_name}\n"
f"Path: {self.src_path}\n"
f"Remote: {self.git_remote}\n"
f"Branch: {self.git_branch}\n"
f"Remote URL: {self.upstream_url}\n"
f"Recovery URL: {self.recovery_url}\n"
f"Current Commit SHA: {self.current_commit}\n"
f"Upstream Commit SHA: {self.upstream_commit}\n"
f"Current Version: {self.current_version}\n"
f"Upstream Version: {self.upstream_version}\n"
f"Rollback Commit: {self.rollback_commit}\n"
f"Rollback Branch: {self.rollback_branch}\n"
f"Rollback Version: {self.rollback_version}\n"
f"Is Dirty: {self.current_version.dirty}\n"
f"Is Detached: {self.head_detached}\n"
f"Commits Behind: {len(self.commits_behind)}\n"
f"Diverged: {self.diverged}"
f"{warnings}"
)
def _check_warnings(self) -> None:
if self.upstream_url == "?":
self.repo_warnings.append("Failed to detect repo url")
return
self.repo_warnings.clear()
upstream_url = self.upstream_url.lower()
if upstream_url[-4:] != ".git":
upstream_url += ".git"
if upstream_url != self.origin_url.lower():
self.repo_warnings.append(f"Unofficial remote url: {self.upstream_url}")
if self.git_branch != self.primary_branch or self.git_remote != "origin":
self.repo_warnings.append(
"Repo not on offical remote/branch, expected: "
f"origin/{self.primary_branch}, detected: "
f"{self.git_remote}/{self.git_branch}")
if self.head_detached:
self.repo_warnings.append("Detached HEAD detected")
if self.diverged:
self.repo_warnings.append("Repo has diverged from remote")
if len(self.managing_instances) > 1:
instances = "\n".join([f" {ins}" for ins in self.managing_instances])
self.repo_warnings.append(
f"Multiple instances of Moonraker managing this repo:\n"
f"{instances}"
)
ro_msg = f"Git Repo {self.alias}: No warnings detected"
if self.repo_warnings:
ro_msg = f"Git Repo {self.alias} Warnings Detected:\n"
ro_msg += "\n".join(self.repo_warnings)
self.server.add_log_rollover_item(f"umgr_{self.alias}_warn", ro_msg, log=False)
def check_is_valid(self):
return not self.head_detached and not self.diverged
def _verify_repo(self, check_remote: bool = False) -> None:
if not self.valid_git_repo:
raise self.server.error(
f"Git Repo {self.alias}: repo not initialized")
if check_remote:
if self.git_remote == "?":
raise self.server.error(
f"Git Repo {self.alias}: No valid git remote detected")
async def reset(self, ref: Optional[str] = None) -> None:
async with self.git_operation_lock:
if ref is None:
if self.channel != Channel.DEV:
ref = self.upstream_commit
else:
if self.git_remote == "?" or self.git_branch == "?":
raise self.server.error("Cannot reset, unknown remote/branch")
ref = f"{self.git_remote}/{self.git_branch}"
await self._run_git_cmd(f"reset --hard {ref}", retries=2)
self.repo_corrupt = False
async def fetch(self) -> None:
self._verify_repo(check_remote=True)
async with self.git_operation_lock:
await self._run_git_cmd_async(
f"fetch {self.git_remote} --prune --progress")
async def clean(self) -> None:
self._verify_repo()
async with self.git_operation_lock:
await self._run_git_cmd("clean -d -f", retries=2)
async def pull(self) -> None:
    """Pull upstream changes into the current branch.

    "--rebase" is added when debug is enabled.  On non-dev channels the
    remote and upstream commit are passed explicitly.

    Raises:
        A server error when HEAD is detached.
    """
    self._verify_repo()
    if self.head_detached:
        raise self.server.error(
            f"Git Repo {self.alias}: Cannot perform pull on a "
            "detached HEAD")
    parts = ["pull", "--progress"]
    if self.server.is_debug_enabled():
        parts.append("--rebase")
    if self.channel != Channel.DEV:
        parts.extend([f"{self.git_remote}", f"{self.upstream_commit}"])
    cmd = " ".join(parts)
    async with self.git_operation_lock:
        await self._run_git_cmd_async(cmd)
async def list_branches(self) -> List[str]:
self._verify_repo()
async with self.git_operation_lock:
resp = await self._run_git_cmd("branch --list --no-color")
return resp.strip().split("\n")
async def remote(self, command: str) -> str:
self._verify_repo(check_remote=True)
async with self.git_operation_lock:
resp = await self._run_git_cmd(
f"remote {command}")
return resp.strip()
async def describe(self, args: str = "") -> str:
self._verify_repo()
async with self.git_operation_lock:
resp = await self._run_git_cmd(f"describe {args}".strip())
return resp.strip()
async def rev_parse(self, args: str = "") -> str:
self._verify_repo()
async with self.git_operation_lock:
resp = await self._run_git_cmd(f"rev-parse {args}".strip())
return resp.strip()
async def rev_list(self, args: str = "") -> str:
self._verify_repo()
async with self.git_operation_lock:
resp = await self._run_git_cmd(f"rev-list {args}".strip())
return resp.strip()
async def config_get(
self,
key: str,
pattern: str = "",
get_all: bool = False,
local_only: bool = False
) -> Optional[str]:
local = "--local " if local_only else ""
cmd = f"{local}--get-all" if get_all else f"{local}--get"
args = f"{cmd} {key} '{pattern}'" if pattern else f"{cmd} {key}"
try:
return await self.config_cmd(args)
except self.cmd_helper.scmd_error as e:
if e.return_code == 1:
return None
raise
async def config_set(self, key: str, value: str) -> None:
await self.config_cmd(f"{key} '{value}'")
async def config_add(self, key: str, value: str) -> None:
await self.config_cmd(f"--add {key} '{value}'")
async def config_unset(
self, key: str, pattern: str = "", unset_all: bool = False
) -> None:
cmd = "--unset-all" if unset_all else "--unset"
args = f"{cmd} {key} '{pattern}'" if pattern else f"{cmd} {key}"
await self.config_cmd(args)
async def config_cmd(self, args: str) -> str:
self._verify_repo()
verbose = self.server.is_verbose_enabled()
async with self.git_operation_lock:
for attempt in range(3):
try:
return await self._run_git_cmd(
f"config {args}", retries=1, log_complete=verbose
)
except self.cmd_helper.scmd_error as e:
if 1 <= (e.return_code or 10) <= 6 or attempt == 2:
raise
raise self.server.error("Failed to run git-config")
async def checkout(self, branch: Optional[str] = None) -> None:
self._verify_repo()
reset_commit: Optional[str] = None
async with self.git_operation_lock:
if branch is None:
# No branch is specifed so we are checking out detached
if self.channel != Channel.DEV:
reset_commit = self.upstream_commit
branch = f"{self.git_remote}/{self.git_branch}"
await self._run_git_cmd(f"checkout -q {branch}")
if reset_commit is not None:
await self.reset(reset_commit)
async def run_fsck(self) -> None:
async with self.git_operation_lock:
await self._run_git_cmd("fsck --full", timeout=300., retries=1)
async def clone(self) -> None:
    """Replace the working tree with a fresh clone of the recovery url.

    The repo is cloned into ``backup_path`` first; the existing working
    tree is only removed once that clone has succeeded.

    Raises:
        A server error when no recovery url is known or the clone fails.
    """
    async with self.git_operation_lock:
        if self.recovery_url == "?":
            raise self.server.error(
                "Recovery url has not been detected, clone aborted"
            )
        notify = self.cmd_helper.notify_update_response
        notify(f"Git Repo {self.alias}: Starting Clone Recovery...")
        event_loop = self.server.get_event_loop()
        # Remove leftovers of a previous recovery attempt
        if self.backup_path.exists():
            await event_loop.run_in_thread(shutil.rmtree, self.backup_path)
        await self._check_lock_file_exists(remove=True)
        clone_cmd = f"clone --filter=blob:none {self.recovery_url} {self.backup_path}"
        try:
            await self._run_git_cmd_async(clone_cmd, 1, False, False)
        except Exception as e:
            notify(f"Git Repo {self.alias}: Git Clone Failed")
            raise self.server.error("Git Clone Error") from e
        if self.src_path.exists():
            await event_loop.run_in_thread(shutil.rmtree, self.src_path)
        # Move the fresh clone into the place of the working tree
        await event_loop.run_in_thread(
            shutil.move, str(self.backup_path), str(self.src_path))
        self.repo_corrupt = False
        notify(f"Git Repo {self.alias}: Git Clone Complete")
async def rollback(self) -> bool:
if self.rollback_commit == "?" or self.rollback_branch == "?":
raise self.server.error("Incomplete rollback data stored, cannot rollback")
if self.rollback_branch != self.git_branch:
await self.checkout(self.rollback_branch)
elif self.rollback_commit == self.current_commit:
return False
await self.reset(self.rollback_commit)
return True
def capture_state_for_rollback(self) -> Dict[str, Any]:
branch = self.git_branch
if self.head_detached:
valid = "?" not in (self.git_remote, self.git_branch)
branch = f"{self.git_remote}/{self.git_branch}" if valid else "?"
return {
"commit": self.current_commit,
"branch": branch,
"version": self.current_version
}
def set_rollback_state(self, rb_state: Optional[Dict[str, Any]]) -> None:
if rb_state is None:
rb_state = self.capture_state_for_rollback()
self.rollback_commit = rb_state["commit"]
self.rollback_branch = rb_state["branch"]
self.rollback_version = rb_state["version"]
async def get_commits_behind(self) -> List[Dict[str, Any]]:
self._verify_repo()
if self.is_current():
return []
async with self.git_operation_lock:
if self.channel != Channel.DEV:
ref = self.upstream_commit
else:
ref = f"{self.git_remote}/{self.git_branch}"
resp = await self._run_git_cmd(
f"log {self.current_commit}..{ref} "
f"--format={GIT_LOG_FMT} --max-count={GIT_MAX_LOG_CNT}")
commits_behind: List[Dict[str, Any]] = []
for log_entry in resp.split('\x1E'):
log_entry = log_entry.strip()
if not log_entry:
continue
log_items = [li.strip() for li in log_entry.split('\x1D')
if li.strip()]
cbh = [li.split(':', 1) for li in log_items]
commits_behind.append(dict(cbh)) # type: ignore
return commits_behind
async def get_tagged_commits(self, count: int = 100) -> Dict[str, str]:
    """Return the tags that contain HEAD, keyed by commit sha.

    Args:
        count: Maximum number of refs requested from git.

    Returns:
        Dict of commit SHA keys mapped to tag name values, in the order
        git printed them (sorted by "-creatordate").
    """
    self._verify_repo()
    async with self.git_operation_lock:
        resp = await self._run_git_cmd(
            f"for-each-ref --count={count} --sort='-creatordate' "
            f"--contains=HEAD --format={GIT_REF_FMT} 'refs/tags'"
        )
    tagged_commits: Dict[str, str] = {}
    for line in resp.split('\n'):
        fields = line.strip().split()
        # Expect "<type> <sha> <refname>", only commits are of interest
        if len(fields) != 3:
            continue
        obj_type, sha, ref = fields
        if obj_type != "commit":
            continue
        tagged_commits[sha] = ref.split('/')[-1]
    return tagged_commits
async def set_current_instance(self) -> None:
# Check to see if multiple instances of Moonraker are configured
# to manage this repo
full_id = self._get_instance_id()
self.managing_instances.clear()
try:
instances = await self.config_get(
"moonraker.instance", get_all=True, local_only=True
)
if instances is None:
await self.config_set("moonraker.instance", full_id)
self.managing_instances = [full_id]
else:
det_instances = [
ins.strip() for ins in instances.split("\n") if ins.strip()
]
if full_id not in det_instances:
await self.config_add("moonraker.instance", full_id)
det_instances.append(full_id)
self.managing_instances = det_instances
except asyncio.CancelledError:
raise
except Exception as e:
logging.info(
f"Git Repo {self.alias}: Moonraker Instance Validation Error, {e}"
)
async def unset_current_instance(self) -> None:
full_id = self._get_instance_id()
if full_id not in self.managing_instances:
return
try:
await self.config_unset("moonraker.instance", pattern=full_id)
except asyncio.CancelledError:
raise
except Exception as e:
logging.info(f"Git repo {self.alias}: Error removing instance, {e}")
def _get_instance_id(self) -> str:
machine: Machine = self.server.lookup_component("machine")
cur_name = machine.unit_name
cur_uuid: str = self.server.get_app_args()["instance_uuid"]
return f"{cur_name}@{cur_uuid}"
def get_repo_status(self) -> Dict[str, Any]:
return {
'detected_type': "git_repo",
'remote_alias': self.git_remote,
'branch': self.git_branch,
'owner': self.git_owner,
'repo_name': self.git_repo_name,
'remote_url': self.upstream_url,
'recovery_url': self.recovery_url,
'version': self.current_version.short_version,
'remote_version': self.upstream_version.short_version,
'rollback_version': self.rollback_version.short_version,
'current_hash': self.current_commit,
'remote_hash': self.upstream_commit,
'is_dirty': self.current_version.dirty,
'detached': self.head_detached,
'commits_behind': self.commits_behind,
'git_messages': self.git_messages,
'full_version_string': self.current_version.full_version,
'pristine': not self.current_version.dirty,
'corrupt': self.repo_corrupt,
'warnings': self.repo_warnings
}
def get_version(self, upstream: bool = False) -> GitVersion:
    """Return the upstream version when ``upstream`` is truthy, otherwise
    the current version."""
    if upstream:
        return self.upstream_version
    return self.current_version
def is_detached(self) -> bool:
    """Return the stored ``head_detached`` flag."""
    detached = self.head_detached
    return detached
def is_dirty(self) -> bool:
    """Return the ``dirty`` flag of the current version."""
    version = self.current_version
    return version.dirty
def is_current(self) -> bool:
    """Return True when the current commit equals the upstream commit."""
    local_commit = self.current_commit
    remote_commit = self.upstream_commit
    return local_commit == remote_commit
async def _check_lock_file_exists(self, remove: bool = False) -> bool:
    """Check for an ``index.lock`` file inside the git folder.

    Args:
        remove: When True and the lock file is present, attempt to delete
            it (the deletion is dispatched through the event loop's
            ``run_in_thread``).

    Returns:
        True if the lock file was present when checked (regardless of
        whether removal was requested or succeeded), otherwise False.
    """
    lock_path = self.git_folder_path.joinpath("index.lock")
    if not lock_path.is_file():
        return False
    if remove:
        logging.info(f"Git Repo {self.alias}: Git lock file found "
                     "after git process exited, removing")
        try:
            event_loop = self.server.get_event_loop()
            await event_loop.run_in_thread(os.remove, lock_path)
        except Exception as e:
            # Removal remains best-effort, but report the failure so a
            # permission or IO problem is visible in the log rather than
            # silently discarded.
            logging.info(
                f"Git Repo {self.alias}: Unable to remove git lock "
                f"file, {e}"
            )
    return True
async def _wait_for_lock_release(self, timeout: int = 60) -> None:
    """Wait for the git lock file to disappear.

    The lock is polled once per second.  If it is no longer present the
    method returns immediately.  When the countdown reaches zero the lock
    check is run one final time with ``remove=True``.

    Args:
        timeout: Number of one-second polls to perform before requesting
            removal of the lock file.
    """
    remaining = timeout
    while remaining:
        locked = await self._check_lock_file_exists()
        if not locked:
            return
        # Only report progress on every tenth second of the countdown
        if remaining % 10 == 0:
            logging.info(f"Git Repo {self.alias}: Git lock file "
                         f"exists, {remaining} seconds remaining "
                         "before removal.")
        await asyncio.sleep(1.)
        remaining -= 1
    await self._check_lock_file_exists(remove=True)
async def _repair_loose_objects(self, notify: bool = False) -> bool:
    """Attempt to repair a repo that has loose (empty) object files.

    Three commands are run in sequence: empty files under ``.git/objects/``
    are deleted, everything is re-fetched with pruning, and a full
    ``git fsck`` is run.  On success ``self.repo_corrupt`` is cleared.

    Args:
        notify: When True, progress and result messages are also sent
            through ``cmd_helper.notify_update_response``.

    Returns:
        True when all commands completed without raising, False otherwise.
    """
    helper = self.cmd_helper
    if notify:
        helper.notify_update_response("Attempting to repair loose objects...")
    try:
        await helper.run_cmd_with_response(
            "find .git/objects/ -type f -empty | xargs rm",
            timeout=10., retries=1, cwd=str(self.src_path)
        )
        # fix_loose is disabled so the fetch cannot re-enter this repair
        await self._run_git_cmd_async(
            "fetch --all -p", retries=1, fix_loose=False
        )
        await self._run_git_cmd("fsck --full", timeout=300., retries=1)
    except Exception:
        failure = (
            "Attempt to repair loose objects failed, "
            "hard recovery is required"
        )
        logging.exception(failure)
        if notify:
            helper.notify_update_response(failure)
        return False
    else:
        if notify:
            helper.notify_update_response("Loose objects repaired")
        self.repo_corrupt = False
        return True
async def _run_git_cmd_async(self,
                             cmd: str,
                             retries: int = 5,
                             need_git_path: bool = True,
                             fix_loose: bool = True
                             ) -> None:
    """Run a git command without a fixed timeout, retrying on failure.

    Fetch and pull require special handling: if the request gets delayed
    it should not be terminated while the command is still processing.
    Instead of a hard timeout, ``_check_process_active`` is scheduled
    through the event loop's ``delay_callback`` for every attempt.

    Args:
        cmd: The git arguments to run (without the leading ``git``).
        retries: Number of attempts before giving up.
        need_git_path: When True the command is run as
            ``git -C <src_path> <cmd>``, otherwise as ``git <cmd>``.
        fix_loose: When True and the repo has been flagged corrupt, a
            single loose object repair is attempted after a failure.

    Raises:
        self.server.error: If the loose object repair fails, or if the
            command did not return 0 on any attempt.
    """
    await self._wait_for_lock_release()
    event_loop = self.server.get_event_loop()
    env = dict(os.environ)
    env.update(GIT_ENV_VARS)
    prefix = f"git -C {self.src_path}" if need_git_path else "git"
    git_cmd = f"{prefix} {cmd}"
    scmd = self.cmd_helper.build_shell_command(
        git_cmd, callback=self._handle_process_output, env=env
    )
    while retries:
        self.git_messages.clear()
        self.fetch_input_recd = False
        self.fetch_timeout_handle = event_loop.delay_callback(
            GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd
        )
        try:
            await scmd.run(timeout=0)
        except Exception:
            # The outcome is judged by the return code below
            pass
        self.fetch_timeout_handle.cancel()
        if scmd.get_return_code() == 0:
            self.git_messages.clear()
            return
        if self.repo_corrupt and fix_loose:
            repaired = await self._repair_loose_objects(notify=True)
            if not repaired:
                # Since the attempt to repair failed, bypass retries
                # and immediately raise an exception
                raise self.server.error(
                    "Unable to repair loose objects, use hard recovery")
            # Only attempt to repair loose objects once.  Re-run
            # the command once.
            fix_loose = False
            retries = 2
        retries -= 1
        await asyncio.sleep(.5)
        await self._check_lock_file_exists(remove=True)
    raise self.server.error(f"Git Command '{cmd}' failed")
def _handle_process_output(self, output: bytes) -> None:
    """Process a chunk of output received from a running git command.

    Receiving any output marks the command as active by setting
    ``self.fetch_input_recd``.  Non-empty output is stored in
    ``self.git_messages``, forwarded through
    ``cmd_helper.notify_update_response`` and logged at debug level.
    Output beginning with "fatal: " flags the repo as corrupt.

    Args:
        output: Raw bytes emitted by the process.
    """
    self.fetch_input_recd = True
    # Git may emit bytes that are not valid UTF-8 (for example file names
    # in another encoding).  Substitute undecodable bytes rather than
    # raising UnicodeDecodeError from within the output callback.
    out = output.decode(errors="replace").strip()
    if not out:
        return
    if out.startswith("fatal: "):
        self.repo_corrupt = True
    self.git_messages.append(out)
    self.cmd_helper.notify_update_response(out)
    logging.debug(
        f"Git Repo {self.alias}: {out}")
async def _check_process_active(
    self, scmd: ShellCommand, cmd_name: str
) -> None:
    """Callback that decides whether a running git command is still alive.

    If the command already has a return code nothing more is done.  If
    output was received since the last check, the flag is reset and this
    callback is scheduled again.  Otherwise the command is considered
    timed out and is cancelled.

    Args:
        scmd: The shell command being monitored.
        cmd_name: Name of the command, used in log messages.
    """
    if scmd.get_return_code() is not None:
        logging.debug(f"Git Repo {self.alias}: {cmd_name} returned")
        return
    if not self.fetch_input_recd:
        # Request has timed out with no input, terminate it
        logging.debug(f"Git Repo {self.alias}: {cmd_name} timed out")
        # Cancel with SIGKILL
        await scmd.cancel(2)
        return
    # Received some input, reschedule timeout
    logging.debug(
        f"Git Repo {self.alias}: {cmd_name} active, rescheduling")
    loop = self.server.get_event_loop()
    self.fetch_input_recd = False
    self.fetch_timeout_handle = loop.delay_callback(
        GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd_name
    )
async def _run_git_cmd(
    self,
    git_args: str,
    timeout: float = 20.,
    retries: int = 5,
    env: Optional[Dict[str, str]] = None,
    corrupt_msg: str = "fatal: ",
    log_complete: bool = True
) -> str:
    """Run ``git -C <src_path> <git_args>`` and return its response.

    When the command fails, the captured stdout and stderr are appended to
    ``self.git_messages`` and every output line is inspected; if any line
    (stripped and lower-cased) starts with ``corrupt_msg`` the repo is
    flagged as corrupt.  The original error is then re-raised.

    Args:
        git_args: Arguments passed to git after the ``-C`` option.
        timeout: Timeout forwarded to ``run_cmd_with_response``.
        retries: Retry count forwarded to ``run_cmd_with_response``.
        env: Optional environment forwarded to ``run_cmd_with_response``.
        corrupt_msg: Line prefix that identifies a corrupt repo.
        log_complete: Forwarded to ``run_cmd_with_response``.

    Returns:
        The value returned by ``cmd_helper.run_cmd_with_response``.

    Raises:
        self.cmd_helper.scmd_error: Re-raised after the output has been
            recorded and inspected.
    """
    try:
        return await self.cmd_helper.run_cmd_with_response(
            f"git -C {self.src_path} {git_args}",
            timeout=timeout,
            retries=retries,
            env=env,
            sig_idx=2,
            log_complete=log_complete
        )
    except self.cmd_helper.scmd_error as e:
        stdout = e.stdout.decode().strip()
        stderr = e.stderr.decode().strip()
        msg_lines: List[str] = []
        # Both streams are scanned.  Git reports "fatal: ..." on stderr,
        # so the stderr lines must come from stderr itself.
        for stream_text in (stdout, stderr):
            if stream_text:
                msg_lines.extend(stream_text.split("\n"))
                self.git_messages.append(stream_text)
        for line in msg_lines:
            if line.strip().lower().startswith(corrupt_msg):
                self.repo_corrupt = True
                break
        raise