file_manager: add registered directory validation

Make sure that paths registered with full access do not overlap one
another, nor that they overlap sensitive folders such as the database,
Moonraker's source, or Klipper's source.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2022-02-26 07:04:17 -05:00
parent ed20223614
commit 3a384e62dd
No known key found for this signature in database
GPG Key ID: 7027245FBBDDF59A
2 changed files with 82 additions and 4 deletions

View File

@ -158,6 +158,9 @@ class MoonrakerDatabase:
"/server/database/item", ["GET", "POST", "DELETE"], "/server/database/item", ["GET", "POST", "DELETE"],
self._handle_item_request) self._handle_item_request)
def get_database_path(self) -> str:
return self.database_path
def _run_command(self, def _run_command(self,
command_func: Callable[..., _T], command_func: Callable[..., _T],
*args *args

View File

@ -7,6 +7,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
import sys import sys
import pathlib
import shutil import shutil
import logging import logging
import json import json
@ -15,6 +16,7 @@ import asyncio
from copy import deepcopy from copy import deepcopy
from inotify_simple import INotify from inotify_simple import INotify
from inotify_simple import flags as iFlags from inotify_simple import flags as iFlags
from utils import MOONRAKER_PATH
# Annotation imports # Annotation imports
from typing import ( from typing import (
@ -40,6 +42,7 @@ if TYPE_CHECKING:
from components import klippy_apis from components import klippy_apis
from components import shell_command from components import shell_command
from components.job_queue import JobQueue from components.job_queue import JobQueue
StrOrPath = Union[str, pathlib.Path]
DBComp = database.MoonrakerDatabase DBComp = database.MoonrakerDatabase
APIComp = klippy_apis.KlippyAPI APIComp = klippy_apis.KlippyAPI
SCMDComp = shell_command.ShellCommandFactory SCMDComp = shell_command.ShellCommandFactory
@ -56,9 +59,13 @@ class FileManager:
def __init__(self, config: ConfigHelper) -> None: def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server() self.server = config.get_server()
self.event_loop = self.server.get_event_loop() self.event_loop = self.server.get_event_loop()
self.reserved_paths: Dict[str, pathlib.Path] = {}
self.full_access_roots: Set[str] = set() self.full_access_roots: Set[str] = set()
self.file_paths: Dict[str, str] = {} self.file_paths: Dict[str, str] = {}
self.add_reserved_path("moonraker", MOONRAKER_PATH)
db: DBComp = self.server.load_component(config, "database") db: DBComp = self.server.load_component(config, "database")
db_path = db.get_database_path()
self.add_reserved_path("database", db_path)
gc_path: str = db.get_item( gc_path: str = db.get_item(
"moonraker", "file_manager.gcode_path", "").result() "moonraker", "file_manager.gcode_path", "").result()
self.gcode_metadata = MetadataStorage(config, gc_path, db) self.gcode_metadata = MetadataStorage(config, gc_path, db)
@ -131,6 +138,7 @@ class FileManager:
# Register path for example configs # Register path for example configs
klipper_path = paths.get('klipper_path', None) klipper_path = paths.get('klipper_path', None)
if klipper_path is not None: if klipper_path is not None:
self.add_reserved_path("klipper", klipper_path)
example_cfg_path = os.path.join(klipper_path, "config") example_cfg_path = os.path.join(klipper_path, "config")
self.register_directory("config_examples", example_cfg_path) self.register_directory("config_examples", example_cfg_path)
docs_path = os.path.join(klipper_path, "docs") docs_path = os.path.join(klipper_path, "docs")
@ -154,17 +162,19 @@ class FileManager:
if os.path.islink(path): if os.path.islink(path):
path = os.path.realpath(path) path = os.path.realpath(path)
if not os.path.isdir(path) or path == "/": if not os.path.isdir(path) or path == "/":
logging.info( self.server.add_warning(
f"\nSupplied path ({path}) for ({root}) is invalid. Make sure\n" f"Supplied path ({path}) for ({root}) is invalid. Make sure\n"
"that the path exists and is not the file system root.") "that the path exists and is not the file system root.")
return False return False
permissions = os.R_OK permissions = os.R_OK
if full_access: if full_access:
if not self._check_root_safe(root, path):
return False
permissions |= os.W_OK permissions |= os.W_OK
self.full_access_roots.add(root) self.full_access_roots.add(root)
if not os.access(path, permissions): if not os.access(path, permissions):
logging.info( self.server.add_warning(
f"\nMoonraker does not have permission to access path " f"Moonraker does not have permission to access path "
f"({path}) for ({root}).") f"({path}) for ({root}).")
return False return False
if path != self.file_paths.get(root, ""): if path != self.file_paths.get(root, ""):
@ -184,6 +194,71 @@ class FileManager:
"root_update", root, path) "root_update", root, path)
return True return True
def _paths_overlap(self,
path_one: StrOrPath,
path_two: StrOrPath
) -> bool:
if isinstance(path_one, str):
path_one = pathlib.Path(path_one)
path_one = path_one.expanduser().resolve()
if isinstance(path_two, str):
path_two = pathlib.Path(path_two)
path_two = path_two.expanduser().resolve()
return (
path_one == path_two or
path_one in path_two.parents or
path_two in path_one.parents
)
def _check_root_safe(self, new_root: str, new_path: StrOrPath) -> bool:
# Make sure that registered full access paths
# do no overlap one another, nor a reserved path
if isinstance(new_path, str):
new_path = pathlib.Path(new_path)
new_path = new_path.expanduser().resolve()
for reg_root, reg_path in self.file_paths.items():
exp_reg_path = pathlib.Path(reg_path).expanduser().resolve()
if (
reg_root not in self.full_access_roots or
(reg_root == new_root and new_path == exp_reg_path)
):
continue
if self._paths_overlap(new_path, exp_reg_path):
self.server.add_warning(
f"Failed to register '{new_root}': '{new_path}', path "
f"overlaps registered root '{reg_root}': '{exp_reg_path}'")
return False
for res_name, res_path in self.reserved_paths.items():
if self._paths_overlap(new_path, res_path):
self.server.add_warning(
f"Failed to register '{new_root}': '{new_path}', path "
f"overlaps reserved path '{res_name}': '{res_path}'")
return False
return True
def add_reserved_path(self, name: str, res_path: StrOrPath) -> bool:
if isinstance(res_path, str):
res_path = pathlib.Path(res_path)
res_path = res_path.expanduser().resolve()
if (
name in self.reserved_paths and
res_path == self.reserved_paths[name]
):
return True
self.reserved_paths[name] = res_path
check_passed = True
for reg_root, reg_path in list(self.file_paths.items()):
if reg_root not in self.full_access_roots:
continue
exp_reg_path = pathlib.Path(reg_path).expanduser().resolve()
if self._paths_overlap(res_path, exp_reg_path):
self.server.add_warning(
f"Full access root '{reg_root}' overlaps reserved path "
f"'{name}', removing access")
self.file_paths.pop(reg_root, None)
check_passed = False
return check_passed
def get_sd_directory(self) -> str: def get_sd_directory(self) -> str:
return self.file_paths.get('gcodes', "") return self.file_paths.get('gcodes', "")