diff --git a/moonraker/components/update_manager/update_manager.py b/moonraker/components/update_manager/update_manager.py index 13052a9..a9121fc 100644 --- a/moonraker/components/update_manager/update_manager.py +++ b/moonraker/components/update_manager/update_manager.py @@ -9,14 +9,12 @@ import asyncio import os import pathlib import logging -import json import sys import shutil import zipfile import time import tempfile import re -from tornado.httpclient import AsyncHTTPClient from thirdparty.packagekit import enums as PkEnum from .base_deploy import BaseDeploy from .app_deploy import AppDeploy @@ -37,7 +35,6 @@ from typing import ( cast ) if TYPE_CHECKING: - from tornado.httpclient import HTTPResponse from moonraker import Server from confighelper import ConfigHelper from websockets import WebRequest @@ -47,6 +44,7 @@ if TYPE_CHECKING: from components.database import NamespaceWrapper from components.dbus_manager import DbusManager from components.machine import Machine + from components.http_client import HttpClient from eventloop import FlexTimer from dbus_next import Variant from dbus_next.aio import ProxyInterface @@ -457,13 +455,14 @@ class UpdateManager: return "ok" def close(self) -> None: - self.cmd_helper.close() if self.refresh_timer is not None: self.refresh_timer.stop() class CommandHelper: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() + self.http_client: HttpClient + self.http_client = self.server.lookup_component("http_client") self.debug_enabled = config.getboolean('enable_repo_debug', False) if self.debug_enabled: logging.warning("UPDATE MANAGER: REPO DEBUG ENABLED") @@ -471,8 +470,6 @@ class CommandHelper: self.scmd_error = shell_cmd.error self.build_shell_command = shell_cmd.build_shell_command self.pkg_updater: Optional[PackageDeploy] = None - self.http_client = AsyncHTTPClient() - self.github_request_cache: Dict[str, CachedGithubResponse] = {} # database management db: DBComp = self.server.lookup_component('database') @@ -497,6 +494,9 @@ class CommandHelper: def get_server(self) -> Server: return self.server + def get_http_client(self) -> HttpClient: + return self.http_client + def get_refresh_interval(self) -> float: return self.refresh_interval @@ -531,13 +531,6 @@ class CommandHelper: def set_package_updater(self, updater: PackageDeploy) -> None: self.pkg_updater = updater - def get_rate_limit_stats(self) -> Dict[str, Any]: - return { - 'github_rate_limit': self.gh_rate_limit, - 'github_requests_remaining': self.gh_limit_remaining, - 'github_limit_reset_time': self.gh_limit_reset_time, - } - async def run_cmd(self, cmd: str, timeout: float = 20., @@ -568,135 +561,6 @@ class CommandHelper: sig_idx=sig_idx) return result - async def github_api_request(self, - url: str, - retries: int = 5 - ) -> JsonType: - if ( - self.gh_limit_reset_time is not None and - self.gh_limit_remaining == 0 - ): - curtime = time.time() - if curtime < self.gh_limit_reset_time: - raise self.server.error( - f"GitHub Rate Limit Reached\nRequest: {url}\n" - f"Limit Reset Time: {time.ctime(self.gh_limit_remaining)}") - if url in self.github_request_cache: - cached_request = self.github_request_cache[url] - etag: Optional[str] = cached_request.get_etag() - else: - cached_request = CachedGithubResponse() - etag = None - self.github_request_cache[url] = cached_request - headers = {"Accept": "application/vnd.github.v3+json"} - if etag is not None: - headers['If-None-Match'] = etag - for i in range(retries): - error: Optional[Exception] = None - try: - fut = self.http_client.fetch( - url, headers=headers, connect_timeout=5., - request_timeout=5., raise_error=False) - resp: HTTPResponse - resp = await asyncio.wait_for(fut, 10.) - except Exception: - logging.exception( - f"Error Processing GitHub API request: {url}") - if i + 1 < retries: - await asyncio.sleep(1.) - continue - etag = resp.headers.get('etag', None) - if etag is not None: - if etag[:2] == "W/": - etag = etag[2:] - logging.info( - "GitHub API Request Processed\n" - f"URL: {url}\n" - f"Response Code: {resp.code}\n" - f"Response Reason: {resp.reason}\n" - f"ETag: {etag}") - if resp.code == 403: - error = self.server.error( - f"Forbidden GitHub Request: {resp.reason}") - elif resp.code == 304: - logging.info(f"Github Request not Modified: {url}") - return cached_request.get_cached_result() - if resp.code != 200: - logging.info( - f"Github Request failed: {resp.code} {resp.reason}") - if i + 1 < retries: - await asyncio.sleep(1.) - continue - # Update rate limit on return success - if 'X-Ratelimit-Limit' in resp.headers: - self.gh_rate_limit = int(resp.headers['X-Ratelimit-Limit']) - self.gh_limit_remaining = int( - resp.headers['X-Ratelimit-Remaining']) - self.gh_limit_reset_time = float( - resp.headers['X-Ratelimit-Reset']) - if error is not None: - raise error - decoded = json.loads(resp.body) - if etag is not None: - cached_request.update_result(etag, decoded) - return decoded - raise self.server.error( - f"Retries exceeded for GitHub API request: {url}") - - async def http_download_request(self, - url: str, - content_type: str, - timeout: float = 180., - retries: int = 5 - ) -> bytes: - for i in range(retries): - try: - fut = self.http_client.fetch( - url, headers={"Accept": content_type}, - connect_timeout=5., request_timeout=timeout) - resp: HTTPResponse - resp = await asyncio.wait_for(fut, timeout + 10.) - except Exception: - logging.exception("Error Processing Download") - if i + 1 == retries: - raise - await asyncio.sleep(1.) - continue - return resp.body - raise self.server.error( - f"Retries exceeded for GitHub API request: {url}") - - async def streaming_download_request(self, - url: str, - dest: Union[str, pathlib.Path], - content_type: str, - size: int, - timeout: float = 180., - retries: int = 5 - ) -> None: - if isinstance(dest, str): - dest = pathlib.Path(dest) - for i in range(retries): - dl = StreamingDownload(self, dest, size) - try: - fut = self.http_client.fetch( - url, headers={"Accept": content_type}, - connect_timeout=5., request_timeout=timeout, - streaming_callback=dl.on_chunk_recd) - resp: HTTPResponse - resp = await asyncio.wait_for(fut, timeout + 10.) - except Exception: - logging.exception("Error Processing Download") - if i + 1 == retries: - raise - await asyncio.sleep(1.) - continue - finally: - await dl.close() - if resp.code < 400: - return - raise self.server.error(f"Retries exceeded for request: {url}") - def notify_update_response(self, resp: Union[str, bytes], is_complete: bool = False @@ -723,66 +587,20 @@ class CommandHelper: return await self.pkg_updater.install_packages(package_list, **kwargs) - def close(self) -> None: - self.http_client.close() + def get_rate_limit_stats(self): + return self.http_client.github_api_stats() -class CachedGithubResponse: - def __init__(self) -> None: - self.etag: Optional[str] = None - self.cached_result: JsonType = {} - - def get_etag(self) -> Optional[str]: - return self.etag - - def get_cached_result(self) -> JsonType: - return self.cached_result - - def update_result(self, etag: str, result: JsonType) -> None: - self.etag = etag - self.cached_result = result - -class StreamingDownload: - def __init__(self, - cmd_helper: CommandHelper, - dest: pathlib.Path, - download_size: int) -> None: - self.cmd_helper = cmd_helper - self.event_loop = cmd_helper.get_server().get_event_loop() - self.name = dest.name - self.file_hdl = dest.open('wb') - self.download_size = download_size - self.total_recd: int = 0 - self.last_pct: int = 0 - self.chunk_buffer: List[bytes] = [] - self.busy_evt: asyncio.Event = asyncio.Event() - self.busy_evt.set() - - def on_chunk_recd(self, chunk: bytes) -> None: - if not chunk: - return - self.chunk_buffer.append(chunk) - if not self.busy_evt.is_set(): - return - self.busy_evt.clear() - self.event_loop.register_callback(self._process_buffer) - - async def close(self): - await self.busy_evt.wait() - self.file_hdl.close() - - async def _process_buffer(self): - while self.chunk_buffer: - chunk = self.chunk_buffer.pop(0) - self.total_recd += len(chunk) - pct = int(self.total_recd / self.download_size * 100 + .5) - await self.event_loop.run_in_thread(self.file_hdl.write, chunk) - if pct >= self.last_pct + 5: - self.last_pct = pct - totals = f"{self.total_recd // 1024} KiB / " \ - f"{self.download_size // 1024} KiB" - self.cmd_helper.notify_update_response( - f"Downloading {self.name}: {totals} [{pct}%]") - self.busy_evt.set() + def on_download_progress(self, + progress: int, + download_size: int, + downloaded: int + ) -> None: + totals = ( + f"{downloaded // 1024} KiB / " + f"{download_size// 1024} KiB" + ) + self.notify_update_response( + f"Downloading {self.cur_update_app}: {totals} [{progress}%]") class PackageDeploy(BaseDeploy): def __init__(self, @@ -1334,22 +1152,32 @@ class WebClientDeploy(BaseDeploy): async def _get_remote_version(self) -> None: # Remote state - url = f"https://api.github.com/repos/{self.repo}/releases" - try: - releases = await self.cmd_helper.github_api_request(url) - assert isinstance(releases, list) - except Exception: - logging.exception(f"Client {self.repo}: Github Request Error") - releases = [] - result: Dict[str, Any] = {} - for release in releases: - if self.channel == "stable": - if not release['prerelease']: - result = release - break + if self.channel == "stable": + resource = f"repos/{self.repo}/releases/latest" + else: + resource = f"repos/{self.repo}/releases?per_page=1" + client = self.cmd_helper.get_http_client() + resp = await client.github_api_request(resource, attempts=3) + release: Union[List[Any], Dict[str, Any]] = {} + if resp.status_code == 304: + if self.remote_version == "?" and resp.content: + # Not modified, however we need to restore state from + # cached content + release = resp.json() else: - result = release - break + # Either not necessary or not possible to restore from cache + return + elif resp.has_error(): + logging.info( + f"Client {self.repo}: Github Request Error - {resp.error}") + else: + release = resp.json() + result: Dict[str, Any] = {} + if isinstance(release, list): + if release: + result = release[0] + else: + result = release self.remote_version = result.get('name', "?") release_asset: Dict[str, Any] = result.get('assets', [{}])[0] dl_url: str = release_asset.get('browser_download_url', "?") @@ -1395,8 +1223,10 @@ class WebClientDeploy(BaseDeploy): tempdir = pathlib.Path(tempdirname) temp_download_file = tempdir.joinpath(f"{self.name}.zip") temp_persist_dir = tempdir.joinpath(self.name) - await self.cmd_helper.streaming_download_request( - dl_url, temp_download_file, content_type, size) + client = self.cmd_helper.get_http_client() + await client.download_file( + dl_url, content_type, temp_download_file, size, + self.cmd_helper.on_download_progress) self.cmd_helper.notify_update_response( f"Download Complete, extracting release to '{self.path}'") await event_loop.run_in_thread( diff --git a/moonraker/components/update_manager/zip_deploy.py b/moonraker/components/update_manager/zip_deploy.py index 6e90559..227e2e4 100644 --- a/moonraker/components/update_manager/zip_deploy.py +++ b/moonraker/components/update_manager/zip_deploy.py @@ -185,9 +185,9 @@ class ZipDeploy(AppDeploy): f"Host repo mismatch, received: {host_repo}, " f"expected: {self.host_repo}. This could result in " " a failed update.") - url = f"https://api.github.com/repos/{self.host_repo}/releases" + resource = f"repos/{self.host_repo}/releases" current_release, latest_release = await self._fetch_github_releases( - url, release_tag) + resource, release_tag) await self._validate_current_release(release_info, current_release) if not self.errors: self.verified = True @@ -196,11 +196,14 @@ class ZipDeploy(AppDeploy): self._log_zipapp_info() async def _fetch_github_releases(self, - release_url: str, + resource: str, current_tag: Optional[str] = None ) -> Tuple[Dict[str, Any], Dict[str, Any]]: try: - releases = await self.cmd_helper.github_api_request(release_url) + client = self.cmd_helper.get_http_client() + resp = await client.github_api_request(resource, attempts=3) + resp.raise_for_status() + releases = resp.json() assert isinstance(releases, list) except Exception: self.log_exc("Error fetching releases from GitHub") @@ -237,8 +240,8 @@ class ZipDeploy(AppDeploy): self._add_error( "RELEASE_INFO not found in current release assets") info_url, content_type, size = asset_info['RELEASE_INFO'] - rinfo_bytes = await self.cmd_helper.http_download_request( - info_url, content_type) + client = self.cmd_helper.get_http_client() + rinfo_bytes = await client.get_file(info_url, content_type) github_rinfo: Dict[str, Any] = json.loads(rinfo_bytes) if github_rinfo.get(self.name, {}) != release_info: self._add_error( @@ -255,8 +258,8 @@ class ZipDeploy(AppDeploy): asset_info = self._get_asset_urls(release, asset_names) if "RELEASE_INFO" in asset_info: asset_url, content_type, size = asset_info['RELEASE_INFO'] - rinfo_bytes = await self.cmd_helper.http_download_request( - asset_url, content_type) + client = self.cmd_helper.get_http_client() + rinfo_bytes = await client.get_file(asset_url, content_type) update_release_info: Dict[str, Any] = json.loads(rinfo_bytes) update_info = update_release_info.get(self.name, {}) self.lastest_hash = update_info.get('commit_hash', "?") @@ -272,8 +275,8 @@ class ZipDeploy(AppDeploy): # Only report commit log if versions change if "COMMIT_LOG" in asset_info: asset_url, content_type, size = asset_info['COMMIT_LOG'] - commit_bytes = await self.cmd_helper.http_download_request( - asset_url, content_type) + client = self.cmd_helper.get_http_client() + commit_bytes = await client.get_file(asset_url, content_type) commit_info: Dict[str, Any] = json.loads(commit_bytes) self.commit_log = commit_info.get(self.name, []) if zip_file_name in asset_info: @@ -379,8 +382,10 @@ class ZipDeploy(AppDeploy): suffix=self.name, prefix="app") as tempdirname: tempdir = pathlib.Path(tempdirname) temp_download_file = tempdir.joinpath(f"{self.name}.zip") - await self.cmd_helper.streaming_download_request( - dl_url, temp_download_file, content_type, size) + client = self.cmd_helper.get_http_client() + await client.download_file( + dl_url, content_type, temp_download_file, size, + self.cmd_helper.on_download_progress) self.notify_status( f"Download Complete, extracting release to '{self.path}'") event_loop = self.server.get_event_loop() @@ -396,8 +401,8 @@ class ZipDeploy(AppDeploy): hard: bool = False, force_dep_update: bool = False ) -> None: - url = f"https://api.github.com/repos/{self.host_repo}/releases" - releases = await self._fetch_github_releases(url) + res = f"repos/{self.host_repo}/releases" + releases = await self._fetch_github_releases(res) await self._process_latest_release(releases[1]) await self.update(force_dep_update=force_dep_update)