CreatBotMoonraker/moonraker/confighelper.py
Eric Callahan 484950cb40
confighelper: implement getpath
Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
2022-11-06 10:51:30 -05:00

1064 lines
41 KiB
Python

# Configuration Helper
#
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license
from __future__ import annotations
import configparser
import os
import hashlib
import pathlib
import re
import threading
import copy
import logging
from io import StringIO
from utils import SentinelClass
from components.template import JinjaTemplate
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
IO,
Optional,
Set,
Tuple,
TypeVar,
Union,
Dict,
List,
Type,
)
if TYPE_CHECKING:
from moonraker import Server
from components.gpio import GpioFactory, GpioOutputPin
from components.template import TemplateFactory
from io import TextIOWrapper
_T = TypeVar("_T")
ConfigVal = Union[None, int, float, bool, str, dict, list]
SENTINEL = SentinelClass.get_instance()
DOCS_URL = "https://moonraker.readthedocs.io/en/latest"
class ConfigError(Exception):
pass
class ConfigHelper:
error = ConfigError
def __init__(self,
server: Server,
config_source: ConfigSourceWrapper,
section: str,
parsed: Dict[str, Dict[str, ConfigVal]],
fallback_section: Optional[str] = None
) -> None:
self.server = server
self.source = config_source
self.config = config_source.get_parser()
self.section = section
self.fallback_section: Optional[str] = fallback_section
self.parsed = parsed
if self.section not in self.parsed:
self.parsed[self.section] = {}
self.sections = self.config.sections
self.has_section = self.config.has_section
def get_server(self) -> Server:
return self.server
def get_source(self) -> ConfigSourceWrapper:
return self.source
def __getitem__(self, key: str) -> ConfigHelper:
return self.getsection(key)
def __contains__(self, key: str) -> bool:
return key in self.config
def has_option(self, option: str) -> bool:
return self.config.has_option(self.section, option)
def set_option(self, option: str, value: str) -> None:
self.source.set_option(self.section, option, value)
def get_name(self) -> str:
return self.section
def get_file(self) -> Optional[pathlib.Path]:
return self.source.find_config_file(self.section)
def get_options(self) -> Dict[str, str]:
if self.section not in self.config:
return {}
return dict(self.config[self.section])
def get_hash(self) -> hashlib._Hash:
hash = hashlib.sha256()
section = self.section
if self.section not in self.config:
return hash
for option, val in self.config[section].items():
hash.update(option.encode())
hash.update(val.encode())
return hash
def get_prefix_sections(self, prefix: str) -> List[str]:
return [s for s in self.sections() if s.startswith(prefix)]
def getsection(
self, section: str, fallback: Optional[str] = None
) -> ConfigHelper:
return ConfigHelper(
self.server, self.source, section, self.parsed, fallback
)
def _get_option(self,
func: Callable[..., Any],
option: str,
default: Union[SentinelClass, _T],
above: Optional[Union[int, float]] = None,
below: Optional[Union[int, float]] = None,
minval: Optional[Union[int, float]] = None,
maxval: Optional[Union[int, float]] = None,
deprecate: bool = False
) -> _T:
section = self.section
warn_fallback = False
if (
self.section not in self.config and
self.fallback_section is not None
):
section = self.fallback_section
warn_fallback = True
try:
val = func(section, option)
except (configparser.NoOptionError, configparser.NoSectionError) as e:
if isinstance(default, SentinelClass):
raise ConfigError(str(e)) from None
val = default
section = self.section
except Exception:
raise ConfigError(
f"Error parsing option ({option}) from "
f"section [{self.section}]")
else:
if deprecate:
self.server.add_warning(
f"[{self.section}]: Option '{option}' is "
"deprecated, see the configuration documention "
f"at {DOCS_URL}/configuration/")
if warn_fallback:
help = f"{DOCS_URL}/configuration/#option-moved-deprecations"
self.server.add_warning(
f"[{section}]: Option '{option}' has been moved "
f"to section [{self.section}]. Please correct your "
f"configuration, see {help} for detailed documentation."
)
self._check_option(option, val, above, below, minval, maxval)
if option not in self.parsed[section]:
if (
val is None or
isinstance(val, (int, float, bool, str, dict, list))
):
self.parsed[section][option] = val
else:
# If the item cannot be encoded to json serialize to a string
self.parsed[section][option] = str(val)
return val
def _check_option(self,
option: str,
value: Union[int, float],
above: Optional[Union[int, float]],
below: Optional[Union[int, float]],
minval: Optional[Union[int, float]],
maxval: Optional[Union[int, float]]
) -> None:
if above is not None and value <= above:
raise self.error(
f"Config Error: Section [{self.section}], Option "
f"'{option}: {value}': value is not above {above}")
if below is not None and value >= below:
raise self.error(
f"Config Error: Section [{self.section}], Option "
f"'{option}: {value}': value is not below {below}")
if minval is not None and value < minval:
raise self.error(
f"Config Error: Section [{self.section}], Option "
f"'{option}: {value}': value is below minimum value {minval}")
if maxval is not None and value > maxval:
raise self.error(
f"Config Error: Section [{self.section}], Option "
f"'{option}: {value}': value is above maximum value {minval}")
def get(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
deprecate: bool = False
) -> Union[str, _T]:
return self._get_option(
self.config.get, option, default,
deprecate=deprecate)
def getint(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
above: Optional[int] = None,
below: Optional[int] = None,
minval: Optional[int] = None,
maxval: Optional[int] = None,
deprecate: bool = False
) -> Union[int, _T]:
return self._get_option(
self.config.getint, option, default,
above, below, minval, maxval, deprecate)
def getboolean(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
deprecate: bool = False
) -> Union[bool, _T]:
return self._get_option(
self.config.getboolean, option, default,
deprecate=deprecate)
def getfloat(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
above: Optional[float] = None,
below: Optional[float] = None,
minval: Optional[float] = None,
maxval: Optional[float] = None,
deprecate: bool = False
) -> Union[float, _T]:
return self._get_option(
self.config.getfloat, option, default,
above, below, minval, maxval, deprecate)
def getlists(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
list_type: Type = str,
separators: Tuple[Optional[str], ...] = ('\n',),
count: Optional[Tuple[Optional[int], ...]] = None,
deprecate: bool = False
) -> Union[List[Any], _T]:
if count is not None and len(count) != len(separators):
raise ConfigError(
f"Option '{option}' in section "
f"[{self.section}]: length of 'count' argument must ",
"match length of 'separators' argument")
else:
count = tuple(None for _ in range(len(separators)))
def list_parser(value: str,
ltype: Type,
seps: Tuple[Optional[str], ...],
expected_cnt: Tuple[Optional[int], ...]
) -> List[Any]:
sep = seps[0]
seps = seps[1:]
cnt = expected_cnt[0]
expected_cnt = expected_cnt[1:]
ret: List[Any] = []
if seps:
sub_lists = [val.strip() for val in value.split(sep)
if val.strip()]
for sub_list in sub_lists:
ret.append(list_parser(sub_list, ltype, seps,
expected_cnt))
else:
ret = [ltype(val.strip()) for val in value.split(sep)
if val.strip()]
if cnt is not None and len(ret) != cnt:
raise ConfigError(
f"List length mismatch, expected {cnt}, "
f"parsed {len(ret)}")
return ret
def getlist_wrapper(sec: str, opt: str) -> List[Any]:
val = self.config.get(sec, opt)
assert count is not None
return list_parser(val, list_type, separators, count)
return self._get_option(getlist_wrapper, option, default,
deprecate=deprecate)
def getlist(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
separator: Optional[str] = '\n',
count: Optional[int] = None,
deprecate: bool = False
) -> Union[List[str], _T]:
return self.getlists(option, default, str, (separator,), (count,),
deprecate=deprecate)
def getintlist(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
separator: Optional[str] = '\n',
count: Optional[int] = None,
deprecate: bool = False
) -> Union[List[int], _T]:
return self.getlists(option, default, int, (separator,), (count,),
deprecate=deprecate)
def getfloatlist(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
separator: Optional[str] = '\n',
count: Optional[int] = None,
deprecate: bool = False
) -> Union[List[float], _T]:
return self.getlists(option, default, float, (separator,), (count,),
deprecate=deprecate)
def getdict(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
separators: Tuple[Optional[str], Optional[str]] = ('\n', '='),
dict_type: Type = str,
allow_empty_fields: bool = False,
deprecate: bool = False
) -> Union[Dict[str, Any], _T]:
if len(separators) != 2:
raise ConfigError(
"The `separators` argument of getdict() must be a Tuple"
"of length of 2")
def getdict_wrapper(sec: str, opt: str) -> Dict[str, Any]:
val = self.config.get(sec, opt)
ret: Dict[str, Any] = {}
for line in val.split(separators[0]):
line = line.strip()
if not line:
continue
parts = line.split(separators[1], 1)
if len(parts) == 1:
if allow_empty_fields:
ret[parts[0].strip()] = None
else:
raise ConfigError(
f"Failed to parse dictionary field, {line}")
else:
ret[parts[0].strip()] = dict_type(parts[1].strip())
return ret
return self._get_option(getdict_wrapper, option, default,
deprecate=deprecate)
def getgpioout(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
initial_value: int = 0,
deprecate: bool = False
) -> Union[GpioOutputPin, _T]:
try:
gpio: GpioFactory = self.server.load_component(self, 'gpio')
except Exception:
raise ConfigError(
f"Section [{self.section}], option '{option}', "
"GPIO Component not available")
def getgpio_wrapper(sec: str, opt: str) -> GpioOutputPin:
val = self.config.get(sec, opt)
return gpio.setup_gpio_out(val, initial_value)
return self._get_option(getgpio_wrapper, option, default,
deprecate=deprecate)
def gettemplate(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
is_async: bool = False,
deprecate: bool = False
) -> Union[JinjaTemplate, _T]:
try:
template: TemplateFactory
template = self.server.load_component(self, 'template')
except Exception:
raise ConfigError(
f"Section [{self.section}], option '{option}', "
"Template Component not available")
def gettemplate_wrapper(sec: str, opt: str) -> JinjaTemplate:
val = self.config.get(sec, opt)
return template.create_template(val.strip(), is_async)
return self._get_option(gettemplate_wrapper, option, default,
deprecate=deprecate)
def load_template(self,
option: str,
default: Union[SentinelClass, str] = SENTINEL,
is_async: bool = False,
deprecate: bool = False
) -> JinjaTemplate:
val = self.gettemplate(option, default, is_async, deprecate)
if isinstance(val, str):
template: TemplateFactory
template = self.server.lookup_component('template')
return template.create_template(val.strip(), is_async)
return val
def getpath(self,
option: str,
default: Union[SentinelClass, _T] = SENTINEL,
deprecate: bool = False
) -> Union[pathlib.Path, _T]:
val = self.gettemplate(option, default, deprecate=deprecate)
if isinstance(val, JinjaTemplate):
ctx = {"data_path": self.server.get_app_args()["data_path"]}
strpath = val.render(ctx)
return pathlib.Path(strpath).expanduser().resolve()
return val
def read_supplemental_dict(self, obj: Dict[str, Any]) -> ConfigHelper:
if not obj:
raise ConfigError(f"Cannot ready Empty Dict")
source = DictSourceWrapper()
source.read_dict(obj)
sections = source.config.sections()
return ConfigHelper(self.server, source, sections[0], {})
def read_supplemental_config(self, file_name: str) -> ConfigHelper:
fpath = pathlib.Path(file_name).expanduser().resolve()
source = FileSourceWrapper(self.server)
source.read_file(fpath)
sections = source.config.sections()
return ConfigHelper(self.server, source, sections[0], {})
def write_config(self, file_obj: IO[str]) -> None:
self.config.write(file_obj)
def get_parsed_config(self) -> Dict[str, Dict[str, ConfigVal]]:
return dict(self.parsed)
def get_orig_config(self) -> Dict[str, Dict[str, str]]:
return self.source.as_dict()
def get_file_sections(self) -> Dict[str, List[str]]:
return self.source.get_file_sections()
def get_config_files(self) -> List[str]:
return [str(f) for f in self.source.get_files()]
def validate_config(self) -> None:
for sect in self.config.sections():
if sect not in self.parsed:
self.server.add_warning(
f"Unparsed config section [{sect}] detected. This "
"may be the result of a component that failed to "
"load. In the future this will result in a startup "
"error.")
continue
parsed_opts = self.parsed[sect]
for opt, val in self.config.items(sect):
if opt not in parsed_opts:
self.server.add_warning(
f"Unparsed config option '{opt}: {val}' detected in "
f"section [{sect}]. This may be an option no longer "
"available or could be the result of a module that "
"failed to load. In the future this will result "
"in a startup error.")
def create_backup(self):
cfg_path = self.server.get_app_args()["config_file"]
cfg = pathlib.Path(cfg_path).expanduser().resolve()
backup = cfg.parent.joinpath(f".{cfg.name}.bkp")
backup_fp: Optional[TextIOWrapper] = None
try:
if backup.exists():
cfg_mtime: int = 0
for cfg in self.source.get_files():
cfg_mtime = max(cfg_mtime, cfg.stat().st_mtime_ns)
backup_mtime = backup.stat().st_mtime_ns
if backup_mtime >= cfg_mtime:
# Backup already exists and is current
return
backup_fp = backup.open("w")
self.config.write(backup_fp)
logging.info(f"Backing up last working configuration to '{backup}'")
except Exception:
logging.exception("Failed to create a backup")
finally:
if backup_fp is not None:
backup_fp.close()
class ConfigSourceWrapper:
def __init__(self):
self.config = configparser.ConfigParser(interpolation=None)
def get_parser(self):
return self.config
def as_dict(self) -> Dict[str, Dict[str, str]]:
return {key: dict(val) for key, val in self.config.items()}
def write_to_string(self) -> str:
sio = StringIO()
self.config.write(sio)
val = sio.getvalue()
sio.close()
return val
def get_files(self) -> List[pathlib.Path]:
return []
def set_option(self, section: str, option: str, value: str) -> None:
self.config.set(section, option, value)
def remove_option(self, section: str, option: str) -> None:
self.config.remove_option(section, option)
def add_section(self, section: str) -> None:
self.config.add_section(section)
def remove_section(self, section: str) -> None:
self.config.remove_section(section)
def get_file_sections(self) -> Dict[str, List[str]]:
return {}
def find_config_file(
self, section: str, option: Optional[str] = None
) -> Optional[pathlib.Path]:
return None
class DictSourceWrapper(ConfigSourceWrapper):
def __init__(self):
super().__init__()
def read_dict(self, cfg: Dict[str, Any]) -> None:
try:
self.config.read_dict(cfg)
except Exception as e:
raise ConfigError("Error Reading config as dict") from e
class FileSourceWrapper(ConfigSourceWrapper):
section_r = re.compile(r"\s*\[([^]]+)\]")
def __init__(self, server: Server) -> None:
super().__init__()
self.server = server
self.files: List[pathlib.Path] = []
self.raw_config_data: List[str] = []
self.updates_pending: Set[int] = set()
self.file_section_map: Dict[str, List[int]] = {}
self.file_option_map: Dict[Tuple[str, str], List[int]] = {}
self.save_lock = threading.Lock()
self.backup: Dict[str, Any] = {}
def get_files(self) -> List[pathlib.Path]:
return self.files
def is_in_transaction(self) -> bool:
return (
len(self.updates_pending) > 0 or
self.save_lock.locked()
)
def backup_source(self) -> None:
self.backup = {
"raw_data": list(self.raw_config_data),
"section_map": copy.deepcopy(self.file_section_map),
"option_map": copy.deepcopy(self.file_option_map),
"config": self.write_to_string()
}
def _acquire_save_lock(self) -> None:
if not self.files:
raise ConfigError(
"Can only modify file backed configurations"
)
if not self.save_lock.acquire(blocking=False):
raise ConfigError("Configuration locked, cannot modify")
def set_option(self, section: str, option: str, value: str) -> None:
self._acquire_save_lock()
try:
value = value.strip()
try:
if (self.config.get(section, option).strip() == value):
return
except (configparser.NoSectionError, configparser.NoOptionError):
pass
file_idx: int = 0
has_sec = has_opt = False
if (section, option) in self.file_option_map:
file_idx = self.file_option_map[(section, option)][0]
has_sec = has_opt = True
elif section in self.file_section_map:
file_idx = self.file_section_map[section][0]
has_sec = True
buf = self.raw_config_data[file_idx].splitlines()
new_opt_list = [f"{option}: {value}"]
if "\n" in value:
vals = [f" {v}" for v in value.split("\n")]
new_opt_list = [f"{option}:"] + vals
sec_info = self._find_section_info(section, buf, raise_error=False)
if sec_info:
options: Dict[str, Any] = sec_info["options"]
indent: int = sec_info["indent"]
opt_start: int = sec_info["end"]
opt_end: int = sec_info["end"]
opt_info: Optional[Dict[str, Any]] = options.get(option)
if opt_info is not None:
indent = opt_info["indent"]
opt_start = opt_info["start"]
opt_end = opt_info["end"]
elif options:
# match indentation of last option in section
last_opt = list(options.values())[-1]
indent = last_opt["indent"]
if indent:
padding = " " * indent
new_opt_list = [f"{padding}{v}" for v in new_opt_list]
buf[opt_start:] = new_opt_list + buf[opt_end:]
else:
# Append new section to the end of the file
new_opt_list.insert(0, f"[{section}]")
if buf and buf[-1].strip() != "":
new_opt_list.insert(0, "")
buf.extend(new_opt_list)
buf.append("")
updated_cfg = "\n".join(buf)
# test changes to the configuration
test_parser = configparser.ConfigParser(interpolation=None)
try:
test_parser.read_string(updated_cfg)
if not test_parser.has_option(section, option):
raise ConfigError("Option not added")
except Exception as e:
raise ConfigError(
f"Failed to set option '{option}' in section "
f"[{section}], file: {self.files[file_idx]}"
) from e
# Update local configuration/tracking
self.raw_config_data[file_idx] = updated_cfg
self.updates_pending.add(file_idx)
if not has_sec:
self.file_section_map[section] = [file_idx]
if not has_opt:
self.file_option_map[(section, option)] = [file_idx]
if not self.config.has_section(section):
self.config.add_section(section)
self.config.set(section, option, value)
finally:
self.save_lock.release()
def remove_option(self, section: str, option: str) -> None:
self._acquire_save_lock()
try:
key = (section, option)
if key not in self.file_option_map:
return
pending: List[Tuple[int, str]] = []
file_indices = self.file_option_map[key]
for idx in file_indices:
buf = self.raw_config_data[idx].splitlines()
try:
sec_info = self._find_section_info(section, buf)
opt_info = sec_info["options"][option]
start = opt_info["start"]
end = opt_info["end"]
if (
end < len(buf) and
not buf[start-1].strip()
and not buf[end].strip()
):
end += 1
buf[start:] = buf[end:]
buf.append("")
updated_cfg = "\n".join(buf)
test_parser = configparser.ConfigParser(interpolation=None)
test_parser.read_string(updated_cfg)
if test_parser.has_option(section, option):
raise ConfigError("Option still exists")
pending.append((idx, updated_cfg))
except Exception as e:
raise ConfigError(
f"Failed to remove option '{option}' from section "
f"[{section}], file: {self.files[idx]}"
) from e
# Update configuration/tracking
for (idx, data) in pending:
self.updates_pending.add(idx)
self.raw_config_data[idx] = data
del self.file_option_map[key]
self.config.remove_option(section, option)
finally:
self.save_lock.release()
def add_section(self, section: str) -> None:
self._acquire_save_lock()
try:
if section in self.file_section_map:
return
# add section to end of primary file
buf = self.raw_config_data[0].splitlines()
if buf and buf[-1].strip() != "":
buf.append("")
buf.extend([f"[{section}]", ""])
updated_cfg = "\n".join(buf)
try:
test_parser = configparser.ConfigParser(interpolation=None)
test_parser.read_string(updated_cfg)
if not test_parser.has_section(section):
raise ConfigError("Section not added")
except Exception as e:
raise ConfigError(
f"Failed to add section [{section}], file: {self.files[0]}"
) from e
self.updates_pending.add(0)
self.file_section_map[section] = [0]
self.raw_config_data[0] = updated_cfg
self.config.add_section(section)
finally:
self.save_lock.release()
def remove_section(self, section: str) -> None:
self._acquire_save_lock()
try:
if section not in self.file_section_map:
return
pending: List[Tuple[int, str]] = []
file_indices = self.file_section_map[section]
for idx in file_indices:
buf = self.raw_config_data[idx].splitlines()
try:
sec_info = self._find_section_info(section, buf)
start = sec_info["start"]
end = sec_info["end"]
if (
end < len(buf) and
not buf[start-1].strip()
and not buf[end].strip()
):
end += 1
buf[start:] = buf[end:]
buf.append("")
updated_cfg = "\n".join(buf)
test_parser = configparser.ConfigParser(interpolation=None)
test_parser.read_string(updated_cfg)
if test_parser.has_section(section):
raise ConfigError("Section still exists")
pending.append((idx, updated_cfg))
except Exception as e:
raise ConfigError(
f"Failed to remove section [{section}], "
f"file: {self.files[0]}"
) from e
for (idx, data) in pending:
self.updates_pending.add(idx)
self.raw_config_data[idx] = data
del self.file_section_map[section]
self.config.remove_section(section)
finally:
self.save_lock.release()
def save(self) -> Awaitable[bool]:
eventloop = self.server.get_event_loop()
if self.server.is_running():
fut = eventloop.run_in_thread(self._do_save)
else:
fut = eventloop.create_future()
fut.set_result(self._do_save())
return fut
def _do_save(self) -> bool:
with self.save_lock:
self.backup.clear()
if not self.updates_pending:
return False
for idx in self.updates_pending:
fpath = self.files[idx]
fpath.write_text(
self.raw_config_data[idx], encoding="utf-8"
)
self.updates_pending.clear()
return True
def cancel(self):
self._acquire_save_lock()
try:
if not self.backup or not self.updates_pending:
self.backup.clear()
return
self.raw_config_data = self.backup["raw_data"]
self.file_option_map = self.backup["option_map"]
self.file_section_map = self.backup["section_map"]
self.config.clear()
self.config.read_string(self.backup["config"])
self.updates_pending.clear()
self.backup.clear()
finally:
self.save_lock.release()
def revert(self) -> Awaitable[bool]:
eventloop = self.server.get_event_loop()
if self.server.is_running():
fut = eventloop.run_in_thread(self._do_revert)
else:
fut = eventloop.create_future()
fut.set_result(self._do_revert())
return fut
def _do_revert(self) -> bool:
with self.save_lock:
if not self.updates_pending:
return False
self.backup.clear()
entry = self.files[0]
self.read_file(entry)
return True
def write_config(
self, dest_folder: Union[str, pathlib.Path]
) -> Awaitable[None]:
eventloop = self.server.get_event_loop()
if self.server.is_running():
fut = eventloop.run_in_thread(self._do_write, dest_folder)
else:
self._do_write(dest_folder)
fut = eventloop.create_future()
fut.set_result(None)
return fut
def _do_write(self, dest_folder: Union[str, pathlib.Path]) -> None:
with self.save_lock:
if isinstance(dest_folder, str):
dest_folder = pathlib.Path(dest_folder)
dest_folder = dest_folder.expanduser().resolve()
cfg_parent = self.files[0].parent
for i, path in enumerate(self.files):
try:
rel_path = path.relative_to(cfg_parent)
dest_file = dest_folder.joinpath(rel_path)
except ValueError:
dest_file = dest_folder.joinpath(
f"{path.parent.name}-{path.name}"
)
os.makedirs(str(dest_file.parent), exist_ok=True)
dest_file.write_text(self.raw_config_data[i])
def _find_section_info(
self, section: str, file_data: List[str], raise_error: bool = True
) -> Dict[str, Any]:
options: Dict[str, Dict[str, Any]] = {}
result: Dict[str, Any] = {
"indent": -1,
"start": -1,
"end": -1,
"options": options
}
last_option: str = ""
opt_indent = -1
for idx, line in enumerate(file_data):
if not line.strip() or line.lstrip()[0] in "#;":
# skip empty lines, whitespace, and comments
continue
line = line.expandtabs()
line_indent = len(line) - len(line.strip())
if opt_indent != -1 and line_indent > opt_indent:
if last_option:
options[last_option]["end"] = idx + 1
# Continuation of an option
if result["start"] != -1:
result["end"] = idx + 1
continue
sec_match = self.section_r.match(line)
if sec_match is not None:
opt_indent = -1
if result["start"] != -1:
break
cursec = sec_match.group(1)
if section == cursec:
result["indent"] = line_indent
result["start"] = idx
result["end"] = idx + 1
else:
# This is an option
opt_indent = line_indent
if result["start"] != -1:
result["end"] = idx + 1
last_option = re.split(r"[:=]", line, 1)[0].strip()
options[last_option] = {
"indent": line_indent,
"start": idx,
"end": idx + 1
}
if result["start"] != -1:
return result
if raise_error:
raise ConfigError(f"Unable to find section [{section}]")
return {}
def get_file_sections(self) -> Dict[str, List[str]]:
sections_by_file: Dict[str, List[str]] = {
str(fname): [] for fname in self.files
}
for section, idx_list in self.file_section_map.items():
for idx in idx_list:
fname = str(self.files[idx])
sections_by_file[fname].append(section)
return sections_by_file
def find_config_file(
self, section: str, option: Optional[str] = None
) -> Optional[pathlib.Path]:
idx: int = -1
if option is not None:
key = (section, option)
if key in self.file_option_map:
idx = self.file_option_map[key][0]
elif section in self.file_section_map:
idx = self.file_section_map[section][0]
if idx == -1:
return None
return self.files[idx]
def _write_buffer(self, buffer: List[str], fpath: pathlib.Path) -> None:
if not buffer:
return
self.config.read_string("\n".join(buffer), fpath.name)
def _parse_file(
self, file_path: pathlib.Path, visited: List[Tuple[int, int]]
) -> None:
buffer: List[str] = []
try:
stat = file_path.stat()
cur_stat = (stat.st_dev, stat.st_ino)
if cur_stat in visited:
raise ConfigError(
f"Recursive include directive detected, {file_path}"
)
visited.append(cur_stat)
self.files.append(file_path)
file_index = len(self.files) - 1
cfg_data = file_path.read_text(encoding="utf-8")
self.raw_config_data.append(cfg_data)
lines = cfg_data.splitlines()
last_section = ""
opt_indent = -1
for line in lines:
if not line.strip() or line.lstrip()[0] in "#;":
# ignore lines that contain only whitespace/comments
continue
line = line.expandtabs(tabsize=4)
# Remove inline comments
for prefix in "#;":
icmt = line.find(prefix)
if icmt > 0 and line[icmt-1] != "\\":
# inline comment, remove it
line = line[:icmt]
break
line_indent = len(line) - len(line.lstrip())
if opt_indent != -1 and line_indent > opt_indent:
# Multi-line value, append to buffer and resume parsing
buffer.append(line)
continue
sect_match = self.section_r.match(line)
if sect_match is not None:
# Section detected
opt_indent = -1
section = sect_match.group(1)
if section.startswith("include "):
inc_path = section[8:].strip()
if not inc_path:
raise ConfigError(
f"Invalid include directive: [{section}]"
)
if inc_path[0] == "/":
new_path = pathlib.Path(inc_path).resolve()
paths = sorted(new_path.parent.glob(new_path.name))
else:
paths = sorted(file_path.parent.glob(inc_path))
if not paths:
raise ConfigError(
"No files matching include directive "
f"[{section}]"
)
# Write out buffered data to the config before parsing
# included files
self._write_buffer(buffer, file_path)
buffer.clear()
for p in paths:
self._parse_file(p, visited)
# Don't add included sections to the configparser
continue
else:
last_section = section
if section not in self.file_section_map:
self.file_section_map[section] = []
elif file_index in self.file_section_map[section]:
raise ConfigError(
f"Duplicate section [{section}] in file "
f"{file_path}"
)
self.file_section_map[section].insert(0, file_index)
else:
# This line must specify an option
opt_indent = line_indent
option = re.split(r"[:=]", line, 1)[0].strip()
key = (last_section, option)
if key not in self.file_option_map:
self.file_option_map[key] = []
elif file_index in self.file_option_map[key]:
raise ConfigError(
f"Duplicate option '{option}' in section "
f"[{last_section}], file {file_path} "
)
self.file_option_map[key].insert(0, file_index)
buffer.append(line)
self._write_buffer(buffer, file_path)
except ConfigError:
raise
except Exception as e:
if not file_path.is_file():
raise ConfigError(
f"Configuration File Not Found: '{file_path}''") from e
if not os.access(file_path, os.R_OK):
raise ConfigError(
"Moonraker does not have Read/Write permission for "
f"config file at path '{file_path}'") from e
raise ConfigError(f"Error Reading Config: '{file_path}'") from e
def read_file(self, main_conf: pathlib.Path) -> None:
self.config.clear()
self.files.clear()
self.raw_config_data.clear()
self.updates_pending.clear()
self.file_section_map.clear()
self.file_option_map.clear()
self._parse_file(main_conf, [])
size = sum([len(rawcfg) for rawcfg in self.raw_config_data])
logging.info(
f"Configuration File '{main_conf}' parsed, total size: {size} B"
)
def get_configuration(
server: Server, app_args: Dict[str, Any]
) -> ConfigHelper:
start_path = pathlib.Path(app_args['config_file']).expanduser().resolve()
source = FileSourceWrapper(server)
source.read_file(start_path)
if not source.config.has_section('server'):
raise ConfigError("No section [server] in config")
return ConfigHelper(server, source, 'server', {})
def find_config_backup(cfg_path: str) -> Optional[str]:
cfg = pathlib.Path(cfg_path).expanduser().resolve()
backup = cfg.parent.joinpath(f".{cfg.name}.bkp")
if backup.is_file():
return str(backup)
return None