diff --git a/moonraker/components/database.py b/moonraker/components/database.py index 1872da8..ee1b898 100644 --- a/moonraker/components/database.py +++ b/moonraker/components/database.py @@ -158,6 +158,9 @@ class MoonrakerDatabase: "/server/database/item", ["GET", "POST", "DELETE"], self._handle_item_request) + def get_database_path(self) -> str: + return self.database_path + def _run_command(self, command_func: Callable[..., _T], *args diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index 7b45bc9..41edd41 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -7,6 +7,7 @@ from __future__ import annotations import os import sys +import pathlib import shutil import logging import json @@ -15,6 +16,7 @@ import asyncio from copy import deepcopy from inotify_simple import INotify from inotify_simple import flags as iFlags +from utils import MOONRAKER_PATH # Annotation imports from typing import ( @@ -40,6 +42,7 @@ if TYPE_CHECKING: from components import klippy_apis from components import shell_command from components.job_queue import JobQueue + StrOrPath = Union[str, pathlib.Path] DBComp = database.MoonrakerDatabase APIComp = klippy_apis.KlippyAPI SCMDComp = shell_command.ShellCommandFactory @@ -56,9 +59,13 @@ class FileManager: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.event_loop = self.server.get_event_loop() + self.reserved_paths: Dict[str, pathlib.Path] = {} self.full_access_roots: Set[str] = set() self.file_paths: Dict[str, str] = {} + self.add_reserved_path("moonraker", MOONRAKER_PATH) db: DBComp = self.server.load_component(config, "database") + db_path = db.get_database_path() + self.add_reserved_path("database", db_path) gc_path: str = db.get_item( "moonraker", "file_manager.gcode_path", "").result() self.gcode_metadata = MetadataStorage(config, gc_path, db) @@ -131,6 +138,7 @@ class FileManager: # Register path for example configs klipper_path = paths.get('klipper_path', None) if klipper_path is not None: + self.add_reserved_path("klipper", klipper_path) example_cfg_path = os.path.join(klipper_path, "config") self.register_directory("config_examples", example_cfg_path) docs_path = os.path.join(klipper_path, "docs") @@ -154,17 +162,19 @@ class FileManager: if os.path.islink(path): path = os.path.realpath(path) if not os.path.isdir(path) or path == "/": - logging.info( - f"\nSupplied path ({path}) for ({root}) is invalid. Make sure\n" + self.server.add_warning( + f"Supplied path ({path}) for ({root}) is invalid. Make sure\n" "that the path exists and is not the file system root.") return False permissions = os.R_OK if full_access: + if not self._check_root_safe(root, path): + return False permissions |= os.W_OK self.full_access_roots.add(root) if not os.access(path, permissions): - logging.info( - f"\nMoonraker does not have permission to access path " + self.server.add_warning( + f"Moonraker does not have permission to access path " f"({path}) for ({root}).") return False if path != self.file_paths.get(root, ""): @@ -184,6 +194,71 @@ class FileManager: "root_update", root, path) return True + def _paths_overlap(self, + path_one: StrOrPath, + path_two: StrOrPath + ) -> bool: + if isinstance(path_one, str): + path_one = pathlib.Path(path_one) + path_one = path_one.expanduser().resolve() + if isinstance(path_two, str): + path_two = pathlib.Path(path_two) + path_two = path_two.expanduser().resolve() + return ( + path_one == path_two or + path_one in path_two.parents or + path_two in path_one.parents + ) + + def _check_root_safe(self, new_root: str, new_path: StrOrPath) -> bool: + # Make sure that registered full access paths + # do no overlap one another, nor a reserved path + if isinstance(new_path, str): + new_path = pathlib.Path(new_path) + new_path = new_path.expanduser().resolve() + for reg_root, reg_path in self.file_paths.items(): + exp_reg_path = pathlib.Path(reg_path).expanduser().resolve() + if ( + reg_root not in self.full_access_roots or + (reg_root == new_root and new_path == exp_reg_path) + ): + continue + if self._paths_overlap(new_path, exp_reg_path): + self.server.add_warning( + f"Failed to register '{new_root}': '{new_path}', path " + f"overlaps registered root '{reg_root}': '{exp_reg_path}'") + return False + for res_name, res_path in self.reserved_paths.items(): + if self._paths_overlap(new_path, res_path): + self.server.add_warning( + f"Failed to register '{new_root}': '{new_path}', path " + f"overlaps reserved path '{res_name}': '{res_path}'") + return False + return True + + def add_reserved_path(self, name: str, res_path: StrOrPath) -> bool: + if isinstance(res_path, str): + res_path = pathlib.Path(res_path) + res_path = res_path.expanduser().resolve() + if ( + name in self.reserved_paths and + res_path == self.reserved_paths[name] + ): + return True + self.reserved_paths[name] = res_path + check_passed = True + for reg_root, reg_path in list(self.file_paths.items()): + if reg_root not in self.full_access_roots: + continue + exp_reg_path = pathlib.Path(reg_path).expanduser().resolve() + if self._paths_overlap(res_path, exp_reg_path): + self.server.add_warning( + f"Full access root '{reg_root}' overlaps reserved path " + f"'{name}', removing access") + self.file_paths.pop(reg_root, None) + check_passed = False + return check_passed + def get_sd_directory(self) -> str: return self.file_paths.get('gcodes', "")