install: update package resolution method

Embed the sysdeps_parser module in the install script
for package dependency resolution.  This method is
more robust than the bash implementation and adds
support for the new requirement specifiers.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2025-02-21 10:30:37 -05:00
parent 31cb1fc94c
commit f2d53fe386
3 changed files with 251 additions and 209 deletions

View File

@ -1,7 +1,6 @@
#!/bin/bash
# This script installs Moonraker on Debian based Linux distros.
SUPPORTED_DISTROS="debian ubuntu"
PYTHONDIR="${MOONRAKER_VENV:-${HOME}/moonraker-env}"
SYSTEMDDIR="/etc/systemd/system"
REBUILD_ENV="${MOONRAKER_REBUILD_ENV:-n}"
@ -38,89 +37,196 @@ if [ -f "${SRCDIR}/moonraker/__init__.py" ]; then
IS_SRC_DIST="y"
fi
compare_version () {
if [ -z "$DISTRO_VERSION" ]; then
return 1
fi
compare_script=$(cat << EOF
import re
def convert_ver(ver):
ver = ver.strip()
ver_match = re.match(r"\d+(\.\d+)*((?:-|\.).+)?", ver)
if ver_match is None:
return (ver,)
return tuple([int(p) if p.isdigit() else p for p in re.split(r"\.|-", ver)])
dist_version = convert_ver("$DISTRO_VERSION")
req_version = convert_ver("$2")
exit(int(not dist_version $1 req_version))
EOF
)
python3 -c "$compare_script"
}
# Detect Current Distribution
detect_distribution() {
distro_list=""
orig_id=""
if [ -f "/etc/os-release" ]; then
DISTRO_VERSION="$( grep -Po "^VERSION_ID=\"?\K[^\"]+" /etc/os-release || true )"
orig_id="$( grep -Po "^ID=\K.+" /etc/os-release || true )"
distro_list=$orig_id
like_str="$( grep -Po "^ID_LIKE=\K.+" /etc/os-release || true )"
if [ ! -z "${like_str}" ]; then
distro_list="${distro_list} ${like_str}"
fi
if [ ! -z "${distro_list}" ]; then
echo "Found Linux distribution IDs: ${distro_list}"
else
echo "Unable to detect Linux Distribution."
fi
source "/etc/os-release"
DISTRO_VERSION="$VERSION_ID"
DISTRIBUTION="$ID"
fi
distro_id=""
while [ "$distro_list" != "$distro_id" ]; do
distro_id="${distro_list%% *}"
distro_list="${distro_list#$distro_id }"
supported_dists=$SUPPORTED_DISTROS
supported_id=""
while [ "$supported_dists" != "$supported_id" ]; do
supported_id="${supported_dists%% *}"
supported_dists="${supported_dists#$supported_id }"
if [ "$distro_id" = "$supported_id" ]; then
DISTRIBUTION=$distro_id
echo "Distribution detected: $DISTRIBUTION"
break
fi
done
[ ! -z "$DISTRIBUTION" ] && break
done
# *** AUTO GENERATED OS PACKAGE SCRIPT START ***
get_pkgs_script=$(cat << EOF
from __future__ import annotations
import shlex
import re
import pathlib
import logging
if [ "$DISTRIBUTION" != "$orig_id" ]; then
DISTRO_VERSION=""
fi
from typing import Tuple, Dict, List, Any
if [ -z "$DISTRIBUTION" ] && [ -x "$( which apt-get || true )" ]; then
# Fall back to debian if apt-get is detected
echo "Found apt-get, falling back to debian distribution"
DISTRIBUTION="debian"
fi
def _get_distro_info() -> Dict[str, Any]:
try:
import distro
except ModuleNotFoundError:
pass
else:
return dict(
distro_id=distro.id(),
distro_version=distro.version(),
aliases=distro.like().split()
)
release_file = pathlib.Path("/etc/os-release")
release_info: Dict[str, str] = {}
with release_file.open("r") as f:
lexer = shlex.shlex(f, posix=True)
lexer.whitespace_split = True
for item in list(lexer):
if "=" in item:
key, val = item.split("=", maxsplit=1)
release_info[key] = val
return dict(
distro_id=release_info.get("ID", ""),
distro_version=release_info.get("VERSION_ID", ""),
aliases=release_info.get("ID_LIKE", "").split()
)
# *** AUTO GENERATED OS PACKAGE DEPENDENCIES START ***
if [ ${DISTRIBUTION} = "debian" ]; then
PACKAGES="python3-virtualenv python3-dev libopenjp2-7 libsodium-dev zlib1g-dev"
PACKAGES="${PACKAGES} libjpeg-dev packagekit wireless-tools curl"
PACKAGES="${PACKAGES} build-essential"
elif [ ${DISTRIBUTION} = "ubuntu" ]; then
PACKAGES="python3-virtualenv python3-dev libopenjp2-7 libsodium-dev zlib1g-dev"
PACKAGES="${PACKAGES} libjpeg-dev packagekit curl build-essential"
if ( compare_version "<=" "24.04" ); then
PACKAGES="${PACKAGES} wireless-tools"
fi
if ( compare_version ">=" "24.10" ); then
PACKAGES="${PACKAGES} iw"
fi
fi
# *** AUTO GENERATED OS PACKAGE DEPENDENCIES END ***
def _convert_version(version: str) -> Tuple[str | int, ...]:
version = version.strip()
ver_match = re.match(r"\d+(\.\d+)*((?:-|\.).+)?", version)
if ver_match is not None:
return tuple([
int(part) if part.isdigit() else part
for part in re.split(r"\.|-", version)
])
return (version,)
class SysDepsParser:
def __init__(self, distro_info: Dict[str, Any] | None = None) -> None:
if distro_info is None:
distro_info = _get_distro_info()
self.distro_id: str = distro_info.get("distro_id", "")
self.aliases: List[str] = distro_info.get("aliases", [])
self.distro_version: Tuple[int | str, ...] = tuple()
version = distro_info.get("distro_version")
if version:
self.distro_version = _convert_version(version)
def _parse_spec(self, full_spec: str) -> str | None:
parts = full_spec.split(";", maxsplit=1)
if len(parts) == 1:
return full_spec
pkg_name = parts[0].strip()
expressions = re.split(r"( and | or )", parts[1].strip())
if not len(expressions) & 1:
logging.info(
f"Requirement specifier is missing an expression "
f"between logical operators : {full_spec}"
)
return None
last_result: bool = True
last_logical_op: str | None = "and"
for idx, exp in enumerate(expressions):
if idx & 1:
if last_logical_op is not None:
logging.info(
"Requirement specifier contains sequential logical "
f"operators: {full_spec}"
)
return None
logical_op = exp.strip()
if logical_op not in ("and", "or"):
logging.info(
f"Invalid logical operator {logical_op} in requirement "
f"specifier: {full_spec}")
return None
last_logical_op = logical_op
continue
elif last_logical_op is None:
logging.info(
f"Requirement specifier contains two seqential expressions "
f"without a logical operator: {full_spec}")
return None
dep_parts = re.split(r"(==|!=|<=|>=|<|>)", exp.strip())
req_var = dep_parts[0].strip().lower()
if len(dep_parts) != 3:
logging.info(f"Invalid comparison, must be 3 parts: {full_spec}")
return None
elif req_var == "distro_id":
left_op: str | Tuple[int | str, ...] = self.distro_id
right_op = dep_parts[2].strip().strip("\"'")
elif req_var == "distro_version":
if not self.distro_version:
logging.info(
"Distro Version not detected, cannot satisfy requirement: "
f"{full_spec}"
)
return None
left_op = self.distro_version
right_op = _convert_version(dep_parts[2].strip().strip("\"'"))
else:
logging.info(f"Invalid requirement specifier: {full_spec}")
return None
operator = dep_parts[1].strip()
try:
compfunc = {
"<": lambda x, y: x < y,
">": lambda x, y: x > y,
"==": lambda x, y: x == y,
"!=": lambda x, y: x != y,
">=": lambda x, y: x >= y,
"<=": lambda x, y: x <= y
}.get(operator, lambda x, y: False)
result = compfunc(left_op, right_op)
if last_logical_op == "and":
last_result &= result
else:
last_result |= result
last_logical_op = None
except Exception:
logging.exception(f"Error comparing requirements: {full_spec}")
return None
if last_result:
return pkg_name
return None
def parse_dependencies(self, sys_deps: Dict[str, List[str]]) -> List[str]:
if not self.distro_id:
logging.info(
"Failed to detect current distro ID, cannot parse dependencies"
)
return []
all_ids = [self.distro_id] + self.aliases
for distro_id in all_ids:
if distro_id in sys_deps:
if not sys_deps[distro_id]:
logging.info(
f"Dependency data contains an empty package definition "
f"for linux distro '{distro_id}'"
)
continue
processed_deps: List[str] = []
for dep in sys_deps[distro_id]:
parsed_dep = self._parse_spec(dep)
if parsed_dep is not None:
processed_deps.append(parsed_dep)
return processed_deps
else:
logging.info(
f"Dependency data has no package definition for linux "
f"distro '{self.distro_id}'"
)
return []
# *** SYSTEM DEPENDENCIES START ***
system_deps = {
"debian": [
"python3-virtualenv", "python3-dev", "libopenjp2-7", "libsodium-dev",
"zlib1g-dev", "libjpeg-dev", "packagekit",
"wireless-tools; distro_id != 'ubuntu' or distro_version <= '24.04'",
"iw; distro_id == 'ubuntu' and distro_version >= '24.10'", "curl",
"build-essential"
],
}
# *** SYSTEM DEPENDENCIES END ***
parser = SysDepsParser()
pkgs = parser.parse_dependencies(system_deps)
if pkgs:
print(' '.join(pkgs), end="")
exit(0)
EOF
)
# *** AUTO GENERATED OS PACKAGE SCRIPT END ***
PACKAGES="$( python3 -c "$get_pkgs_script" )"
}
# Step 2: Clean up legacy installation

View File

@ -10,110 +10,96 @@ import argparse
import pathlib
import tomllib
import json
import re
from typing import Dict, List, Tuple
import ast
from io import StringIO, TextIOBase
from typing import Dict, List, Iterator
MAX_LINE_LENGTH = 88
SCRIPTS_PATH = pathlib.Path(__file__).parent
INST_PKG_HEADER = "# *** AUTO GENERATED OS PACKAGE DEPENDENCIES START ***"
INST_PKG_FOOTER = "# *** AUTO GENERATED OS PACKAGE DEPENDENCIES END ***"
INST_PKG_HEADER = "# *** AUTO GENERATED OS PACKAGE SCRIPT START ***"
INST_PKG_FOOTER = "# *** AUTO GENERATED OS PACKAGE SCRIPT END ***"
DEPS_HEADER = "# *** SYSTEM DEPENDENCIES START ***"
DEPS_FOOTER = "# *** SYSTEM DEPENDENCIES END ***"
def gen_multline_var(
var_name: str,
values: List[str],
indent: int = 0,
is_first: bool = True
) -> str:
def gen_pkg_list(values: List[str], indent: int = 0) -> Iterator[str]:
idt = " " * indent
if not values:
return f'{idt}{var_name}=""'
line_list: List[str] = []
if is_first:
current_line = f"{idt}{var_name}=\"{values.pop(0)}"
else:
current_line = (f"{idt}{var_name}=\"${{{var_name}}} {values.pop(0)}")
return
current_line = f"{idt}\"{values.pop(0)}\","
for val in values:
if len(current_line) + len(val) + 2 > MAX_LINE_LENGTH:
line_list.append(f'{current_line}"')
current_line = (f"{idt}{var_name}=\"${{{var_name}}} {val}")
if len(current_line) + len(val) + 4 > MAX_LINE_LENGTH:
yield current_line + "\n"
current_line = f"{idt}\"{val}\","
else:
current_line += f" {val}"
line_list.append(f'{current_line}"')
return "\n".join(line_list)
current_line += f" \"{val}\","
yield current_line.rstrip(",") + "\n"
def parse_sysdeps_file() -> Dict[str, List[Tuple[str, str, str]]]:
sys_deps_file = SCRIPTS_PATH.joinpath("system-dependencies.json")
base_deps: Dict[str, List[str]] = json.loads(sys_deps_file.read_bytes())
parsed_deps: Dict[str, List[Tuple[str, str, str]]] = {}
for distro, pkgs in base_deps.items():
parsed_deps[distro] = []
for dep in pkgs:
parts = dep.split(";", maxsplit=1)
if len(parts) == 1:
parsed_deps[distro].append((dep.strip(), "", ""))
else:
pkg_name = parts[0].strip()
dep_parts = re.split(r"(==|!=|<=|>=|<|>)", parts[1].strip())
comp_var = dep_parts[0].strip().lower()
if len(dep_parts) != 3 or comp_var != "distro_version":
continue
operator = dep_parts[1].strip()
req_version = dep_parts[2].strip()
parsed_deps[distro].append((pkg_name, operator, req_version))
return parsed_deps
def write_parser_script(sys_deps: Dict[str, List[str]], out_hdl: TextIOBase) -> None:
parser_file = SCRIPTS_PATH.parent.joinpath("moonraker/utils/sysdeps_parser.py")
out_hdl.write(" get_pkgs_script=$(cat << EOF\n")
with parser_file.open("r") as f:
for line in f:
if not line.strip().startswith("#"):
out_hdl.write(line)
out_hdl.write(f"{DEPS_HEADER}\n")
out_hdl.write("system_deps = {\n")
for distro, packages in sys_deps.items():
indent = " " * 4
out_hdl.write(f"{indent}\"{distro}\": [\n")
# Write packages
for line in gen_pkg_list(packages, 8):
out_hdl.write(line)
out_hdl.write(f"{indent}],\n")
out_hdl.write("}\n")
out_hdl.write(f"{DEPS_FOOTER}\n")
out_hdl.writelines("""
parser = SysDepsParser()
pkgs = parser.parse_dependencies(system_deps)
if pkgs:
print(' '.join(pkgs), end="")
exit(0)
EOF
)
""".lstrip())
def sync_packages() -> int:
inst_script = SCRIPTS_PATH.joinpath("install-moonraker.sh")
new_deps = parse_sysdeps_file()
sys_deps_file = SCRIPTS_PATH.joinpath("system-dependencies.json")
prev_deps: Dict[str, List[str]] = {}
new_deps: Dict[str, List[str]] = json.loads(sys_deps_file.read_bytes())
# Copy install script in memory.
install_data: List[str] = []
prev_deps: Dict[str, List[Tuple[str, str, str]]] = {}
distro_name = ""
cur_spec: Tuple[str, str] | None = None
install_data = StringIO()
prev_deps_str: str = ""
skip_data = False
collect_deps = False
with inst_script.open("r") as inst_file:
for line in inst_file:
cur_line = line.strip()
if not skip_data:
install_data.append(line)
install_data.write(line)
else:
# parse current dependencies
distro_match = re.match(
r"(?:el)?if \[ \$\{DISTRIBUTION\} = \"([a-z0-9._-]+)\" \]; then",
cur_line
)
if distro_match is not None:
distro_name = distro_match.group(1)
prev_deps[distro_name] = []
else:
if cur_spec is not None and cur_line == "fi":
cur_spec = None
if collect_deps:
if line.rstrip() == DEPS_FOOTER:
collect_deps = False
else:
req_match = re.match(
r"if \( compare_version \"(<|>|==|!=|<=|>=)\" "
r"\"([a-zA-Z0-9._-]+)\" \); then",
cur_line
)
if req_match is not None:
parts = req_match.groups()
cur_spec = (parts[0], parts[1])
elif cur_line.startswith("PACKAGES"):
pkgs = cur_line.split("=", maxsplit=1)[1].strip('"')
pkg_list = pkgs.split()
if pkg_list and pkg_list[0] == "${PACKAGES}":
pkg_list.pop(0)
operator, req_version = "", ""
if cur_spec is not None:
operator, req_version = cur_spec
for pkg in pkg_list:
prev_deps[distro_name].append(
(pkg, operator, req_version)
)
prev_deps_str += line
elif line.rstrip() == DEPS_HEADER:
collect_deps = True
if cur_line == INST_PKG_HEADER:
skip_data = True
elif cur_line == INST_PKG_FOOTER:
skip_data = False
install_data.append(line)
install_data.write(line)
if prev_deps_str:
try:
# start at the beginning of the dict literal
idx = prev_deps_str.find("{")
if idx > 0:
prev_deps = ast.literal_eval(prev_deps_str[idx:])
except Exception:
pass
print(f"Previous Dependencies:\n{prev_deps}")
# Check if an update is necessary
if set(prev_deps.keys()) == set(new_deps.keys()):
for distro, prev_pkgs in prev_deps.items():
@ -124,52 +110,14 @@ def sync_packages() -> int:
# Dependencies match, exit
print("System package dependencies match")
return 0
install_data.seek(0)
print("Writing new system dependencies to install script...")
with inst_script.open("w+") as inst_file:
# Find and replace old package defs
for line in install_data:
inst_file.write(line)
if line.strip() == INST_PKG_HEADER:
indent_count = len(line) - len(line.lstrip())
idt = " " * indent_count
# Write Package data
first_distro = True
for distro, packages in new_deps.items():
prefix = f"{idt}if" if first_distro else f"{idt}elif"
first_distro = False
inst_file.write(
f'{prefix} [ ${{DISTRIBUTION}} = "{distro}" ]; then\n'
)
pkgs_by_op: Dict[Tuple[str, str], List[str]] = {}
base_list: List[str] = []
for pkg_spec in packages:
if not pkg_spec[1] or not pkg_spec[2]:
base_list.append(pkg_spec[0])
else:
key = (pkg_spec[1], pkg_spec[2])
pkgs_by_op.setdefault(key, []).append(pkg_spec[0])
is_first = True
if base_list:
pkg_var = gen_multline_var(
"PACKAGES", base_list, indent_count + 4
)
inst_file.write(pkg_var)
inst_file.write("\n")
is_first = False
if pkgs_by_op:
for (operator, req_ver), pkg_list in pkgs_by_op.items():
req_idt = idt + " " * 4
inst_file.write(
f"{req_idt}if ( compare_version \"{operator}\" "
f"\"{req_ver}\" ); then\n"
)
req_pkgs = gen_multline_var(
"PACKAGES", pkg_list, indent_count + 8, is_first
)
inst_file.write(req_pkgs)
inst_file.write("\n")
inst_file.write(f"{req_idt}fi\n")
inst_file.write(f"{idt}fi\n")
write_parser_script(new_deps, inst_file)
return 1
def check_reqs_changed(reqs_file: pathlib.Path, new_reqs: List[str]) -> bool:

View File

@ -7,20 +7,8 @@
"zlib1g-dev",
"libjpeg-dev",
"packagekit",
"wireless-tools",
"curl",
"build-essential"
],
"ubuntu": [
"python3-virtualenv",
"python3-dev",
"libopenjp2-7",
"libsodium-dev",
"zlib1g-dev",
"libjpeg-dev",
"packagekit",
"wireless-tools; distro_version <= 24.04",
"iw; distro_version >= 24.10",
"wireless-tools; distro_id != 'ubuntu' or distro_version <= '24.04'",
"iw; distro_id == 'ubuntu' and distro_version >= '24.10'",
"curl",
"build-essential"
]