# Configuration Helper
#
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license

from __future__ import annotations
import configparser
import os
import hashlib
import pathlib
import re
import threading
import copy
import logging
from io import StringIO
from utils import SentinelClass
from components.template import JinjaTemplate

# Annotation imports
from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    IO,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
    Dict,
    List,
    Type,
)
if TYPE_CHECKING:
    from moonraker import Server
    from components.gpio import GpioFactory, GpioOutputPin
    from components.template import TemplateFactory
    from io import TextIOWrapper
    _T = TypeVar("_T")
    ConfigVal = Union[None, int, float, bool, str, dict, list]

SENTINEL = SentinelClass.get_instance()
DOCS_URL = "https://moonraker.readthedocs.io/en/latest"

class ConfigError(Exception):
    """Raised when a configuration value is missing or cannot be used."""


class ConfigHelper:
    error = ConfigError
    def __init__(self,
                 server: Server,
                 config_source: ConfigSourceWrapper,
                 section: str,
                 parsed: Dict[str, Dict[str, ConfigVal]],
                 fallback_section: Optional[str] = None
                 ) -> None:
        """Bind a helper to one section of a configuration source.

        `parsed` is the shared record of options that have been read; an
        empty entry for `section` is created when none exists yet.
        """
        parser = config_source.get_parser()
        self.server = server
        self.source = config_source
        self.config = parser
        self.section = section
        self.fallback_section: Optional[str] = fallback_section
        self.parsed = parsed
        # Guarantee the section has a slot in the parsed record
        parsed.setdefault(section, {})
        # Re-export the parser's section helpers as attributes
        self.sections = parser.sections
        self.has_section = parser.has_section

    def get_server(self) -> Server:
        """Return the server instance this helper was created with."""
        return self.server

    def get_source(self) -> ConfigSourceWrapper:
        """Return the configuration source wrapper backing this helper."""
        return self.source

    def __getitem__(self, key: str) -> ConfigHelper:
        """Support `helper[name]` as shorthand for `getsection(name)`."""
        return self.getsection(key)

    def __contains__(self, key: str) -> bool:
        """Support `name in helper` by testing membership in the parser."""
        return key in self.config

    def has_option(self, option: str) -> bool:
        """Return whether `option` exists in this helper's section."""
        return self.config.has_option(self.section, option)

    def set_option(self, option: str, value: str) -> None:
        """Set `option` in this helper's section through the source wrapper."""
        self.source.set_option(self.section, option, value)

    def get_name(self) -> str:
        """Return the name of the section this helper is bound to."""
        return self.section

    def get_file(self) -> Optional[pathlib.Path]:
        """Ask the source which file holds this section (may be None)."""
        return self.source.find_config_file(self.section)

    def get_options(self) -> Dict[str, str]:
        """Return this section's raw options as a plain dict.

        An empty dict is returned when the section is not in the parser.
        """
        if self.section in self.config:
            return dict(self.config[self.section])
        return {}

    def get_hash(self) -> hashlib._Hash:
        """Return a SHA-256 hash object fed with this section's options.

        Each option name and value is added in parser order.  When the
        section is absent the hash object is returned without any updates.
        """
        digest = hashlib.sha256()
        if self.section in self.config:
            for name, raw_value in self.config[self.section].items():
                digest.update(name.encode())
                digest.update(raw_value.encode())
        return digest

    def get_prefix_sections(self, prefix: str) -> List[str]:
        """Return the names of all sections that start with `prefix`."""
        matches: List[str] = []
        for name in self.sections():
            if name.startswith(prefix):
                matches.append(name)
        return matches

    def getsection(
        self, section: str, fallback: Optional[str] = None
    ) -> ConfigHelper:
        """Create a helper for `section` sharing this helper's state.

        The new helper reuses the same server, source and parsed record;
        `fallback` becomes its fallback section.
        """
        return ConfigHelper(
            server=self.server,
            config_source=self.source,
            section=section,
            parsed=self.parsed,
            fallback_section=fallback,
        )

    def _get_option(self,
                    func: Callable[..., Any],
                    option: str,
                    default: Union[SentinelClass, _T],
                    above: Optional[Union[int, float]] = None,
                    below: Optional[Union[int, float]] = None,
                    minval: Optional[Union[int, float]] = None,
                    maxval: Optional[Union[int, float]] = None,
                    deprecate: bool = False
                    ) -> _T:
        """Read one option through `func`, validate it and record it.

        `func` is called as `func(section, option)`.  When this helper's
        section is not in the parser and a fallback section was given, the
        value is read from the fallback section and a warning is added.

        Args:
            func: Callable performing the actual read/conversion.
            option: Name of the option to read.
            default: Value returned when the option or section is missing;
                a `SentinelClass` instance means the option is required.
            above, below, minval, maxval: Optional bounds passed on to
                `_check_option` for values that were actually read.
            deprecate: Add a deprecation warning when the option is present.

        Returns:
            The value produced by `func`, or `default`.

        Raises:
            ConfigError: The option is required but missing, `func` failed,
                or a bound check failed.
        """
        section = self.section
        warn_fallback = False
        if (
            self.section not in self.config and
            self.fallback_section is not None
        ):
            # Section is absent: read from the fallback section instead
            section = self.fallback_section
            warn_fallback = True
        try:
            val = func(section, option)
        except (configparser.NoOptionError, configparser.NoSectionError) as e:
            if isinstance(default, SentinelClass):
                raise ConfigError(str(e)) from None
            val = default
            # Defaults are always recorded under the helper's own section
            section = self.section
        except Exception as e:
            raise ConfigError(
                f"Error parsing option ({option}) from "
                f"section [{self.section}]") from e
        else:
            if deprecate:
                self.server.add_warning(
                    f"[{self.section}]: Option '{option}' is "
                    "deprecated, see the configuration documentation "
                    f"at {DOCS_URL}/configuration/")
            if warn_fallback:
                help = f"{DOCS_URL}/configuration/#option-moved-deprecations"
                self.server.add_warning(
                    f"[{section}]: Option '{option}' has been moved "
                    f"to section [{self.section}].  Please correct your "
                    f"configuration, see {help} for detailed documentation."
                )
            self._check_option(option, val, above, below, minval, maxval)
        # A fallback section may not have an entry in the parsed record yet;
        # create it on demand rather than failing with a KeyError.
        parsed_opts = self.parsed.setdefault(section, {})
        if option not in parsed_opts:
            if (
                val is None or
                isinstance(val, (int, float, bool, str, dict, list))
            ):
                parsed_opts[option] = val
            else:
                # If the item cannot be encoded to json serialize to a string
                parsed_opts[option] = str(val)
        return val

    def _check_option(self,
                      option: str,
                      value: Union[int, float],
                      above: Optional[Union[int, float]],
                      below: Optional[Union[int, float]],
                      minval: Optional[Union[int, float]],
                      maxval: Optional[Union[int, float]]
                      ) -> None:
        """Validate `value` against the bounds that are not None.

        `above`/`below` are exclusive bounds, `minval`/`maxval` inclusive.

        Raises:
            ConfigError: (via `self.error`) on the first violated bound.
        """
        if above is not None and value <= above:
            raise self.error(
                f"Config Error: Section [{self.section}], Option "
                f"'{option}: {value}': value is not above {above}")
        if below is not None and value >= below:
            raise self.error(
                f"Config Error: Section [{self.section}], Option "
                f"'{option}: {value}': value is not below {below}")
        if minval is not None and value < minval:
            raise self.error(
                f"Config Error: Section [{self.section}], Option "
                f"'{option}: {value}': value is below minimum value {minval}")
        if maxval is not None and value > maxval:
            # Report the violated maximum (previously printed minval)
            raise self.error(
                f"Config Error: Section [{self.section}], Option "
                f"'{option}: {value}': value is above maximum value {maxval}")

    def get(self,
            option: str,
            default: Union[SentinelClass, _T] = SENTINEL,
            deprecate: bool = False
            ) -> Union[str, _T]:
        """Read `option` as a string, or return `default` when missing."""
        reader = self.config.get
        return self._get_option(reader, option, default, deprecate=deprecate)

    def getint(self,
               option: str,
               default: Union[SentinelClass, _T] = SENTINEL,
               above: Optional[int] = None,
               below: Optional[int] = None,
               minval: Optional[int] = None,
               maxval: Optional[int] = None,
               deprecate: bool = False
               ) -> Union[int, _T]:
        """Read `option` as an integer and apply the optional bounds."""
        return self._get_option(
            self.config.getint, option, default,
            above=above, below=below, minval=minval, maxval=maxval,
            deprecate=deprecate)

    def getboolean(self,
                   option: str,
                   default: Union[SentinelClass, _T] = SENTINEL,
                   deprecate: bool = False
                   ) -> Union[bool, _T]:
        """Read `option` as a boolean, or return `default` when missing."""
        reader = self.config.getboolean
        return self._get_option(reader, option, default, deprecate=deprecate)

    def getfloat(self,
                 option: str,
                 default: Union[SentinelClass, _T] = SENTINEL,
                 above: Optional[float] = None,
                 below: Optional[float] = None,
                 minval: Optional[float] = None,
                 maxval: Optional[float] = None,
                 deprecate: bool = False
                 ) -> Union[float, _T]:
        """Read `option` as a float and apply the optional bounds."""
        return self._get_option(
            self.config.getfloat, option, default,
            above=above, below=below, minval=minval, maxval=maxval,
            deprecate=deprecate)

    def getlists(self,
                 option: str,
                 default: Union[SentinelClass, _T] = SENTINEL,
                 list_type: Type = str,
                 separators: Tuple[Optional[str], ...] = ('\n',),
                 count: Optional[Tuple[Optional[int], ...]] = None,
                 deprecate: bool = False
                 ) -> Union[List[Any], _T]:
        """Read `option` as a (possibly nested) list.

        The raw value is split on `separators[0]`; when more separators
        follow, every piece is split again with the remaining separators,
        producing nested lists.  Items at the innermost level are converted
        with `list_type`.  Empty items are dropped.

        Args:
            option: Name of the option to read.
            default: Value returned when the option is missing.
            list_type: Callable applied to every innermost item.
            separators: One separator per nesting level (None splits on
                whitespace, as with `str.split`).
            count: Optional expected length per nesting level; a None
                entry disables the check for that level.  Must have the
                same length as `separators`.
            deprecate: Add a deprecation warning when the option is present.

        Raises:
            ConfigError: `count` and `separators` differ in length, or the
                option could not be parsed.
        """
        if count is None:
            # No length constraints requested: one "any length" per level
            count = tuple(None for _ in separators)
        elif len(count) != len(separators):
            raise ConfigError(
                f"Option '{option}' in section "
                f"[{self.section}]: length of 'count' argument must "
                "match length of 'separators' argument")
        counts: Tuple[Optional[int], ...] = count

        def list_parser(value: str,
                        ltype: Type,
                        seps: Tuple[Optional[str], ...],
                        expected_cnt: Tuple[Optional[int], ...]
                        ) -> List[Any]:
            # Consume the separator/count for this level, recurse for the rest
            sep = seps[0]
            seps = seps[1:]
            cnt = expected_cnt[0]
            expected_cnt = expected_cnt[1:]
            ret: List[Any] = []
            if seps:
                sub_lists = [val.strip() for val in value.split(sep)
                             if val.strip()]
                for sub_list in sub_lists:
                    ret.append(list_parser(sub_list, ltype, seps,
                                           expected_cnt))
            else:
                ret = [ltype(val.strip()) for val in value.split(sep)
                       if val.strip()]
            if cnt is not None and len(ret) != cnt:
                raise ConfigError(
                    f"List length mismatch, expected {cnt}, "
                    f"parsed {len(ret)}")
            return ret

        def getlist_wrapper(sec: str, opt: str) -> List[Any]:
            val = self.config.get(sec, opt)
            return list_parser(val, list_type, separators, counts)

        return self._get_option(getlist_wrapper, option, default,
                                deprecate=deprecate)


    def getlist(self,
                option: str,
                default: Union[SentinelClass, _T] = SENTINEL,
                separator: Optional[str] = '\n',
                count: Optional[int] = None,
                deprecate: bool = False
                ) -> Union[List[str], _T]:
        """Read `option` as a flat list of strings split on `separator`."""
        return self.getlists(
            option, default, list_type=str, separators=(separator,),
            count=(count,), deprecate=deprecate)

    def getintlist(self,
                   option: str,
                   default: Union[SentinelClass, _T] = SENTINEL,
                   separator: Optional[str] = '\n',
                   count: Optional[int] = None,
                   deprecate: bool = False
                   ) -> Union[List[int], _T]:
        """Read `option` as a flat list of integers split on `separator`."""
        return self.getlists(
            option, default, list_type=int, separators=(separator,),
            count=(count,), deprecate=deprecate)

    def getfloatlist(self,
                     option: str,
                     default: Union[SentinelClass, _T] = SENTINEL,
                     separator: Optional[str] = '\n',
                     count: Optional[int] = None,
                     deprecate: bool = False
                     ) -> Union[List[float], _T]:
        """Read `option` as a flat list of floats split on `separator`."""
        return self.getlists(
            option, default, list_type=float, separators=(separator,),
            count=(count,), deprecate=deprecate)

    def getdict(self,
                option: str,
                default: Union[SentinelClass, _T] = SENTINEL,
                separators: Tuple[Optional[str], Optional[str]] = ('\n', '='),
                dict_type: Type = str,
                allow_empty_fields: bool = False,
                deprecate: bool = False
                ) -> Union[Dict[str, Any], _T]:
        """Read `option` as a dictionary.

        The raw value is split into entries on `separators[0]`; every
        non-empty entry is split once on `separators[1]` into key and value.
        Values are converted with `dict_type`.

        Args:
            option: Name of the option to read.
            default: Value returned when the option is missing.
            separators: `(entry_separator, key_value_separator)`.
            dict_type: Callable applied to every value.
            allow_empty_fields: When True an entry without a key/value
                separator is stored with the value None; otherwise it is
                an error.
            deprecate: Add a deprecation warning when the option is present.

        Raises:
            ConfigError: `separators` does not hold exactly two items, or
                the option could not be parsed.
        """
        if len(separators) != 2:
            raise ConfigError(
                "The `separators` argument of getdict() must be a Tuple "
                "of length of 2")
        entry_sep, kv_sep = separators

        def getdict_wrapper(sec: str, opt: str) -> Dict[str, Any]:
            val = self.config.get(sec, opt)
            ret: Dict[str, Any] = {}
            for line in val.split(entry_sep):
                line = line.strip()
                if not line:
                    continue
                parts = line.split(kv_sep, 1)
                if len(parts) == 1:
                    # Entry has a key only
                    if allow_empty_fields:
                        ret[parts[0].strip()] = None
                    else:
                        raise ConfigError(
                            f"Failed to parse dictionary field, {line}")
                else:
                    ret[parts[0].strip()] = dict_type(parts[1].strip())
            return ret

        return self._get_option(getdict_wrapper, option, default,
                                deprecate=deprecate)

    def getgpioout(self,
                   option: str,
                   default: Union[SentinelClass, _T] = SENTINEL,
                   initial_value: int = 0,
                   deprecate: bool = False
                   ) -> Union[GpioOutputPin, _T]:
        """Read `option` and set up a GPIO output pin from its value.

        The 'gpio' component is loaded through the server first.

        Raises:
            ConfigError: The GPIO component could not be loaded, or the
                option could not be read.
        """
        try:
            gpio_factory: GpioFactory
            gpio_factory = self.server.load_component(self, 'gpio')
        except Exception:
            raise ConfigError(
                f"Section [{self.section}], option '{option}', "
                "GPIO Component not available")

        def _read_pin(sec: str, opt: str) -> GpioOutputPin:
            pin_desc = self.config.get(sec, opt)
            return gpio_factory.setup_gpio_out(pin_desc, initial_value)

        return self._get_option(
            _read_pin, option, default, deprecate=deprecate)

    def gettemplate(self,
                    option: str,
                    default: Union[SentinelClass, _T] = SENTINEL,
                    is_async: bool = False,
                    deprecate: bool = False
                    ) -> Union[JinjaTemplate, _T]:
        """Read `option` and turn its stripped value into a template.

        The 'template' component is loaded through the server first.

        Raises:
            ConfigError: The template component could not be loaded, or
                the option could not be read.
        """
        try:
            factory: TemplateFactory = self.server.load_component(
                self, 'template')
        except Exception:
            raise ConfigError(
                f"Section [{self.section}], option '{option}', "
                "Template Component not available")

        def _read_template(sec: str, opt: str) -> JinjaTemplate:
            source_text = self.config.get(sec, opt)
            return factory.create_template(source_text.strip(), is_async)

        return self._get_option(
            _read_template, option, default, deprecate=deprecate)

    def load_template(self,
                      option: str,
                      default: Union[SentinelClass, str] = SENTINEL,
                      is_async: bool = False,
                      deprecate: bool = False
                      ) -> JinjaTemplate:
        """Like `gettemplate`, but a string default is compiled too.

        When `gettemplate` hands back a string, it is stripped and turned
        into a template using the 'template' component.
        """
        result = self.gettemplate(option, default, is_async, deprecate)
        if not isinstance(result, str):
            return result
        factory: TemplateFactory = self.server.lookup_component('template')
        return factory.create_template(result.strip(), is_async)

    def getpath(self,
                option: str,
                default: Union[SentinelClass, _T] = SENTINEL,
                deprecate: bool = False
                ) -> Union[pathlib.Path, _T]:
        """Read `option` as a templated filesystem path.

        The option is rendered as a template with `data_path` (taken from
        the server's app args) in its context, then user-expanded and
        resolved.  A non-template result (the default) is returned as is.
        """
        tmpl = self.gettemplate(option, default, deprecate=deprecate)
        if not isinstance(tmpl, JinjaTemplate):
            return tmpl
        data_path = self.server.get_app_args()["data_path"]
        rendered = tmpl.render({"data_path": data_path})
        return pathlib.Path(rendered).expanduser().resolve()

    def read_supplemental_dict(self, obj: Dict[str, Any]) -> ConfigHelper:
        """Build a standalone helper from an in-memory dictionary.

        The returned helper is bound to the first section found in `obj`
        and starts with an empty parsed record.

        Raises:
            ConfigError: `obj` is empty, cannot be read, or yields no
                sections.
        """
        if not obj:
            raise ConfigError("Cannot read Empty Dict")
        source = DictSourceWrapper()
        source.read_dict(obj)
        sections = source.config.sections()
        if not sections:
            # e.g. only the parser's default section was supplied
            raise ConfigError("Supplemental dict contains no config sections")
        return ConfigHelper(self.server, source, sections[0], {})

    def read_supplemental_config(self, file_name: str) -> ConfigHelper:
        """Build a standalone helper from another configuration file.

        The path is user-expanded and resolved before reading.  The
        returned helper is bound to the first section of that file and
        starts with an empty parsed record.
        """
        cfg_path = pathlib.Path(file_name).expanduser().resolve()
        source = FileSourceWrapper(self.server)
        source.read_file(cfg_path)
        first_section = source.config.sections()[0]
        return ConfigHelper(self.server, source, first_section, {})

    def write_config(self, file_obj: IO[str]) -> None:
        """Write the parser's current configuration to `file_obj`."""
        self.config.write(file_obj)

    def get_parsed_config(self) -> Dict[str, Dict[str, ConfigVal]]:
        """Return a shallow copy of the parsed-option record.

        The per-section dictionaries are shared with the original.
        """
        return {sect: opts for sect, opts in self.parsed.items()}

    def get_orig_config(self) -> Dict[str, Dict[str, str]]:
        """Return the source's configuration as nested dictionaries."""
        return self.source.as_dict()

    def get_file_sections(self) -> Dict[str, List[str]]:
        """Return the source's file/section mapping (see the source wrapper)."""
        return self.source.get_file_sections()

    def get_config_files(self) -> List[str]:
        """Return the source's configuration files as strings."""
        return list(map(str, self.source.get_files()))

    def validate_config(self) -> None:
        """Warn about sections and options that were never parsed.

        A section missing from the parsed record yields one warning for
        the whole section; otherwise each option missing from that
        section's record yields its own warning.
        """
        for sect in self.config.sections():
            if sect in self.parsed:
                known_opts = self.parsed[sect]
                for opt, val in self.config.items(sect):
                    if opt in known_opts:
                        continue
                    self.server.add_warning(
                        f"Unparsed config option '{opt}: {val}' detected in "
                        f"section [{sect}].  This may be an option no longer "
                        "available or could be the result of a module that "
                        "failed to load.  In the future this will result "
                        "in a startup error.")
            else:
                self.server.add_warning(
                    f"Unparsed config section [{sect}] detected.  This "
                    "may be the result of a component that failed to "
                    "load.  In the future this will result in a startup "
                    "error.")

    def create_backup(self):
        """Write the current configuration to a hidden backup file.

        The backup lives next to the main config file and is named
        `.<config name>.bkp`.  Nothing is written when an existing backup
        is at least as new as every file reported by the source.  Any
        failure is logged rather than raised.
        """
        cfg_path = self.server.get_app_args()["config_file"]
        main_cfg = pathlib.Path(cfg_path).expanduser().resolve()
        backup = main_cfg.parent.joinpath(f".{main_cfg.name}.bkp")
        try:
            if backup.exists():
                mtimes = [
                    cfg_file.stat().st_mtime_ns
                    for cfg_file in self.source.get_files()
                ]
                newest_cfg = max([0] + mtimes)
                if backup.stat().st_mtime_ns >= newest_cfg:
                    # Backup already exists and is current
                    return
            with backup.open("w") as backup_fp:
                self.config.write(backup_fp)
                logging.info(
                    f"Backing up last working configuration to '{backup}'")
        except Exception:
            logging.exception("Failed to create a backup")

class ConfigSourceWrapper:
    """Base wrapper around a `ConfigParser` created without interpolation.

    The file-related queries return empty results here; the mutators act
    directly on the parser.
    """

    def __init__(self):
        self.config = configparser.ConfigParser(interpolation=None)

    def get_parser(self):
        """Return the wrapped parser."""
        return self.config

    def as_dict(self) -> Dict[str, Dict[str, str]]:
        """Return every parser entry as a plain nested dictionary."""
        snapshot: Dict[str, Dict[str, str]] = {}
        for name, proxy in self.config.items():
            snapshot[name] = dict(proxy)
        return snapshot

    def write_to_string(self) -> str:
        """Serialize the parser contents to a string."""
        with StringIO() as sio:
            self.config.write(sio)
            return sio.getvalue()

    def get_files(self) -> List[pathlib.Path]:
        """Return the backing files; the base wrapper has none."""
        return []

    def set_option(self, section: str, option: str, value: str) -> None:
        """Set `option` in `section` on the parser."""
        self.config.set(section, option, value)

    def remove_option(self, section: str, option: str) -> None:
        """Remove `option` from `section` on the parser."""
        self.config.remove_option(section, option)

    def add_section(self, section: str) -> None:
        """Add `section` to the parser."""
        self.config.add_section(section)

    def remove_section(self, section: str) -> None:
        """Remove `section` from the parser."""
        self.config.remove_section(section)

    def get_file_sections(self) -> Dict[str, List[str]]:
        """Return the file/section mapping; empty for the base wrapper."""
        return {}

    def find_config_file(
        self, section: str, option: Optional[str] = None
    ) -> Optional[pathlib.Path]:
        """Return the file holding the item; always None for the base."""
        return None

class DictSourceWrapper(ConfigSourceWrapper):
    """Configuration source filled from an in-memory dictionary."""

    def __init__(self):
        super().__init__()

    def read_dict(self, cfg: Dict[str, Any]) -> None:
        """Load `cfg` into the parser.

        Raises:
            ConfigError: The parser rejected the dictionary; the original
                exception is attached as the cause.
        """
        parser = self.config
        try:
            parser.read_dict(cfg)
        except Exception as err:
            raise ConfigError("Error Reading config as dict") from err

class FileSourceWrapper(ConfigSourceWrapper):
    section_r = re.compile(r"\s*\[([^]]+)\]")

    def __init__(self, server: Server) -> None:
        """Create an empty file-backed source bound to `server`."""
        super().__init__()
        self.server = server
        # Guards modifications and file writes
        self.save_lock = threading.Lock()
        # Config files and their raw text, indexed in parallel
        self.files: List[pathlib.Path] = []
        self.raw_config_data: List[str] = []
        # Section / (section, option) -> indices into `files`
        self.file_section_map: Dict[str, List[int]] = {}
        self.file_option_map: Dict[Tuple[str, str], List[int]] = {}
        # Indices of files whose raw text has unsaved modifications
        self.updates_pending: Set[int] = set()
        # Snapshot taken by backup_source(), restored by cancel()
        self.backup: Dict[str, Any] = {}

    def get_files(self) -> List[pathlib.Path]:
        """Return the list of tracked configuration files (not a copy)."""
        return self.files

    def is_in_transaction(self) -> bool:
        """Return True while updates are pending or the save lock is held."""
        has_pending = bool(self.updates_pending)
        return has_pending or self.save_lock.locked()

    def backup_source(self) -> None:
        """Snapshot the raw data, tracking maps and parser contents.

        The snapshot replaces any previous one and is what `cancel()`
        restores from.
        """
        snapshot: Dict[str, Any] = {}
        snapshot["raw_data"] = list(self.raw_config_data)
        snapshot["section_map"] = copy.deepcopy(self.file_section_map)
        snapshot["option_map"] = copy.deepcopy(self.file_option_map)
        snapshot["config"] = self.write_to_string()
        self.backup = snapshot

    def _acquire_save_lock(self) -> None:
        """Take the save lock without blocking.

        Raises:
            ConfigError: No files are tracked, or the lock is already held.
        """
        if not self.files:
            raise ConfigError("Can only modify file backed configurations")
        acquired = self.save_lock.acquire(blocking=False)
        if not acquired:
            raise ConfigError("Configuration locked, cannot modify")

    def set_option(self, section: str, option: str, value: str) -> None:
        """Set `option` in `section`, editing the raw text of its file.

        The change is applied to the in-memory raw data only; the affected
        file index is added to `updates_pending`.  Nothing happens when the
        parser already holds the same (stripped) value.

        Raises:
            ConfigError: The save lock could not be taken, or the edited
                text does not parse with the option present.
        """
        self._acquire_save_lock()
        try:
            value = value.strip()
            try:
                # Unchanged value: nothing to do
                if (self.config.get(section, option).strip() == value):
                    return
            except (configparser.NoSectionError, configparser.NoOptionError):
                pass
            # Choose the file to edit: the first file tracking the option,
            # else the first tracking the section, else file index 0.
            file_idx: int = 0
            has_sec = has_opt = False
            if (section, option) in self.file_option_map:
                file_idx = self.file_option_map[(section, option)][0]
                has_sec = has_opt = True
            elif section in self.file_section_map:
                file_idx = self.file_section_map[section][0]
                has_sec = True
            buf = self.raw_config_data[file_idx].splitlines()
            new_opt_list = [f"{option}: {value}"]
            if "\n" in value:
                # Multi-line value: "option:" followed by indented lines
                vals = [f"  {v}" for v in value.split("\n")]
                new_opt_list = [f"{option}:"] + vals
            sec_info = self._find_section_info(section, buf, raise_error=False)
            if sec_info:
                options: Dict[str, Any] = sec_info["options"]
                indent: int = sec_info["indent"]
                # Default: insert at the end of the section
                opt_start: int = sec_info["end"]
                opt_end: int = sec_info["end"]
                opt_info: Optional[Dict[str, Any]] = options.get(option)
                if opt_info is not None:
                    # Option exists: replace its current line span
                    indent = opt_info["indent"]
                    opt_start = opt_info["start"]
                    opt_end = opt_info["end"]
                elif options:
                    # match indentation of last option in section
                    last_opt = list(options.values())[-1]
                    indent = last_opt["indent"]
                if indent:
                    padding = " " * indent
                    new_opt_list = [f"{padding}{v}" for v in new_opt_list]
                buf[opt_start:] = new_opt_list + buf[opt_end:]
            else:
                # Append new section to the end of the file
                new_opt_list.insert(0, f"[{section}]")
                if buf and buf[-1].strip() != "":
                    new_opt_list.insert(0, "")
                buf.extend(new_opt_list)
            # Trailing empty item so the joined text ends with a newline
            buf.append("")
            updated_cfg = "\n".join(buf)
            # test changes to the configuration
            test_parser = configparser.ConfigParser(interpolation=None)
            try:
                test_parser.read_string(updated_cfg)
                if not test_parser.has_option(section, option):
                    raise ConfigError("Option not added")
            except Exception as e:
                raise ConfigError(
                    f"Failed to set option '{option}' in section "
                    f"[{section}], file: {self.files[file_idx]}"
                ) from e
            # Update local configuration/tracking
            self.raw_config_data[file_idx] = updated_cfg
            self.updates_pending.add(file_idx)
            if not has_sec:
                self.file_section_map[section] = [file_idx]
            if not has_opt:
                self.file_option_map[(section, option)] = [file_idx]
            if not self.config.has_section(section):
                self.config.add_section(section)
            self.config.set(section, option, value)
        finally:
            # Released on every exit path, including the early return
            self.save_lock.release()

    def remove_option(self, section: str, option: str) -> None:
        """Remove `option` from `section` in every file that tracks it.

        The edited text of each file is validated with a throwaway parser
        first; tracking data and the live parser are only changed once all
        files passed.  Untracked options are ignored.

        Raises:
            ConfigError: The save lock could not be taken, or removing the
                option from one of the files failed.
        """
        self._acquire_save_lock()
        try:
            map_key = (section, option)
            if map_key not in self.file_option_map:
                return
            staged: List[Tuple[int, str]] = []
            for file_idx in self.file_option_map[map_key]:
                lines = self.raw_config_data[file_idx].splitlines()
                try:
                    sec_info = self._find_section_info(section, lines)
                    opt_info = sec_info["options"][option]
                    first = opt_info["start"]
                    last = opt_info["end"]
                    # Blank line on both sides of the option: drop one too
                    if (
                        last < len(lines) and
                        not lines[first - 1].strip() and
                        not lines[last].strip()
                    ):
                        last += 1
                    lines[first:] = lines[last:]
                    lines.append("")
                    candidate = "\n".join(lines)
                    checker = configparser.ConfigParser(interpolation=None)
                    checker.read_string(candidate)
                    if checker.has_option(section, option):
                        raise ConfigError("Option still exists")
                    staged.append((file_idx, candidate))
                except Exception as e:
                    raise ConfigError(
                        f"Failed to remove option '{option}' from section "
                        f"[{section}], file: {self.files[file_idx]}"
                    ) from e
            # Update configuration/tracking
            for file_idx, candidate in staged:
                self.updates_pending.add(file_idx)
                self.raw_config_data[file_idx] = candidate
            del self.file_option_map[map_key]
            self.config.remove_option(section, option)
        finally:
            self.save_lock.release()

    def add_section(self, section: str) -> None:
        """Append an empty `section` to the first (primary) file.

        Sections that are already tracked are left alone.  The edited text
        is validated with a throwaway parser before anything is changed.

        Raises:
            ConfigError: The save lock could not be taken, or the edited
                text does not parse with the section present.
        """
        self._acquire_save_lock()
        try:
            if section in self.file_section_map:
                return
            # add section to end of primary file
            lines = self.raw_config_data[0].splitlines()
            if lines and lines[-1].strip():
                lines.append("")
            lines += [f"[{section}]", ""]
            candidate = "\n".join(lines)
            try:
                checker = configparser.ConfigParser(interpolation=None)
                checker.read_string(candidate)
                if not checker.has_section(section):
                    raise ConfigError("Section not added")
            except Exception as e:
                raise ConfigError(
                    f"Failed to add section [{section}], file: {self.files[0]}"
                ) from e
            self.updates_pending.add(0)
            self.file_section_map[section] = [0]
            self.raw_config_data[0] = candidate
            self.config.add_section(section)
        finally:
            self.save_lock.release()

    def remove_section(self, section: str) -> None:
        """Remove `section` from every file that tracks it.

        The edited text of each file is validated with a throwaway parser
        first; tracking data and the live parser are only changed once all
        files passed.  Untracked sections are ignored.

        Raises:
            ConfigError: The save lock could not be taken, or removing the
                section from one of the files failed.
        """
        self._acquire_save_lock()
        try:
            if section not in self.file_section_map:
                return
            pending: List[Tuple[int, str]] = []
            file_indices = self.file_section_map[section]
            for idx in file_indices:
                buf = self.raw_config_data[idx].splitlines()
                try:
                    sec_info = self._find_section_info(section, buf)
                    start = sec_info["start"]
                    end = sec_info["end"]
                    # Blank line on both sides of the section: drop one too
                    if (
                        end < len(buf) and
                        not buf[start-1].strip()
                        and not buf[end].strip()
                    ):
                        end += 1
                    buf[start:] = buf[end:]
                    buf.append("")
                    updated_cfg = "\n".join(buf)
                    test_parser = configparser.ConfigParser(interpolation=None)
                    test_parser.read_string(updated_cfg)
                    if test_parser.has_section(section):
                        raise ConfigError("Section still exists")
                    pending.append((idx, updated_cfg))
                except Exception as e:
                    # Name the file actually being edited, not always the
                    # primary file.
                    raise ConfigError(
                        f"Failed to remove section [{section}], "
                        f"file: {self.files[idx]}"
                    ) from e
            # Update configuration/tracking
            for (idx, data) in pending:
                self.updates_pending.add(idx)
                self.raw_config_data[idx] = data
            del self.file_section_map[section]
            self.config.remove_section(section)
        finally:
            self.save_lock.release()

    def save(self) -> Awaitable[bool]:
        """Write pending changes to disk and return an awaitable result.

        While the server is running the write is handed to the event
        loop's `run_in_thread`; otherwise it runs immediately and an
        already-completed future is returned.
        """
        loop = self.server.get_event_loop()
        if not self.server.is_running():
            done = loop.create_future()
            done.set_result(self._do_save())
            return done
        return loop.run_in_thread(self._do_save)

    def _do_save(self) -> bool:
        """Write every file with pending updates, holding the save lock.

        The backup snapshot is discarded in all cases.  Returns False when
        there was nothing to write, True otherwise.
        """
        with self.save_lock:
            self.backup.clear()
            pending = self.updates_pending
            if not pending:
                return False
            for file_idx in pending:
                target = self.files[file_idx]
                content = self.raw_config_data[file_idx]
                target.write_text(content, encoding="utf-8")
            pending.clear()
            return True

    def cancel(self):
        """Discard pending changes by restoring the last snapshot.

        Raw data, tracking maps and parser contents are restored only when
        both a snapshot and pending updates exist.  The snapshot is cleared
        afterwards either way.
        """
        self._acquire_save_lock()
        try:
            snapshot = self.backup
            if snapshot and self.updates_pending:
                self.raw_config_data = snapshot["raw_data"]
                self.file_option_map = snapshot["option_map"]
                self.file_section_map = snapshot["section_map"]
                self.config.clear()
                self.config.read_string(snapshot["config"])
                self.updates_pending.clear()
            snapshot.clear()
        finally:
            self.save_lock.release()

    def revert(self) -> Awaitable[bool]:
        """Re-read the configuration from disk, returning an awaitable.

        While the server is running the work is handed to the event loop's
        `run_in_thread`; otherwise it runs immediately and an
        already-completed future is returned.
        """
        loop = self.server.get_event_loop()
        if not self.server.is_running():
            done = loop.create_future()
            done.set_result(self._do_revert())
            return done
        return loop.run_in_thread(self._do_revert)

    def _do_revert(self) -> bool:
        """Re-read the primary file when updates are pending.

        Runs with the save lock held.  Returns False when nothing was
        pending; otherwise clears the snapshot, calls `read_file` on the
        first tracked file and returns True.
        """
        with self.save_lock:
            if self.updates_pending:
                self.backup.clear()
                self.read_file(self.files[0])
                return True
            return False

    def write_config(
        self, dest_folder: Union[str, pathlib.Path]
    ) -> Awaitable[None]:
        eventloop = self.server.get_event_loop()
        if self.server.is_running():
            fut = eventloop.run_in_thread(self._do_write, dest_folder)
        else:
            self._do_write(dest_folder)
            fut = eventloop.create_future()
            fut.set_result(None)
        return fut

    def _do_write(self, dest_folder: Union[str, pathlib.Path]) -> None:
        with self.save_lock:
            if isinstance(dest_folder, str):
                dest_folder = pathlib.Path(dest_folder)
            dest_folder = dest_folder.expanduser().resolve()
            cfg_parent = self.files[0].parent
            for i, path in enumerate(self.files):
                try:
                    rel_path = path.relative_to(cfg_parent)
                    dest_file = dest_folder.joinpath(rel_path)
                except ValueError:
                    dest_file = dest_folder.joinpath(
                        f"{path.parent.name}-{path.name}"
                    )
                os.makedirs(str(dest_file.parent), exist_ok=True)
                dest_file.write_text(self.raw_config_data[i])

    def _find_section_info(
        self, section: str, file_data: List[str], raise_error: bool = True
    ) -> Dict[str, Any]:
        options: Dict[str, Dict[str, Any]] = {}
        result: Dict[str, Any] = {
            "indent": -1,
            "start": -1,
            "end": -1,
            "options": options
        }
        last_option: str = ""
        opt_indent = -1
        for idx, line in enumerate(file_data):
            if not line.strip() or line.lstrip()[0] in "#;":
                # skip empty lines, whitespace, and comments
                continue
            line = line.expandtabs()
            line_indent = len(line) - len(line.strip())
            if opt_indent != -1 and line_indent > opt_indent:
                if last_option:
                    options[last_option]["end"] = idx + 1
                # Continuation of an option
                if result["start"] != -1:
                    result["end"] = idx + 1
                continue
            sec_match = self.section_r.match(line)
            if sec_match is not None:
                opt_indent = -1
                if result["start"] != -1:
                    break
                cursec = sec_match.group(1)
                if section == cursec:
                    result["indent"] = line_indent
                    result["start"] = idx
                    result["end"] = idx + 1
            else:
                # This is an option
                opt_indent = line_indent
                if result["start"] != -1:
                    result["end"] = idx + 1
                    last_option = re.split(r"[:=]", line, 1)[0].strip()
                    options[last_option] = {
                        "indent": line_indent,
                        "start": idx,
                        "end": idx + 1
                    }
        if result["start"] != -1:
            return result
        if raise_error:
            raise ConfigError(f"Unable to find section [{section}]")
        return {}

    def get_file_sections(self) -> Dict[str, List[str]]:
        sections_by_file: Dict[str, List[str]] = {
            str(fname): [] for fname in self.files
        }
        for section, idx_list in self.file_section_map.items():
            for idx in idx_list:
                fname = str(self.files[idx])
                sections_by_file[fname].append(section)
        return sections_by_file

    def find_config_file(
        self, section: str, option: Optional[str] = None
    ) -> Optional[pathlib.Path]:
        idx: int = -1
        if option is not None:
            key = (section, option)
            if key in self.file_option_map:
                idx = self.file_option_map[key][0]
        elif section in self.file_section_map:
            idx = self.file_section_map[section][0]
        if idx == -1:
            return None
        return self.files[idx]

    def _write_buffer(self, buffer: List[str], fpath: pathlib.Path) -> None:
        if not buffer:
            return
        self.config.read_string("\n".join(buffer), fpath.name)

    def _parse_file(
        self, file_path: pathlib.Path, visited: List[Tuple[int, int]]
    ) -> None:
        buffer: List[str] = []
        try:
            stat = file_path.stat()
            cur_stat = (stat.st_dev, stat.st_ino)
            if cur_stat in visited:
                raise ConfigError(
                    f"Recursive include directive detected, {file_path}"
                )
            visited.append(cur_stat)
            self.files.append(file_path)
            file_index = len(self.files) - 1
            cfg_data = file_path.read_text(encoding="utf-8")
            self.raw_config_data.append(cfg_data)
            lines = cfg_data.splitlines()
            last_section = ""
            opt_indent = -1
            for line in lines:
                if not line.strip() or line.lstrip()[0] in "#;":
                    # ignore lines that contain only whitespace/comments
                    continue
                line = line.expandtabs(tabsize=4)
                # Remove inline comments
                for prefix in "#;":
                    icmt = line.find(prefix)
                    if icmt > 0 and line[icmt-1] != "\\":
                        # inline comment, remove it
                        line = line[:icmt]
                        break
                line_indent = len(line) - len(line.lstrip())
                if opt_indent != -1 and line_indent > opt_indent:
                    # Multi-line value, append to buffer and resume parsing
                    buffer.append(line)
                    continue
                sect_match = self.section_r.match(line)
                if sect_match is not None:
                    # Section detected
                    opt_indent = -1
                    section = sect_match.group(1)
                    if section.startswith("include "):
                        inc_path = section[8:].strip()
                        if not inc_path:
                            raise ConfigError(
                                f"Invalid include directive: [{section}]"
                            )
                        if inc_path[0] == "/":
                            new_path = pathlib.Path(inc_path).resolve()
                            paths = sorted(new_path.parent.glob(new_path.name))
                        else:
                            paths = sorted(file_path.parent.glob(inc_path))
                        if not paths:
                            raise ConfigError(
                                "No files matching include directive "
                                f"[{section}]"
                            )
                        # Write out buffered data to the config before parsing
                        # included files
                        self._write_buffer(buffer, file_path)
                        buffer.clear()
                        for p in paths:
                            self._parse_file(p, visited)
                        # Don't add included sections to the configparser
                        continue
                    else:
                        last_section = section
                        if section not in self.file_section_map:
                            self.file_section_map[section] = []
                        elif file_index in self.file_section_map[section]:
                            raise ConfigError(
                                f"Duplicate section [{section}] in file "
                                f"{file_path}"
                            )
                        self.file_section_map[section].insert(0, file_index)
                else:
                    # This line must specify an option
                    opt_indent = line_indent
                    option = re.split(r"[:=]", line, 1)[0].strip()
                    key = (last_section, option)
                    if key not in self.file_option_map:
                        self.file_option_map[key] = []
                    elif file_index in self.file_option_map[key]:
                        raise ConfigError(
                            f"Duplicate option '{option}' in section "
                            f"[{last_section}], file {file_path} "
                        )
                    self.file_option_map[key].insert(0, file_index)
                buffer.append(line)
            self._write_buffer(buffer, file_path)
        except ConfigError:
            raise
        except Exception as e:
            if not file_path.is_file():
                raise ConfigError(
                    f"Configuration File Not Found: '{file_path}''") from e
            if not os.access(file_path, os.R_OK):
                raise ConfigError(
                    "Moonraker does not have Read/Write permission for "
                    f"config file at path '{file_path}'") from e
            raise ConfigError(f"Error Reading Config: '{file_path}'") from e

    def read_file(self, main_conf: pathlib.Path) -> None:
        self.config.clear()
        self.files.clear()
        self.raw_config_data.clear()
        self.updates_pending.clear()
        self.file_section_map.clear()
        self.file_option_map.clear()
        self._parse_file(main_conf, [])
        size = sum([len(rawcfg) for rawcfg in self.raw_config_data])
        logging.info(
            f"Configuration File '{main_conf}' parsed, total size: {size} B"
        )


def get_configuration(
    server: Server, app_args: Dict[str, Any]
) -> ConfigHelper:
    """Load the main configuration file and wrap its [server] section.

    The path is taken from ``app_args['config_file']``, expanded and
    resolved, then read through a ``FileSourceWrapper``.

    :raises ConfigError: If the parsed configuration has no ``server``
        section.
    """
    cfg_path = pathlib.Path(app_args['config_file']).expanduser().resolve()
    file_source = FileSourceWrapper(server)
    file_source.read_file(cfg_path)
    if file_source.config.has_section('server'):
        return ConfigHelper(server, file_source, 'server', {})
    raise ConfigError("No section [server] in config")

def find_config_backup(cfg_path: str) -> Optional[str]:
    """Return the path of the backup stored next to ``cfg_path``, if any.

    The backup is expected in the same directory as the resolved config
    file, named ``.<config file name>.bkp``.  ``None`` is returned when
    no such regular file exists.
    """
    resolved = pathlib.Path(cfg_path).expanduser().resolve()
    candidate = resolved.parent / f".{resolved.name}.bkp"
    return str(candidate) if candidate.is_file() else None