From 3a384e62dded79964c467b7dcfb4ef0b01b86c1a Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sat, 26 Feb 2022 07:04:17 -0500 Subject: [PATCH] file_manager: add registered directory validation Make sure that paths registered with full access do not overlap one another, nor that they overlap sensitive folders such as the database, Moonraker's source, or Klipper's source. Signed-off-by: Eric Callahan --- moonraker/components/database.py | 3 + .../components/file_manager/file_manager.py | 83 ++++++++++++++++++- 2 files changed, 82 insertions(+), 4 deletions(-) diff --git a/moonraker/components/database.py b/moonraker/components/database.py index 1872da8..ee1b898 100644 --- a/moonraker/components/database.py +++ b/moonraker/components/database.py @@ -158,6 +158,9 @@ class MoonrakerDatabase: "/server/database/item", ["GET", "POST", "DELETE"], self._handle_item_request) + def get_database_path(self) -> str: + return self.database_path + def _run_command(self, command_func: Callable[..., _T], *args diff --git a/moonraker/components/file_manager/file_manager.py b/moonraker/components/file_manager/file_manager.py index 7b45bc9..41edd41 100644 --- a/moonraker/components/file_manager/file_manager.py +++ b/moonraker/components/file_manager/file_manager.py @@ -7,6 +7,7 @@ from __future__ import annotations import os import sys +import pathlib import shutil import logging import json @@ -15,6 +16,7 @@ import asyncio from copy import deepcopy from inotify_simple import INotify from inotify_simple import flags as iFlags +from utils import MOONRAKER_PATH # Annotation imports from typing import ( @@ -40,6 +42,7 @@ if TYPE_CHECKING: from components import klippy_apis from components import shell_command from components.job_queue import JobQueue + StrOrPath = Union[str, pathlib.Path] DBComp = database.MoonrakerDatabase APIComp = klippy_apis.KlippyAPI SCMDComp = shell_command.ShellCommandFactory @@ -56,9 +59,13 @@ class FileManager: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.event_loop = self.server.get_event_loop() + self.reserved_paths: Dict[str, pathlib.Path] = {} self.full_access_roots: Set[str] = set() self.file_paths: Dict[str, str] = {} + self.add_reserved_path("moonraker", MOONRAKER_PATH) db: DBComp = self.server.load_component(config, "database") + db_path = db.get_database_path() + self.add_reserved_path("database", db_path) gc_path: str = db.get_item( "moonraker", "file_manager.gcode_path", "").result() self.gcode_metadata = MetadataStorage(config, gc_path, db) @@ -131,6 +138,7 @@ class FileManager: # Register path for example configs klipper_path = paths.get('klipper_path', None) if klipper_path is not None: + self.add_reserved_path("klipper", klipper_path) example_cfg_path = os.path.join(klipper_path, "config") self.register_directory("config_examples", example_cfg_path) docs_path = os.path.join(klipper_path, "docs") @@ -154,17 +162,19 @@ class FileManager: if os.path.islink(path): path = os.path.realpath(path) if not os.path.isdir(path) or path == "/": - logging.info( - f"\nSupplied path ({path}) for ({root}) is invalid. Make sure\n" + self.server.add_warning( + f"Supplied path ({path}) for ({root}) is invalid. Make sure\n" "that the path exists and is not the file system root.") return False permissions = os.R_OK if full_access: + if not self._check_root_safe(root, path): + return False permissions |= os.W_OK self.full_access_roots.add(root) if not os.access(path, permissions): - logging.info( - f"\nMoonraker does not have permission to access path " + self.server.add_warning( + f"Moonraker does not have permission to access path " f"({path}) for ({root}).") return False if path != self.file_paths.get(root, ""): @@ -184,6 +194,71 @@ class FileManager: "root_update", root, path) return True + def _paths_overlap(self, + path_one: StrOrPath, + path_two: StrOrPath + ) -> bool: + if isinstance(path_one, str): + path_one = pathlib.Path(path_one) + path_one = path_one.expanduser().resolve() + if isinstance(path_two, str): + path_two = pathlib.Path(path_two) + path_two = path_two.expanduser().resolve() + return ( + path_one == path_two or + path_one in path_two.parents or + path_two in path_one.parents + ) + + def _check_root_safe(self, new_root: str, new_path: StrOrPath) -> bool: + # Make sure that registered full access paths + # do no overlap one another, nor a reserved path + if isinstance(new_path, str): + new_path = pathlib.Path(new_path) + new_path = new_path.expanduser().resolve() + for reg_root, reg_path in self.file_paths.items(): + exp_reg_path = pathlib.Path(reg_path).expanduser().resolve() + if ( + reg_root not in self.full_access_roots or + (reg_root == new_root and new_path == exp_reg_path) + ): + continue + if self._paths_overlap(new_path, exp_reg_path): + self.server.add_warning( + f"Failed to register '{new_root}': '{new_path}', path " + f"overlaps registered root '{reg_root}': '{exp_reg_path}'") + return False + for res_name, res_path in self.reserved_paths.items(): + if self._paths_overlap(new_path, res_path): + self.server.add_warning( + f"Failed to register '{new_root}': '{new_path}', path " + f"overlaps reserved path '{res_name}': '{res_path}'") + return False + return True + + def add_reserved_path(self, name: str, res_path: StrOrPath) -> bool: + if isinstance(res_path, str): + res_path = pathlib.Path(res_path) + res_path = res_path.expanduser().resolve() + if ( + name in self.reserved_paths and + res_path == self.reserved_paths[name] + ): + return True + self.reserved_paths[name] = res_path + check_passed = True + for reg_root, reg_path in list(self.file_paths.items()): + if reg_root not in self.full_access_roots: + continue + exp_reg_path = pathlib.Path(reg_path).expanduser().resolve() + if self._paths_overlap(res_path, exp_reg_path): + self.server.add_warning( + f"Full access root '{reg_root}' overlaps reserved path " + f"'{name}', removing access") + self.file_paths.pop(reg_root, None) + check_passed = False + return check_passed + def get_sd_directory(self) -> str: return self.file_paths.get('gcodes', "")