# Configuration Helper # # Copyright (C) 2020 Eric Callahan # # This file may be distributed under the terms of the GNU GPLv3 license from __future__ import annotations import configparser import os import hashlib import pathlib import re import threading import copy import logging from io import StringIO from utils import SentinelClass from components.template import JinjaTemplate # Annotation imports from typing import ( TYPE_CHECKING, Any, Awaitable, Callable, IO, Optional, Set, Tuple, TypeVar, Union, Dict, List, Type, ) if TYPE_CHECKING: from moonraker import Server from components.gpio import GpioFactory, GpioOutputPin from components.template import TemplateFactory from io import TextIOWrapper _T = TypeVar("_T") ConfigVal = Union[None, int, float, bool, str, dict, list] SENTINEL = SentinelClass.get_instance() DOCS_URL = "https://moonraker.readthedocs.io/en/latest" class ConfigError(Exception): pass class ConfigHelper: error = ConfigError def __init__(self, server: Server, config_source: ConfigSourceWrapper, section: str, parsed: Dict[str, Dict[str, ConfigVal]], fallback_section: Optional[str] = None ) -> None: self.server = server self.source = config_source self.config = config_source.get_parser() self.section = section self.fallback_section: Optional[str] = fallback_section self.parsed = parsed if self.section not in self.parsed: self.parsed[self.section] = {} self.sections = self.config.sections self.has_section = self.config.has_section def get_server(self) -> Server: return self.server def get_source(self) -> ConfigSourceWrapper: return self.source def __getitem__(self, key: str) -> ConfigHelper: return self.getsection(key) def __contains__(self, key: str) -> bool: return key in self.config def has_option(self, option: str) -> bool: return self.config.has_option(self.section, option) def set_option(self, option: str, value: str) -> None: self.source.set_option(self.section, option, value) def get_name(self) -> str: return self.section def get_file(self) -> Optional[pathlib.Path]: return self.source.find_config_file(self.section) def get_options(self) -> Dict[str, str]: if self.section not in self.config: return {} return dict(self.config[self.section]) def get_hash(self) -> hashlib._Hash: hash = hashlib.sha256() section = self.section if self.section not in self.config: return hash for option, val in self.config[section].items(): hash.update(option.encode()) hash.update(val.encode()) return hash def get_prefix_sections(self, prefix: str) -> List[str]: return [s for s in self.sections() if s.startswith(prefix)] def getsection( self, section: str, fallback: Optional[str] = None ) -> ConfigHelper: return ConfigHelper( self.server, self.source, section, self.parsed, fallback ) def _get_option(self, func: Callable[..., Any], option: str, default: Union[SentinelClass, _T], above: Optional[Union[int, float]] = None, below: Optional[Union[int, float]] = None, minval: Optional[Union[int, float]] = None, maxval: Optional[Union[int, float]] = None, deprecate: bool = False ) -> _T: section = self.section warn_fallback = False if ( self.section not in self.config and self.fallback_section is not None ): section = self.fallback_section warn_fallback = True try: val = func(section, option) except (configparser.NoOptionError, configparser.NoSectionError) as e: if isinstance(default, SentinelClass): raise ConfigError(str(e)) from None val = default section = self.section except Exception: raise ConfigError( f"Error parsing option ({option}) from " f"section [{self.section}]") else: if deprecate: self.server.add_warning( f"[{self.section}]: Option '{option}' is " "deprecated, see the configuration documention " f"at {DOCS_URL}/configuration/") if warn_fallback: help = f"{DOCS_URL}/configuration/#option-moved-deprecations" self.server.add_warning( f"[{section}]: Option '{option}' has been moved " f"to section [{self.section}]. Please correct your " f"configuration, see {help} for detailed documentation." ) self._check_option(option, val, above, below, minval, maxval) if option not in self.parsed[section]: if ( val is None or isinstance(val, (int, float, bool, str, dict, list)) ): self.parsed[section][option] = val else: # If the item cannot be encoded to json serialize to a string self.parsed[section][option] = str(val) return val def _check_option(self, option: str, value: Union[int, float], above: Optional[Union[int, float]], below: Optional[Union[int, float]], minval: Optional[Union[int, float]], maxval: Optional[Union[int, float]] ) -> None: if above is not None and value <= above: raise self.error( f"Config Error: Section [{self.section}], Option " f"'{option}: {value}': value is not above {above}") if below is not None and value >= below: raise self.error( f"Config Error: Section [{self.section}], Option " f"'{option}: {value}': value is not below {below}") if minval is not None and value < minval: raise self.error( f"Config Error: Section [{self.section}], Option " f"'{option}: {value}': value is below minimum value {minval}") if maxval is not None and value > maxval: raise self.error( f"Config Error: Section [{self.section}], Option " f"'{option}: {value}': value is above maximum value {minval}") def get(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, deprecate: bool = False ) -> Union[str, _T]: return self._get_option( self.config.get, option, default, deprecate=deprecate) def getint(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, above: Optional[int] = None, below: Optional[int] = None, minval: Optional[int] = None, maxval: Optional[int] = None, deprecate: bool = False ) -> Union[int, _T]: return self._get_option( self.config.getint, option, default, above, below, minval, maxval, deprecate) def getboolean(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, deprecate: bool = False ) -> Union[bool, _T]: return self._get_option( self.config.getboolean, option, default, deprecate=deprecate) def getfloat(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, above: Optional[float] = None, below: Optional[float] = None, minval: Optional[float] = None, maxval: Optional[float] = None, deprecate: bool = False ) -> Union[float, _T]: return self._get_option( self.config.getfloat, option, default, above, below, minval, maxval, deprecate) def getlists(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, list_type: Type = str, separators: Tuple[Optional[str], ...] = ('\n',), count: Optional[Tuple[Optional[int], ...]] = None, deprecate: bool = False ) -> Union[List[Any], _T]: if count is not None and len(count) != len(separators): raise ConfigError( f"Option '{option}' in section " f"[{self.section}]: length of 'count' argument must ", "match length of 'separators' argument") else: count = tuple(None for _ in range(len(separators))) def list_parser(value: str, ltype: Type, seps: Tuple[Optional[str], ...], expected_cnt: Tuple[Optional[int], ...] ) -> List[Any]: sep = seps[0] seps = seps[1:] cnt = expected_cnt[0] expected_cnt = expected_cnt[1:] ret: List[Any] = [] if seps: sub_lists = [val.strip() for val in value.split(sep) if val.strip()] for sub_list in sub_lists: ret.append(list_parser(sub_list, ltype, seps, expected_cnt)) else: ret = [ltype(val.strip()) for val in value.split(sep) if val.strip()] if cnt is not None and len(ret) != cnt: raise ConfigError( f"List length mismatch, expected {cnt}, " f"parsed {len(ret)}") return ret def getlist_wrapper(sec: str, opt: str) -> List[Any]: val = self.config.get(sec, opt) assert count is not None return list_parser(val, list_type, separators, count) return self._get_option(getlist_wrapper, option, default, deprecate=deprecate) def getlist(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, separator: Optional[str] = '\n', count: Optional[int] = None, deprecate: bool = False ) -> Union[List[str], _T]: return self.getlists(option, default, str, (separator,), (count,), deprecate=deprecate) def getintlist(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, separator: Optional[str] = '\n', count: Optional[int] = None, deprecate: bool = False ) -> Union[List[int], _T]: return self.getlists(option, default, int, (separator,), (count,), deprecate=deprecate) def getfloatlist(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, separator: Optional[str] = '\n', count: Optional[int] = None, deprecate: bool = False ) -> Union[List[float], _T]: return self.getlists(option, default, float, (separator,), (count,), deprecate=deprecate) def getdict(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, separators: Tuple[Optional[str], Optional[str]] = ('\n', '='), dict_type: Type = str, allow_empty_fields: bool = False, deprecate: bool = False ) -> Union[Dict[str, Any], _T]: if len(separators) != 2: raise ConfigError( "The `separators` argument of getdict() must be a Tuple" "of length of 2") def getdict_wrapper(sec: str, opt: str) -> Dict[str, Any]: val = self.config.get(sec, opt) ret: Dict[str, Any] = {} for line in val.split(separators[0]): line = line.strip() if not line: continue parts = line.split(separators[1], 1) if len(parts) == 1: if allow_empty_fields: ret[parts[0].strip()] = None else: raise ConfigError( f"Failed to parse dictionary field, {line}") else: ret[parts[0].strip()] = dict_type(parts[1].strip()) return ret return self._get_option(getdict_wrapper, option, default, deprecate=deprecate) def getgpioout(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, initial_value: int = 0, deprecate: bool = False ) -> Union[GpioOutputPin, _T]: try: gpio: GpioFactory = self.server.load_component(self, 'gpio') except Exception: raise ConfigError( f"Section [{self.section}], option '{option}', " "GPIO Component not available") def getgpio_wrapper(sec: str, opt: str) -> GpioOutputPin: val = self.config.get(sec, opt) return gpio.setup_gpio_out(val, initial_value) return self._get_option(getgpio_wrapper, option, default, deprecate=deprecate) def gettemplate(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, is_async: bool = False, deprecate: bool = False ) -> Union[JinjaTemplate, _T]: try: template: TemplateFactory template = self.server.load_component(self, 'template') except Exception: raise ConfigError( f"Section [{self.section}], option '{option}', " "Template Component not available") def gettemplate_wrapper(sec: str, opt: str) -> JinjaTemplate: val = self.config.get(sec, opt) return template.create_template(val.strip(), is_async) return self._get_option(gettemplate_wrapper, option, default, deprecate=deprecate) def load_template(self, option: str, default: Union[SentinelClass, str] = SENTINEL, is_async: bool = False, deprecate: bool = False ) -> JinjaTemplate: val = self.gettemplate(option, default, is_async, deprecate) if isinstance(val, str): template: TemplateFactory template = self.server.lookup_component('template') return template.create_template(val.strip(), is_async) return val def getpath(self, option: str, default: Union[SentinelClass, _T] = SENTINEL, deprecate: bool = False ) -> Union[pathlib.Path, _T]: val = self.gettemplate(option, default, deprecate=deprecate) if isinstance(val, JinjaTemplate): ctx = {"data_path": self.server.get_app_args()["data_path"]} strpath = val.render(ctx) return pathlib.Path(strpath).expanduser().resolve() return val def read_supplemental_dict(self, obj: Dict[str, Any]) -> ConfigHelper: if not obj: raise ConfigError(f"Cannot ready Empty Dict") source = DictSourceWrapper() source.read_dict(obj) sections = source.config.sections() return ConfigHelper(self.server, source, sections[0], {}) def read_supplemental_config(self, file_name: str) -> ConfigHelper: fpath = pathlib.Path(file_name).expanduser().resolve() source = FileSourceWrapper(self.server) source.read_file(fpath) sections = source.config.sections() return ConfigHelper(self.server, source, sections[0], {}) def write_config(self, file_obj: IO[str]) -> None: self.config.write(file_obj) def get_parsed_config(self) -> Dict[str, Dict[str, ConfigVal]]: return dict(self.parsed) def get_orig_config(self) -> Dict[str, Dict[str, str]]: return self.source.as_dict() def get_file_sections(self) -> Dict[str, List[str]]: return self.source.get_file_sections() def get_config_files(self) -> List[str]: return [str(f) for f in self.source.get_files()] def validate_config(self) -> None: for sect in self.config.sections(): if sect not in self.parsed: self.server.add_warning( f"Unparsed config section [{sect}] detected. This " "may be the result of a component that failed to " "load. In the future this will result in a startup " "error.") continue parsed_opts = self.parsed[sect] for opt, val in self.config.items(sect): if opt not in parsed_opts: self.server.add_warning( f"Unparsed config option '{opt}: {val}' detected in " f"section [{sect}]. This may be an option no longer " "available or could be the result of a module that " "failed to load. In the future this will result " "in a startup error.") def create_backup(self): cfg_path = self.server.get_app_args()["config_file"] cfg = pathlib.Path(cfg_path).expanduser().resolve() backup = cfg.parent.joinpath(f".{cfg.name}.bkp") backup_fp: Optional[TextIOWrapper] = None try: if backup.exists(): cfg_mtime: int = 0 for cfg in self.source.get_files(): cfg_mtime = max(cfg_mtime, cfg.stat().st_mtime_ns) backup_mtime = backup.stat().st_mtime_ns if backup_mtime >= cfg_mtime: # Backup already exists and is current return backup_fp = backup.open("w") self.config.write(backup_fp) logging.info(f"Backing up last working configuration to '{backup}'") except Exception: logging.exception("Failed to create a backup") finally: if backup_fp is not None: backup_fp.close() class ConfigSourceWrapper: def __init__(self): self.config = configparser.ConfigParser(interpolation=None) def get_parser(self): return self.config def as_dict(self) -> Dict[str, Dict[str, str]]: return {key: dict(val) for key, val in self.config.items()} def write_to_string(self) -> str: sio = StringIO() self.config.write(sio) val = sio.getvalue() sio.close() return val def get_files(self) -> List[pathlib.Path]: return [] def set_option(self, section: str, option: str, value: str) -> None: self.config.set(section, option, value) def remove_option(self, section: str, option: str) -> None: self.config.remove_option(section, option) def add_section(self, section: str) -> None: self.config.add_section(section) def remove_section(self, section: str) -> None: self.config.remove_section(section) def get_file_sections(self) -> Dict[str, List[str]]: return {} def find_config_file( self, section: str, option: Optional[str] = None ) -> Optional[pathlib.Path]: return None class DictSourceWrapper(ConfigSourceWrapper): def __init__(self): super().__init__() def read_dict(self, cfg: Dict[str, Any]) -> None: try: self.config.read_dict(cfg) except Exception as e: raise ConfigError("Error Reading config as dict") from e class FileSourceWrapper(ConfigSourceWrapper): section_r = re.compile(r"\s*\[([^]]+)\]") def __init__(self, server: Server) -> None: super().__init__() self.server = server self.files: List[pathlib.Path] = [] self.raw_config_data: List[str] = [] self.updates_pending: Set[int] = set() self.file_section_map: Dict[str, List[int]] = {} self.file_option_map: Dict[Tuple[str, str], List[int]] = {} self.save_lock = threading.Lock() self.backup: Dict[str, Any] = {} def get_files(self) -> List[pathlib.Path]: return self.files def is_in_transaction(self) -> bool: return ( len(self.updates_pending) > 0 or self.save_lock.locked() ) def backup_source(self) -> None: self.backup = { "raw_data": list(self.raw_config_data), "section_map": copy.deepcopy(self.file_section_map), "option_map": copy.deepcopy(self.file_option_map), "config": self.write_to_string() } def _acquire_save_lock(self) -> None: if not self.files: raise ConfigError( "Can only modify file backed configurations" ) if not self.save_lock.acquire(blocking=False): raise ConfigError("Configuration locked, cannot modify") def set_option(self, section: str, option: str, value: str) -> None: self._acquire_save_lock() try: value = value.strip() try: if (self.config.get(section, option).strip() == value): return except (configparser.NoSectionError, configparser.NoOptionError): pass file_idx: int = 0 has_sec = has_opt = False if (section, option) in self.file_option_map: file_idx = self.file_option_map[(section, option)][0] has_sec = has_opt = True elif section in self.file_section_map: file_idx = self.file_section_map[section][0] has_sec = True buf = self.raw_config_data[file_idx].splitlines() new_opt_list = [f"{option}: {value}"] if "\n" in value: vals = [f" {v}" for v in value.split("\n")] new_opt_list = [f"{option}:"] + vals sec_info = self._find_section_info(section, buf, raise_error=False) if sec_info: options: Dict[str, Any] = sec_info["options"] indent: int = sec_info["indent"] opt_start: int = sec_info["end"] opt_end: int = sec_info["end"] opt_info: Optional[Dict[str, Any]] = options.get(option) if opt_info is not None: indent = opt_info["indent"] opt_start = opt_info["start"] opt_end = opt_info["end"] elif options: # match indentation of last option in section last_opt = list(options.values())[-1] indent = last_opt["indent"] if indent: padding = " " * indent new_opt_list = [f"{padding}{v}" for v in new_opt_list] buf[opt_start:] = new_opt_list + buf[opt_end:] else: # Append new section to the end of the file new_opt_list.insert(0, f"[{section}]") if buf and buf[-1].strip() != "": new_opt_list.insert(0, "") buf.extend(new_opt_list) buf.append("") updated_cfg = "\n".join(buf) # test changes to the configuration test_parser = configparser.ConfigParser(interpolation=None) try: test_parser.read_string(updated_cfg) if not test_parser.has_option(section, option): raise ConfigError("Option not added") except Exception as e: raise ConfigError( f"Failed to set option '{option}' in section " f"[{section}], file: {self.files[file_idx]}" ) from e # Update local configuration/tracking self.raw_config_data[file_idx] = updated_cfg self.updates_pending.add(file_idx) if not has_sec: self.file_section_map[section] = [file_idx] if not has_opt: self.file_option_map[(section, option)] = [file_idx] if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, option, value) finally: self.save_lock.release() def remove_option(self, section: str, option: str) -> None: self._acquire_save_lock() try: key = (section, option) if key not in self.file_option_map: return pending: List[Tuple[int, str]] = [] file_indices = self.file_option_map[key] for idx in file_indices: buf = self.raw_config_data[idx].splitlines() try: sec_info = self._find_section_info(section, buf) opt_info = sec_info["options"][option] start = opt_info["start"] end = opt_info["end"] if ( end < len(buf) and not buf[start-1].strip() and not buf[end].strip() ): end += 1 buf[start:] = buf[end:] buf.append("") updated_cfg = "\n".join(buf) test_parser = configparser.ConfigParser(interpolation=None) test_parser.read_string(updated_cfg) if test_parser.has_option(section, option): raise ConfigError("Option still exists") pending.append((idx, updated_cfg)) except Exception as e: raise ConfigError( f"Failed to remove option '{option}' from section " f"[{section}], file: {self.files[idx]}" ) from e # Update configuration/tracking for (idx, data) in pending: self.updates_pending.add(idx) self.raw_config_data[idx] = data del self.file_option_map[key] self.config.remove_option(section, option) finally: self.save_lock.release() def add_section(self, section: str) -> None: self._acquire_save_lock() try: if section in self.file_section_map: return # add section to end of primary file buf = self.raw_config_data[0].splitlines() if buf and buf[-1].strip() != "": buf.append("") buf.extend([f"[{section}]", ""]) updated_cfg = "\n".join(buf) try: test_parser = configparser.ConfigParser(interpolation=None) test_parser.read_string(updated_cfg) if not test_parser.has_section(section): raise ConfigError("Section not added") except Exception as e: raise ConfigError( f"Failed to add section [{section}], file: {self.files[0]}" ) from e self.updates_pending.add(0) self.file_section_map[section] = [0] self.raw_config_data[0] = updated_cfg self.config.add_section(section) finally: self.save_lock.release() def remove_section(self, section: str) -> None: self._acquire_save_lock() try: if section not in self.file_section_map: return pending: List[Tuple[int, str]] = [] file_indices = self.file_section_map[section] for idx in file_indices: buf = self.raw_config_data[idx].splitlines() try: sec_info = self._find_section_info(section, buf) start = sec_info["start"] end = sec_info["end"] if ( end < len(buf) and not buf[start-1].strip() and not buf[end].strip() ): end += 1 buf[start:] = buf[end:] buf.append("") updated_cfg = "\n".join(buf) test_parser = configparser.ConfigParser(interpolation=None) test_parser.read_string(updated_cfg) if test_parser.has_section(section): raise ConfigError("Section still exists") pending.append((idx, updated_cfg)) except Exception as e: raise ConfigError( f"Failed to remove section [{section}], " f"file: {self.files[0]}" ) from e for (idx, data) in pending: self.updates_pending.add(idx) self.raw_config_data[idx] = data del self.file_section_map[section] self.config.remove_section(section) finally: self.save_lock.release() def save(self) -> Awaitable[bool]: eventloop = self.server.get_event_loop() if self.server.is_running(): fut = eventloop.run_in_thread(self._do_save) else: fut = eventloop.create_future() fut.set_result(self._do_save()) return fut def _do_save(self) -> bool: with self.save_lock: self.backup.clear() if not self.updates_pending: return False for idx in self.updates_pending: fpath = self.files[idx] fpath.write_text( self.raw_config_data[idx], encoding="utf-8" ) self.updates_pending.clear() return True def cancel(self): self._acquire_save_lock() try: if not self.backup or not self.updates_pending: self.backup.clear() return self.raw_config_data = self.backup["raw_data"] self.file_option_map = self.backup["option_map"] self.file_section_map = self.backup["section_map"] self.config.clear() self.config.read_string(self.backup["config"]) self.updates_pending.clear() self.backup.clear() finally: self.save_lock.release() def revert(self) -> Awaitable[bool]: eventloop = self.server.get_event_loop() if self.server.is_running(): fut = eventloop.run_in_thread(self._do_revert) else: fut = eventloop.create_future() fut.set_result(self._do_revert()) return fut def _do_revert(self) -> bool: with self.save_lock: if not self.updates_pending: return False self.backup.clear() entry = self.files[0] self.read_file(entry) return True def write_config( self, dest_folder: Union[str, pathlib.Path] ) -> Awaitable[None]: eventloop = self.server.get_event_loop() if self.server.is_running(): fut = eventloop.run_in_thread(self._do_write, dest_folder) else: self._do_write(dest_folder) fut = eventloop.create_future() fut.set_result(None) return fut def _do_write(self, dest_folder: Union[str, pathlib.Path]) -> None: with self.save_lock: if isinstance(dest_folder, str): dest_folder = pathlib.Path(dest_folder) dest_folder = dest_folder.expanduser().resolve() cfg_parent = self.files[0].parent for i, path in enumerate(self.files): try: rel_path = path.relative_to(cfg_parent) dest_file = dest_folder.joinpath(rel_path) except ValueError: dest_file = dest_folder.joinpath( f"{path.parent.name}-{path.name}" ) os.makedirs(str(dest_file.parent), exist_ok=True) dest_file.write_text(self.raw_config_data[i]) def _find_section_info( self, section: str, file_data: List[str], raise_error: bool = True ) -> Dict[str, Any]: options: Dict[str, Dict[str, Any]] = {} result: Dict[str, Any] = { "indent": -1, "start": -1, "end": -1, "options": options } last_option: str = "" opt_indent = -1 for idx, line in enumerate(file_data): if not line.strip() or line.lstrip()[0] in "#;": # skip empty lines, whitespace, and comments continue line = line.expandtabs() line_indent = len(line) - len(line.strip()) if opt_indent != -1 and line_indent > opt_indent: if last_option: options[last_option]["end"] = idx + 1 # Continuation of an option if result["start"] != -1: result["end"] = idx + 1 continue sec_match = self.section_r.match(line) if sec_match is not None: opt_indent = -1 if result["start"] != -1: break cursec = sec_match.group(1) if section == cursec: result["indent"] = line_indent result["start"] = idx result["end"] = idx + 1 else: # This is an option opt_indent = line_indent if result["start"] != -1: result["end"] = idx + 1 last_option = re.split(r"[:=]", line, 1)[0].strip() options[last_option] = { "indent": line_indent, "start": idx, "end": idx + 1 } if result["start"] != -1: return result if raise_error: raise ConfigError(f"Unable to find section [{section}]") return {} def get_file_sections(self) -> Dict[str, List[str]]: sections_by_file: Dict[str, List[str]] = { str(fname): [] for fname in self.files } for section, idx_list in self.file_section_map.items(): for idx in idx_list: fname = str(self.files[idx]) sections_by_file[fname].append(section) return sections_by_file def find_config_file( self, section: str, option: Optional[str] = None ) -> Optional[pathlib.Path]: idx: int = -1 if option is not None: key = (section, option) if key in self.file_option_map: idx = self.file_option_map[key][0] elif section in self.file_section_map: idx = self.file_section_map[section][0] if idx == -1: return None return self.files[idx] def _write_buffer(self, buffer: List[str], fpath: pathlib.Path) -> None: if not buffer: return self.config.read_string("\n".join(buffer), fpath.name) def _parse_file( self, file_path: pathlib.Path, visited: List[Tuple[int, int]] ) -> None: buffer: List[str] = [] try: stat = file_path.stat() cur_stat = (stat.st_dev, stat.st_ino) if cur_stat in visited: raise ConfigError( f"Recursive include directive detected, {file_path}" ) visited.append(cur_stat) self.files.append(file_path) file_index = len(self.files) - 1 cfg_data = file_path.read_text(encoding="utf-8") self.raw_config_data.append(cfg_data) lines = cfg_data.splitlines() last_section = "" opt_indent = -1 for line in lines: if not line.strip() or line.lstrip()[0] in "#;": # ignore lines that contain only whitespace/comments continue line = line.expandtabs(tabsize=4) # Remove inline comments for prefix in "#;": icmt = line.find(prefix) if icmt > 0 and line[icmt-1] != "\\": # inline comment, remove it line = line[:icmt] break line_indent = len(line) - len(line.lstrip()) if opt_indent != -1 and line_indent > opt_indent: # Multi-line value, append to buffer and resume parsing buffer.append(line) continue sect_match = self.section_r.match(line) if sect_match is not None: # Section detected opt_indent = -1 section = sect_match.group(1) if section.startswith("include "): inc_path = section[8:].strip() if not inc_path: raise ConfigError( f"Invalid include directive: [{section}]" ) if inc_path[0] == "/": new_path = pathlib.Path(inc_path).resolve() paths = sorted(new_path.parent.glob(new_path.name)) else: paths = sorted(file_path.parent.glob(inc_path)) if not paths: raise ConfigError( "No files matching include directive " f"[{section}]" ) # Write out buffered data to the config before parsing # included files self._write_buffer(buffer, file_path) buffer.clear() for p in paths: self._parse_file(p, visited) # Don't add included sections to the configparser continue else: last_section = section if section not in self.file_section_map: self.file_section_map[section] = [] elif file_index in self.file_section_map[section]: raise ConfigError( f"Duplicate section [{section}] in file " f"{file_path}" ) self.file_section_map[section].insert(0, file_index) else: # This line must specify an option opt_indent = line_indent option = re.split(r"[:=]", line, 1)[0].strip() key = (last_section, option) if key not in self.file_option_map: self.file_option_map[key] = [] elif file_index in self.file_option_map[key]: raise ConfigError( f"Duplicate option '{option}' in section " f"[{last_section}], file {file_path} " ) self.file_option_map[key].insert(0, file_index) buffer.append(line) self._write_buffer(buffer, file_path) except ConfigError: raise except Exception as e: if not file_path.is_file(): raise ConfigError( f"Configuration File Not Found: '{file_path}''") from e if not os.access(file_path, os.R_OK): raise ConfigError( "Moonraker does not have Read/Write permission for " f"config file at path '{file_path}'") from e raise ConfigError(f"Error Reading Config: '{file_path}'") from e def read_file(self, main_conf: pathlib.Path) -> None: self.config.clear() self.files.clear() self.raw_config_data.clear() self.updates_pending.clear() self.file_section_map.clear() self.file_option_map.clear() self._parse_file(main_conf, []) size = sum([len(rawcfg) for rawcfg in self.raw_config_data]) logging.info( f"Configuration File '{main_conf}' parsed, total size: {size} B" ) def get_configuration( server: Server, app_args: Dict[str, Any] ) -> ConfigHelper: start_path = pathlib.Path(app_args['config_file']).expanduser().resolve() source = FileSourceWrapper(server) source.read_file(start_path) if not source.config.has_section('server'): raise ConfigError("No section [server] in config") return ConfigHelper(server, source, 'server', {}) def find_config_backup(cfg_path: str) -> Optional[str]: cfg = pathlib.Path(cfg_path).expanduser().resolve() backup = cfg.parent.joinpath(f".{cfg.name}.bkp") if backup.is_file(): return str(backup) return None