# Git Deployment implementation # # Copyright (C) 2021 Eric Callahan # # This file may be distributed under the terms of the GNU GPLv3 license. from __future__ import annotations import asyncio import os import pathlib import shutil import re import logging from .app_deploy import AppDeploy # Annotation imports from typing import ( TYPE_CHECKING, Any, Tuple, Optional, Dict, List, ) if TYPE_CHECKING: from confighelper import ConfigHelper from components import shell_command from .update_manager import CommandHelper class GitDeploy(AppDeploy): def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper) -> None: super().__init__(config, cmd_helper) self.repo = GitRepo( cmd_helper, self.path, self.name, self.origin, self.moved_origin, self.channel ) if self.type != 'git_repo': self.need_channel_update = True @staticmethod async def from_application(app: AppDeploy) -> GitDeploy: new_app = GitDeploy(app.config, app.cmd_helper) await new_app.reinstall() return new_app async def initialize(self) -> Dict[str, Any]: storage = await super().initialize() self.repo.restore_state(storage) return storage async def refresh(self) -> None: try: await self._update_repo_state() except Exception: logging.exception("Error Refreshing git state") async def _update_repo_state(self, need_fetch: bool = True) -> None: self._is_valid = False await self.repo.initialize(need_fetch=need_fetch) self.log_info( f"Channel: {self.channel}, " f"Need Channel Update: {self.need_channel_update}" ) invalids = self.repo.report_invalids(self.primary_branch) if invalids: msgs = '\n'.join(invalids) self.log_info( f"Repo validation checks failed:\n{msgs}") if self.debug: self._is_valid = True self.log_info( "Repo debug enabled, overriding validity checks") else: self.log_info("Updates on repo disabled") else: self._is_valid = True self.log_info("Validity check for git repo passed") self._save_state() async def update(self) -> bool: await self.repo.wait_for_init() if not self._is_valid: raise self.log_exc("Update aborted, repo not valid", False) if self.repo.is_dirty(): raise self.log_exc( "Update aborted, repo has been modified", False) if self.repo.is_current(): # No need to update return False self.cmd_helper.notify_update_response( f"Updating Application {self.name}...") inst_hash = await self._get_file_hash(self.install_script) pyreqs_hash = await self._get_file_hash(self.python_reqs) npm_hash = await self._get_file_hash(self.npm_pkg_json) await self._pull_repo() # Check Semantic Versions await self._update_dependencies(inst_hash, pyreqs_hash, npm_hash) # Refresh local repo state await self._update_repo_state(need_fetch=False) await self.restart_service() self.notify_status("Update Finished...", is_complete=True) return True async def recover(self, hard: bool = False, force_dep_update: bool = False ) -> None: self.notify_status("Attempting Repo Recovery...") inst_hash = await self._get_file_hash(self.install_script) pyreqs_hash = await self._get_file_hash(self.python_reqs) npm_hash = await self._get_file_hash(self.npm_pkg_json) if hard: await self.repo.clone() await self._update_repo_state() else: self.notify_status("Resetting Git Repo...") await self.repo.reset() await self._update_repo_state() if self.repo.is_dirty() or not self._is_valid: raise self.server.error( "Recovery attempt failed, repo state not pristine", 500) await self._update_dependencies(inst_hash, pyreqs_hash, npm_hash, force=force_dep_update) await self.restart_service() self.notify_status("Reinstall Complete", is_complete=True) async def reinstall(self): # Clear the persistent storage prior to a channel swap. # After the next update is complete new data will be # restored. umdb = self.cmd_helper.get_umdb() await umdb.pop(self.name, None) await self.initialize() await self.recover(True, True) def get_update_status(self) -> Dict[str, Any]: status = super().get_update_status() status.update(self.repo.get_repo_status()) return status def get_persistent_data(self) -> Dict[str, Any]: storage = super().get_persistent_data() storage.update(self.repo.get_persisent_data()) return storage async def _pull_repo(self) -> None: self.notify_status("Updating Repo...") try: if self.repo.is_detached(): await self.repo.fetch() await self.repo.checkout() else: await self.repo.pull() except Exception: raise self.log_exc("Error running 'git pull'") async def _update_dependencies(self, inst_hash: Optional[str], pyreqs_hash: Optional[str], npm_hash: Optional[str], force: bool = False ) -> None: ret = await self._check_need_update(inst_hash, self.install_script) if force or ret: package_list = await self._parse_install_script() if package_list is not None: await self._install_packages(package_list) ret = await self._check_need_update(pyreqs_hash, self.python_reqs) if force or ret: if self.python_reqs is not None: await self._update_virtualenv(self.python_reqs) ret = await self._check_need_update(npm_hash, self.npm_pkg_json) if force or ret: if self.npm_pkg_json is not None: self.notify_status("Updating Node Packages...") try: await self.cmd_helper.run_cmd( "npm ci --only=prod", notify=True, timeout=600., cwd=str(self.path)) except Exception: self.notify_status("Node Package Update failed") async def _parse_install_script(self) -> Optional[List[str]]: if self.install_script is None: return None # Open install file file and read inst_path: pathlib.Path = self.install_script if not inst_path.is_file(): self.log_info(f"Unable to open install script: {inst_path}") return None event_loop = self.server.get_event_loop() data = await event_loop.run_in_thread(inst_path.read_text) packages: List[str] = re.findall(r'PKGLIST="(.*)"', data) packages = [p.lstrip("${PKGLIST}").strip() for p in packages] if not packages: self.log_info(f"No packages found in script: {inst_path}") return None logging.debug(f"Repo {self.name}: Detected Packages: {repr(packages)}") return packages GIT_ASYNC_TIMEOUT = 300. GIT_ENV_VARS = { 'GIT_HTTP_LOW_SPEED_LIMIT': "1000", 'GIT_HTTP_LOW_SPEED_TIME ': "20" } GIT_MAX_LOG_CNT = 100 GIT_LOG_FMT = ( "\"sha:%H%x1Dauthor:%an%x1Ddate:%ct%x1Dsubject:%s%x1Dmessage:%b%x1E\"" ) GIT_OBJ_ERR = "fatal: loose object" GIT_REF_FMT = ( "'%(if)%(*objecttype)%(then)%(*objecttype) (*objectname)" "%(else)%(objecttype) %(objectname)%(end) %(refname)'" ) class GitRepo: tag_r = re.compile(r"(v?\d+\.\d+\.\d+(-(alpha|beta)(\.\d+)?)?)(-\d+)?") def __init__(self, cmd_helper: CommandHelper, git_path: pathlib.Path, alias: str, origin_url: str, moved_origin_url: Optional[str], channel: str ) -> None: self.server = cmd_helper.get_server() self.cmd_helper = cmd_helper self.alias = alias self.git_path = git_path git_dir = git_path.parent git_base = git_path.name self.backup_path = git_dir.joinpath(f".{git_base}_repo_backup") self.origin_url = origin_url self.moved_origin_url = moved_origin_url self.recovery_message = \ f""" Manually restore via SSH with the following commands: sudo service {self.alias} stop cd {git_dir} rm -rf {git_base} git clone {self.origin_url} sudo service {self.alias} start """ self.init_evt: Optional[asyncio.Event] = None self.initialized: bool = False self.git_operation_lock = asyncio.Lock() self.fetch_timeout_handle: Optional[asyncio.Handle] = None self.fetch_input_recd: bool = False self.is_beta = channel == "beta" def restore_state(self, storage: Dict[str, Any]) -> None: self.valid_git_repo: bool = storage.get('repo_valid', False) self.git_owner: str = storage.get('git_owner', "?") self.git_repo_name: str = storage.get('git_repo_name', "?") self.git_remote: str = storage.get('git_remote', "?") self.git_branch: str = storage.get('git_branch', "?") self.current_version: str = storage.get('current_version', "?") self.upstream_version: str = storage.get('upstream_version', "?") self.current_commit: str = storage.get('current_commit', "?") self.upstream_commit: str = storage.get('upstream_commit', "?") self.upstream_url: str = storage.get('upstream_url', "?") self.full_version_string: str = storage.get('full_version_string', "?") self.branches: List[str] = storage.get('branches', []) self.dirty: bool = storage.get('dirty', False) self.head_detached: bool = storage.get('head_detached', False) self.git_messages: List[str] = storage.get('git_messages', []) self.commits_behind: List[Dict[str, Any]] = storage.get( 'commits_behind', []) def get_persisent_data(self) -> Dict[str, Any]: return { 'repo_valid': self.valid_git_repo, 'git_owner': self.git_owner, 'git_repo_name': self.git_repo_name, 'git_remote': self.git_remote, 'git_branch': self.git_branch, 'current_version': self.current_version, 'upstream_version': self.upstream_version, 'current_commit': self.current_commit, 'upstream_commit': self.upstream_commit, 'upstream_url': self.upstream_url, 'full_version_string': self.full_version_string, 'branches': self.branches, 'dirty': self.dirty, 'head_detached': self.head_detached, 'git_messages': self.git_messages, 'commits_behind': self.commits_behind } async def initialize(self, need_fetch: bool = True) -> None: if self.init_evt is not None: # No need to initialize multiple requests await self.init_evt.wait() if self.initialized: return self.initialized = False self.init_evt = asyncio.Event() self.git_messages.clear() try: await self.update_repo_status() self._verify_repo() if not self.head_detached: # lookup remote via git config self.git_remote = await self.get_config_item( f"branch.{self.git_branch}.remote") # Fetch the upstream url. If the repo has been moved, # set the new url self.upstream_url = await self.remote(f"get-url {self.git_remote}") if self.moved_origin_url is not None: origin = self.upstream_url.lower().strip() if not origin.endswith(".git"): origin += ".git" moved_origin = self.moved_origin_url.lower().strip() if not moved_origin.endswith(".git"): moved_origin += ".git" if origin == moved_origin: logging.info( f"Git Repo {self.alias}: Moved Repo Detected, Moving " f"from {self.upstream_url} to {self.origin_url}") need_fetch = True await self.remote( f"set-url {self.git_remote} {self.origin_url}") self.upstream_url = self.origin_url if need_fetch: await self.fetch() # Populate list of current branches blist = await self.list_branches() self.branches = [] for branch in blist: branch = branch.strip() if branch[0] == "*": branch = branch[2:] if branch[0] == "(": continue self.branches.append(branch) # Parse GitHub Owner from URL owner_match = re.match(r"https?://[^/]+/([^/]+)", self.upstream_url) self.git_owner = "?" if owner_match is not None: self.git_owner = owner_match.group(1) # Parse GitHub Repository Name from URL repo_match = re.match(r".*\/([^\.]*).*", self.upstream_url) self.git_repo_name = "?" if repo_match is not None: self.git_repo_name = repo_match.group(1) self.current_commit = await self.rev_parse("HEAD") git_desc = await self.describe( "--always --tags --long --dirty") self.full_version_string = git_desc.strip() self.dirty = git_desc.endswith("dirty") if self.is_beta: await self._get_beta_versions(git_desc) else: await self._get_dev_versions(git_desc) # Get Commits Behind self.commits_behind = [] cbh = await self.get_commits_behind() if cbh: tagged_commits = await self.get_tagged_commits() debug_msg = '\n'.join([f"{k}: {v}" for k, v in tagged_commits.items()]) logging.debug(f"Git Repo {self.alias}: Tagged Commits\n" f"{debug_msg}") for i, commit in enumerate(cbh): tag = tagged_commits.get(commit['sha'], None) if i < 30 or tag is not None: commit['tag'] = tag self.commits_behind.append(commit) self.log_repo_info() except Exception: logging.exception(f"Git Repo {self.alias}: Initialization failure") raise else: self.initialized = True finally: self.init_evt.set() self.init_evt = None async def _get_dev_versions(self, current_version: str) -> None: self.upstream_commit = await self.rev_parse( f"{self.git_remote}/{self.git_branch}") upstream_version = await self.describe( f"{self.git_remote}/{self.git_branch} " "--always --tags --long") # Get the latest tag as a fallback for shallow clones commit, tag = await self._parse_latest_tag() # Parse Version Info versions: List[str] = [] for ver in [current_version, upstream_version]: tag_version = "?" ver_match = self.tag_r.match(ver) if ver_match: tag_version = ver_match.group() elif tag != "?": if len(versions) == 0: count = await self.rev_list(f"{tag}..HEAD --count") full_ver = f"{tag}-{count}-g{ver}-shallow" self.full_version_string = full_ver else: count = await self.rev_list( f"{tag}..{self.upstream_commit} --count") tag_version = f"{tag}-{count}" versions.append(tag_version) self.current_version, self.upstream_version = versions async def _get_beta_versions(self, current_version: str) -> None: upstream_commit, upstream_tag = await self._parse_latest_tag() ver_match = self.tag_r.match(current_version) if ver_match: current_version = ver_match.group(1) elif upstream_tag != "?": count = await self.rev_list(f"{upstream_tag}..HEAD --count") full_ver = f"{upstream_tag}-{count}-g{current_version}-shallow" self.full_version_string = full_ver current_version = upstream_tag self.upstream_commit = upstream_commit if current_version == upstream_tag: self.upstream_commit = self.current_commit self.current_version = current_version self.upstream_version = upstream_tag async def _parse_latest_tag(self) -> Tuple[str, str]: commit = tag = "?" try: commit = await self.rev_list("--tags --max-count=1") tag = await self.describe(f"--tags {commit}") except Exception: pass else: tag_match = self.tag_r.match(tag) if tag_match is not None: tag = tag_match.group(1) else: tag = "?" return commit, tag async def wait_for_init(self) -> None: if self.init_evt is not None: await self.init_evt.wait() if not self.initialized: raise self.server.error( f"Git Repo {self.alias}: Initialization failure") async def update_repo_status(self) -> bool: async with self.git_operation_lock: if not self.git_path.joinpath(".git").is_dir(): logging.info( f"Git Repo {self.alias}: path '{self.git_path}'" " is not a valid git repo") return False await self._wait_for_lock_release() self.valid_git_repo = False retries = 3 while retries: self.git_messages.clear() try: resp: Optional[str] = await self._run_git_cmd( "status -u no", retries=1) except Exception: retries -= 1 resp = None # Attempt to recover from "loose object" error if retries and GIT_OBJ_ERR in "\n".join(self.git_messages): ret = await self._repair_loose_objects() if not ret: # Since we are unable to recover, immediately # return return False else: break if resp is None: return False resp = resp.strip().split('\n', 1)[0] self.head_detached = resp.startswith("HEAD detached") branch_info = resp.split()[-1] if self.head_detached: bparts = branch_info.split("/", 1) if len(bparts) == 2: self.git_remote, self.git_branch = bparts else: if self.git_remote == "?": msg = "Resolve by manually checking out" \ " a branch via SSH." else: msg = "Defaulting to previously tracked " \ f"{self.git_remote}/{self.git_branch}." logging.info( f"Git Repo {self.alias}: HEAD detached on untracked " f"commit {branch_info}. {msg}") else: self.git_branch = branch_info self.valid_git_repo = True return True def log_repo_info(self) -> None: logging.info( f"Git Repo {self.alias} Detected:\n" f"Owner: {self.git_owner}\n" f"Repository Name: {self.git_repo_name}\n" f"Path: {self.git_path}\n" f"Remote: {self.git_remote}\n" f"Branch: {self.git_branch}\n" f"Remote URL: {self.upstream_url}\n" f"Current Commit SHA: {self.current_commit}\n" f"Upstream Commit SHA: {self.upstream_commit}\n" f"Current Version: {self.current_version}\n" f"Upstream Version: {self.upstream_version}\n" f"Is Dirty: {self.dirty}\n" f"Is Detached: {self.head_detached}\n" f"Commits Behind: {len(self.commits_behind)}") def report_invalids(self, primary_branch: str) -> List[str]: invalids: List[str] = [] upstream_url = self.upstream_url.lower() if upstream_url[-4:] != ".git": upstream_url += ".git" if upstream_url != self.origin_url.lower(): invalids.append(f"Unofficial remote url: {self.upstream_url}") if self.git_branch != primary_branch or self.git_remote != "origin": invalids.append( "Repo not on valid remote branch, expected: " f"origin/{primary_branch}, detected: " f"{self.git_remote}/{self.git_branch}") if self.head_detached: invalids.append("Detached HEAD detected") return invalids def _verify_repo(self, check_remote: bool = False) -> None: if not self.valid_git_repo: raise self.server.error( f"Git Repo {self.alias}: repo not initialized") if check_remote: if self.git_remote == "?": raise self.server.error( f"Git Repo {self.alias}: No valid git remote detected") async def reset(self) -> None: if self.git_remote == "?" or self.git_branch == "?": raise self.server.error("Cannot reset, unknown remote/branch") async with self.git_operation_lock: await self._run_git_cmd("clean -d -f", retries=2) await self._run_git_cmd( f"reset --hard {self.git_remote}/{self.git_branch}", retries=2) async def fetch(self) -> None: self._verify_repo(check_remote=True) async with self.git_operation_lock: await self._run_git_cmd_async( f"fetch {self.git_remote} --prune --progress") async def pull(self) -> None: self._verify_repo() if self.head_detached: raise self.server.error( f"Git Repo {self.alias}: Cannot perform pull on a " "detached HEAD") cmd = "pull --progress" if self.cmd_helper.is_debug_enabled(): cmd = f"{cmd} --rebase" if self.is_beta: cmd = f"{cmd} {self.git_remote} {self.upstream_commit}" async with self.git_operation_lock: await self._run_git_cmd_async(cmd) async def list_branches(self) -> List[str]: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd("branch --list") return resp.strip().split("\n") async def remote(self, command: str) -> str: self._verify_repo(check_remote=True) async with self.git_operation_lock: resp = await self._run_git_cmd( f"remote {command}") return resp.strip() async def describe(self, args: str = "") -> str: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd(f"describe {args}".strip()) return resp.strip() async def rev_parse(self, args: str = "") -> str: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd(f"rev-parse {args}".strip()) return resp.strip() async def rev_list(self, args: str = "") -> str: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd(f"rev-list {args}".strip()) return resp.strip() async def get_config_item(self, item: str) -> str: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd(f"config --get {item}") return resp.strip() async def checkout(self, branch: Optional[str] = None) -> None: self._verify_repo() async with self.git_operation_lock: if branch is None: if self.is_beta: branch = self.upstream_commit else: branch = f"{self.git_remote}/{self.git_branch}" await self._run_git_cmd(f"checkout -q {branch}") async def run_fsck(self) -> None: async with self.git_operation_lock: await self._run_git_cmd("fsck --full", timeout=300., retries=1) async def clone(self) -> None: async with self.git_operation_lock: self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Starting Clone Recovery...") event_loop = self.server.get_event_loop() if self.backup_path.exists(): await event_loop.run_in_thread(shutil.rmtree, self.backup_path) await self._check_lock_file_exists(remove=True) git_cmd = f"clone {self.origin_url} {self.backup_path}" try: await self._run_git_cmd_async(git_cmd, 1, False, False) except Exception as e: self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Git Clone Failed") raise self.server.error("Git Clone Error") from e if self.git_path.exists(): await event_loop.run_in_thread(shutil.rmtree, self.git_path) await event_loop.run_in_thread( shutil.move, str(self.backup_path), str(self.git_path)) self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Git Clone Complete") async def get_commits_behind(self) -> List[Dict[str, Any]]: self._verify_repo() if self.is_current(): return [] async with self.git_operation_lock: if self.is_beta: ref = self.upstream_commit else: ref = f"{self.git_remote}/{self.git_branch}" resp = await self._run_git_cmd( f"log {self.current_commit}..{ref} " f"--format={GIT_LOG_FMT} --max-count={GIT_MAX_LOG_CNT}") commits_behind: List[Dict[str, Any]] = [] for log_entry in resp.split('\x1E'): log_entry = log_entry.strip() if not log_entry: continue log_items = [li.strip() for li in log_entry.split('\x1D') if li.strip()] cbh = [li.split(':', 1) for li in log_items] commits_behind.append(dict(cbh)) # type: ignore return commits_behind async def get_tagged_commits(self) -> Dict[str, Any]: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd( "for-each-ref --count=10 --sort='-creatordate' " f"--format={GIT_REF_FMT} 'refs/tags'") tagged_commits: Dict[str, Any] = {} for line in resp.split('\n'): parts = line.strip().split() if len(parts) != 3 or parts[0] != "commit": continue sha, ref = parts[1:] tag = ref.split('/')[-1] tagged_commits[sha] = tag # Return tagged commits as SHA keys mapped to tag values return tagged_commits def get_repo_status(self) -> Dict[str, Any]: return { 'detected_type': "git_repo", 'remote_alias': self.git_remote, 'branch': self.git_branch, 'owner': self.git_owner, 'repo_name': self.git_repo_name, 'version': self.current_version, 'remote_version': self.upstream_version, 'current_hash': self.current_commit, 'remote_hash': self.upstream_commit, 'is_dirty': self.dirty, 'detached': self.head_detached, 'commits_behind': self.commits_behind, 'git_messages': self.git_messages, 'full_version_string': self.full_version_string, 'pristine': not self.dirty } def get_version(self, upstream: bool = False) -> Tuple[Any, ...]: version = self.upstream_version if upstream else self.current_version return tuple(re.findall(r"\d+", version)) def is_detached(self) -> bool: return self.head_detached def is_dirty(self) -> bool: return self.dirty def is_current(self) -> bool: return self.current_commit == self.upstream_commit async def _check_lock_file_exists(self, remove: bool = False) -> bool: lock_path = self.git_path.joinpath(".git/index.lock") if lock_path.is_file(): if remove: logging.info(f"Git Repo {self.alias}: Git lock file found " "after git process exited, removing") try: event_loop = self.server.get_event_loop() await event_loop.run_in_thread(os.remove, lock_path) except Exception: pass return True return False async def _wait_for_lock_release(self, timeout: int = 60) -> None: while timeout: if await self._check_lock_file_exists(): if not timeout % 10: logging.info(f"Git Repo {self.alias}: Git lock file " f"exists, {timeout} seconds remaining " "before removal.") await asyncio.sleep(1.) timeout -= 1 else: return await self._check_lock_file_exists(remove=True) async def _repair_loose_objects(self) -> bool: try: await self.cmd_helper.run_cmd_with_response( "find .git/objects/ -type f -empty | xargs rm", timeout=10., retries=1, cwd=str(self.git_path)) await self._run_git_cmd_async( "fetch --all -p", retries=1, fix_loose=False) await self._run_git_cmd("fsck --full", timeout=300., retries=1) except Exception: logging.exception("Attempt to repair loose objects failed") return False return True async def _run_git_cmd_async(self, cmd: str, retries: int = 5, need_git_path: bool = True, fix_loose: bool = True ) -> None: # Fetch and pull require special handling. If the request # gets delayed we do not want to terminate it while the command # is processing. await self._wait_for_lock_release() event_loop = self.server.get_event_loop() env = os.environ.copy() env.update(GIT_ENV_VARS) if need_git_path: git_cmd = f"git -C {self.git_path} {cmd}" else: git_cmd = f"git {cmd}" scmd = self.cmd_helper.build_shell_command( git_cmd, callback=self._handle_process_output, env=env) while retries: self.git_messages.clear() self.fetch_input_recd = False self.fetch_timeout_handle = event_loop.delay_callback( GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd) try: await scmd.run(timeout=0) except Exception: pass self.fetch_timeout_handle.cancel() ret = scmd.get_return_code() if ret == 0: self.git_messages.clear() return elif fix_loose: if GIT_OBJ_ERR in "\n".join(self.git_messages): ret = await self._repair_loose_objects() if ret: break # since the attept to repair failed, bypass retries # and immediately raise an exception raise self.server.error( f"Unable to repair loose objects, use hard recovery") retries -= 1 await asyncio.sleep(.5) await self._check_lock_file_exists(remove=True) raise self.server.error(f"Git Command '{cmd}' failed") def _handle_process_output(self, output: bytes) -> None: self.fetch_input_recd = True out = output.decode().strip() if out: self.git_messages.append(out) self.cmd_helper.notify_update_response(out) logging.debug( f"Git Repo {self.alias}: {out}") async def _check_process_active(self, scmd: shell_command.ShellCommand, cmd_name: str ) -> None: ret = scmd.get_return_code() if ret is not None: logging.debug(f"Git Repo {self.alias}: {cmd_name} returned") return if self.fetch_input_recd: # Received some input, reschedule timeout logging.debug( f"Git Repo {self.alias}: {cmd_name} active, rescheduling") event_loop = self.server.get_event_loop() self.fetch_input_recd = False self.fetch_timeout_handle = event_loop.delay_callback( GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd_name) else: # Request has timed out with no input, terminate it logging.debug(f"Git Repo {self.alias}: {cmd_name} timed out") # Cancel with SIGKILL await scmd.cancel(2) async def _run_git_cmd(self, git_args: str, timeout: float = 20., retries: int = 5, env: Optional[Dict[str, str]] = None ) -> str: try: return await self.cmd_helper.run_cmd_with_response( f"git -C {self.git_path} {git_args}", timeout=timeout, retries=retries, env=env, sig_idx=2) except self.cmd_helper.scmd_error as e: stdout = e.stdout.decode().strip() stderr = e.stderr.decode().strip() if stdout: self.git_messages.append(stdout) if stderr: self.git_messages.append(stderr) raise