file_manager: add support for metadata gcode processors

Allow components to register gcode processors which will modify
the gcode file prior to metadata processing.

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan 2025-02-14 16:33:35 -05:00
parent e338f537f5
commit b8b28bc0c9
2 changed files with 260 additions and 109 deletions

View File

@ -15,6 +15,8 @@ import asyncio
import zipfile import zipfile
import time import time
import math import math
import shlex
import contextlib
from copy import deepcopy from copy import deepcopy
from inotify_simple import INotify from inotify_simple import INotify
from inotify_simple import flags as iFlags from inotify_simple import flags as iFlags
@ -2307,6 +2309,7 @@ class MetadataStorage:
self.pending_requests: Dict[ self.pending_requests: Dict[
str, Tuple[Dict[str, Any], asyncio.Event]] = {} str, Tuple[Dict[str, Any], asyncio.Event]] = {}
self.busy: bool = False self.busy: bool = False
self.processors: Dict[str, Dict[str, Any]] = {}
def prune_storage(self) -> None: def prune_storage(self) -> None:
# Check for removed gcode files while moonraker was shutdown # Check for removed gcode files while moonraker was shutdown
@ -2360,6 +2363,23 @@ class MetadataStorage:
def is_file_processing(self, fname: str) -> bool: def is_file_processing(self, fname: str) -> bool:
return fname in self.pending_requests return fname in self.pending_requests
def register_gcode_processor(
self, name: str, config: Dict[str, Any] | None
) -> None:
if config is None:
self.processors.pop(name, None)
return
elif name in self.processors:
raise self.server.error(f"File processor {name} already registered")
required_fields = ("name", "command", "timeout")
for req_field in required_fields:
if req_field not in config:
raise self.server.error(
f"File processor configuration requires a `{req_field}` field"
)
self.processors[name] = config
logging.info(f"GCode Processor {name} registered")
def _has_valid_data(self, def _has_valid_data(self,
fname: str, fname: str,
path_info: Dict[str, Any] path_info: Dict[str, Any]
@ -2551,22 +2571,37 @@ class MetadataStorage:
) -> None: ) -> None:
# Escape single quotes in the file name so that it may be # Escape single quotes in the file name so that it may be
# properly loaded # properly loaded
filename = filename.replace("\"", "\\\"") config: Dict[str, Any] = {
cmd = " ".join([sys.executable, METADATA_SCRIPT, "-p", "filename": filename,
self.gc_path, "-f", f"\"{filename}\""]) "gcode_dir": self.gc_path,
"check_objects": self.enable_object_proc,
"ufp_path": ufp_path,
"processors": list(self.processors.values())
}
timeout = self.default_metadata_parser_timeout timeout = self.default_metadata_parser_timeout
if ufp_path is not None and os.path.isfile(ufp_path): if ufp_path is not None or self.enable_object_proc:
timeout = max(timeout, 300.) timeout = max(timeout, 300.)
ufp_path.replace("\"", "\\\"") if self.processors:
cmd += f" -u \"{ufp_path}\"" proc_timeout = sum(
if self.enable_object_proc: [proc.get("timeout", 0) for proc in self.processors.values()]
timeout = max(timeout, 300.) )
cmd += " --check-objects" timeout = max(timeout, proc_timeout)
eventloop = self.server.get_event_loop()
md_cfg = await eventloop.run_in_thread(self._create_metadata_cfg, config)
cmd = " ".join([sys.executable, METADATA_SCRIPT, "-c", shlex.quote(md_cfg)])
result = bytearray() result = bytearray()
sc: SCMDComp = self.server.lookup_component('shell_command') try:
scmd = sc.build_shell_command(cmd, callback=result.extend, log_stderr=True) sc: SCMDComp = self.server.lookup_component('shell_command')
if not await scmd.run(timeout=timeout): scmd = sc.build_shell_command(
raise self.server.error("Extract Metadata returned with error") cmd, callback=result.extend, log_stderr=True
)
if not await scmd.run(timeout=timeout):
raise self.server.error("Extract Metadata returned with error")
finally:
def _rm_md_config():
with contextlib.suppress(OSError):
os.remove(md_cfg)
await eventloop.run_in_thread(_rm_md_config)
try: try:
decoded_resp: Dict[str, Any] = jsonw.loads(result.strip()) decoded_resp: Dict[str, Any] = jsonw.loads(result.strip())
except Exception: except Exception:
@ -2581,5 +2616,12 @@ class MetadataStorage:
self.metadata[path] = metadata self.metadata[path] = metadata
self.mddb[path] = metadata self.mddb[path] = metadata
def _create_metadata_cfg(self, config: Dict[str, Any]) -> str:
with tempfile.NamedTemporaryFile(
prefix="metacfg-", suffix=".json", delete=False
) as f:
f.write(jsonw.dumps(config))
return f.name
def load_component(config: ConfigHelper) -> FileManager: def load_component(config: ConfigHelper) -> FileManager:
return FileManager(config) return FileManager(config)

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# GCode metadata extraction utility # GCode metadata extraction utility
# #
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com> # Copyright (C) 2020-2025 Eric Callahan <arksine.code@gmail.com>
# #
# This file may be distributed under the terms of the GNU GPLv3 license. # This file may be distributed under the terms of the GNU GPLv3 license.
@ -18,6 +18,8 @@ import zipfile
import shutil import shutil
import uuid import uuid
import logging import logging
import shlex
import subprocess
from PIL import Image from PIL import Image
# Annotation imports # Annotation imports
@ -109,6 +111,8 @@ def regex_find_max_float(pattern: str, data: str) -> Optional[float]:
class BaseSlicer(object): class BaseSlicer(object):
def __init__(self, file_path: str) -> None: def __init__(self, file_path: str) -> None:
self.path = file_path self.path = file_path
self.slicer_name = "Unknown"
self.slicer_version = "?"
self.header_data: str = "" self.header_data: str = ""
self.footer_data: str = "" self.footer_data: str = ""
self.layer_height: Optional[float] = None self.layer_height: Optional[float] = None
@ -153,7 +157,14 @@ class BaseSlicer(object):
return True return True
return False return False
def check_identity(self, data: str) -> Optional[Dict[str, str]]: def check_identity(self, data: str) -> bool:
return False
def check_gcode_processor(self, regex: str, location: str) -> Dict[str, Any] | None:
data = self.header_data if location == "header" else self.footer_data
proc_match = re.search(regex, data, re.MULTILINE)
if proc_match is not None:
return proc_match.groupdict()
return None return None
def has_objects(self) -> bool: def has_objects(self) -> bool:
@ -282,9 +293,6 @@ class BaseSlicer(object):
return None return None
class UnknownSlicer(BaseSlicer): class UnknownSlicer(BaseSlicer):
def check_identity(self, data: str) -> Optional[Dict[str, str]]:
return {'slicer': "Unknown"}
def parse_first_layer_height(self) -> Optional[float]: def parse_first_layer_height(self) -> Optional[float]:
return regex_find_min_float(r"G1\sZ(%F)\s", self.header_data) return regex_find_min_float(r"G1\sZ(%F)\s", self.header_data)
@ -304,7 +312,7 @@ class UnknownSlicer(BaseSlicer):
return None return None
class PrusaSlicer(BaseSlicer): class PrusaSlicer(BaseSlicer):
def check_identity(self, data: str) -> Optional[Dict[str, str]]: def check_identity(self, data: str) -> bool:
aliases = { aliases = {
'PrusaSlicer': r"PrusaSlicer\s(.*)\son", 'PrusaSlicer': r"PrusaSlicer\s(.*)\son",
'SuperSlicer': r"SuperSlicer\s(.*)\son", 'SuperSlicer': r"SuperSlicer\s(.*)\son",
@ -318,11 +326,10 @@ class PrusaSlicer(BaseSlicer):
for name, expr in aliases.items(): for name, expr in aliases.items():
match = re.search(expr, data) match = re.search(expr, data)
if match: if match:
return { self.slicer_name = name
'slicer': name, self.slicer_version = match.group(1)
'slicer_version': match.group(1) return True
} return False
return None
def has_objects(self) -> bool: def has_objects(self) -> bool:
return self._check_has_objects( return self._check_has_objects(
@ -423,14 +430,13 @@ class PrusaSlicer(BaseSlicer):
return regex_find_int(r"; total layers count = (%D)", self.footer_data) return regex_find_int(r"; total layers count = (%D)", self.footer_data)
class Slic3rPE(PrusaSlicer): class Slic3rPE(PrusaSlicer):
def check_identity(self, data: str) -> Optional[Dict[str, str]]: def check_identity(self, data: str) -> bool:
match = re.search(r"Slic3r\sPrusa\sEdition\s(.*)\son", data) match = re.search(r"Slic3r\sPrusa\sEdition\s(.*)\son", data)
if match: if match:
return { self.slicer_name = "Slic3r PE"
'slicer': "Slic3r PE", self.slicer_version = match.group(1)
'slicer_version': match.group(1) return True
} return False
return None
def parse_filament_total(self) -> Optional[float]: def parse_filament_total(self) -> Optional[float]:
return regex_find_float(r"filament\sused\s=\s(%F)mm", self.footer_data) return regex_find_float(r"filament\sused\s=\s(%F)mm", self.footer_data)
@ -439,14 +445,13 @@ class Slic3rPE(PrusaSlicer):
return None return None
class Slic3r(Slic3rPE): class Slic3r(Slic3rPE):
def check_identity(self, data: str) -> Optional[Dict[str, str]]: def check_identity(self, data: str) -> bool:
match = re.search(r"Slic3r\s(\d.*)\son", data) match = re.search(r"Slic3r\s(\d.*)\son", data)
if match: if match:
return { self.slicer_name = "Slic3r"
'slicer': "Slic3r", self.slicer_version = match.group(1)
'slicer_version': match.group(1) return True
} return False
return None
def parse_filament_total(self) -> Optional[float]: def parse_filament_total(self) -> Optional[float]:
filament = regex_find_float( filament = regex_find_float(
@ -463,14 +468,13 @@ class Slic3r(Slic3rPE):
return None return None
class Cura(BaseSlicer): class Cura(BaseSlicer):
def check_identity(self, data: str) -> Optional[Dict[str, str]]: def check_identity(self, data: str) -> bool:
match = re.search(r"Cura_SteamEngine\s(.*)", data) match = re.search(r"Cura_SteamEngine\s(.*)", data)
if match: if match:
return { self.slicer_name = "Cura"
'slicer': "Cura", self.slicer_version = match.group(1)
'slicer_version': match.group(1) return True
} return False
return None
def has_objects(self) -> bool: def has_objects(self) -> bool:
return self._check_has_objects(self.header_data, r"\n;MESH:") return self._check_has_objects(self.header_data, r"\n;MESH:")
@ -557,16 +561,14 @@ class Cura(BaseSlicer):
return thumbs return thumbs
class Simplify3D(BaseSlicer): class Simplify3D(BaseSlicer):
def check_identity(self, data: str) -> Optional[Dict[str, str]]: def check_identity(self, data: str) -> bool:
match = re.search(r"Simplify3D\(R\)\sVersion\s(.*)", data) match = re.search(r"Simplify3D\(R\)\sVersion\s(.*)", data)
if match: if match:
self._version = match.group(1) self.slicer_name = "Simplify3D"
self._is_v5 = self._version.startswith("5") self.slicer_version = match.group(1)
return { self._is_v5 = self.slicer_version.startswith("5")
'slicer': "Simplify3D", return True
'slicer_version': match.group(1) return False
}
return None
def parse_first_layer_height(self) -> Optional[float]: def parse_first_layer_height(self) -> Optional[float]:
return regex_find_min_float(r"G1\sZ(%F)\s", self.header_data) return regex_find_min_float(r"G1\sZ(%F)\s", self.header_data)
@ -667,16 +669,16 @@ class Simplify3D(BaseSlicer):
) )
class KISSlicer(BaseSlicer): class KISSlicer(BaseSlicer):
def check_identity(self, data: str) -> Optional[Dict[str, Any]]: def check_identity(self, data: str) -> bool:
match = re.search(r";\sKISSlicer", data) match = re.search(r";\sKISSlicer", data)
if match: if match:
ident = {'slicer': "KISSlicer"} self.slicer_name = "KISSlicer"
vmatch = re.search(r";\sversion\s(.*)", data) vmatch = re.search(r";\sversion\s(.*)", data)
if vmatch: if vmatch:
version = vmatch.group(1).replace(" ", "-") version = vmatch.group(1).replace(" ", "-")
ident['slicer_version'] = version self.slicer_version = version
return ident return True
return None return False
def parse_first_layer_height(self) -> Optional[float]: def parse_first_layer_height(self) -> Optional[float]:
return regex_find_float( return regex_find_float(
@ -718,14 +720,13 @@ class KISSlicer(BaseSlicer):
class IdeaMaker(BaseSlicer): class IdeaMaker(BaseSlicer):
def check_identity(self, data: str) -> Optional[Dict[str, str]]: def check_identity(self, data: str) -> bool:
match = re.search(r"\sideaMaker\s(.*),", data) match = re.search(r"\sideaMaker\s(.*),", data)
if match: if match:
return { self.slicer_name = "IdeaMaker"
'slicer': "IdeaMaker", self.slicer_version = match.group(1)
'slicer_version': match.group(1) return True
} return False
return None
def has_objects(self) -> bool: def has_objects(self) -> bool:
return self._check_has_objects(self.header_data, r"\n;PRINTING:") return self._check_has_objects(self.header_data, r"\n;PRINTING:")
@ -795,15 +796,14 @@ class IdeaMaker(BaseSlicer):
r";Dimension:(?:\s\d+\.\d+){3}\s(%F)", self.header_data) r";Dimension:(?:\s\d+\.\d+){3}\s(%F)", self.header_data)
class IceSL(BaseSlicer): class IceSL(BaseSlicer):
def check_identity(self, data) -> Optional[Dict[str, Any]]: def check_identity(self, data) -> bool:
match = re.search(r"<IceSL\s(.*)>", data) match = re.search(r"<IceSL\s(.*)>", data)
if match: if match:
version = match.group(1) if match.group(1)[0].isdigit() else "-" version = match.group(1) if match.group(1)[0].isdigit() else "-"
return { self.slicer_name = "IceSL"
'slicer': "IceSL", self.slicer_version = version
'slicer_version': version return True
} return False
return None
def parse_first_layer_height(self) -> Optional[float]: def parse_first_layer_height(self) -> Optional[float]:
return regex_find_float( return regex_find_float(
@ -861,7 +861,7 @@ class IceSL(BaseSlicer):
r";\snozzle_diameter_mm_0\s:\s+(%F)", self.header_data) r";\snozzle_diameter_mm_0\s:\s+(%F)", self.header_data)
class KiriMoto(BaseSlicer): class KiriMoto(BaseSlicer):
def check_identity(self, data) -> Optional[Dict[str, Any]]: def check_identity(self, data) -> bool:
variants: Dict[str, str] = { variants: Dict[str, str] = {
"Kiri:Moto": r"; Generated by Kiri:Moto (\d.+)", "Kiri:Moto": r"; Generated by Kiri:Moto (\d.+)",
"SimplyPrint": r"; Generated by Kiri:Moto \(SimplyPrint\) (.+)" "SimplyPrint": r"; Generated by Kiri:Moto \(SimplyPrint\) (.+)"
@ -869,11 +869,10 @@ class KiriMoto(BaseSlicer):
for name, pattern in variants.items(): for name, pattern in variants.items():
match = re.search(pattern, data) match = re.search(pattern, data)
if match: if match:
return { self.slicer_name = name
"slicer": name, self.slicer_version = match.group(1)
"slicer_version": match.group(1) return True
} return False
return None
def parse_first_layer_height(self) -> Optional[float]: def parse_first_layer_height(self) -> Optional[float]:
return regex_find_float( return regex_find_float(
@ -942,9 +941,18 @@ SUPPORTED_DATA = [
'filament_type', 'filament_type',
'filament_total', 'filament_total',
'filament_weight_total', 'filament_weight_total',
'thumbnails'] 'thumbnails'
]
def process_objects(file_path: str, slicer: BaseSlicer, name: str) -> bool: PPC_REGEX = (
r"^; Pre-Processed for Cancel-Object support "
r"by preprocess_cancellation (?P<version>v?\d+(?:\.\d+)*)"
)
def process_objects(file_path: str, slicer: BaseSlicer) -> bool:
name = slicer.slicer_name
if not slicer.has_objects():
return False
try: try:
from preprocess_cancellation import ( from preprocess_cancellation import (
preprocess_slicer, preprocess_slicer,
@ -957,8 +965,7 @@ def process_objects(file_path: str, slicer: BaseSlicer, name: str) -> bool:
return False return False
fname = os.path.basename(file_path) fname = os.path.basename(file_path)
logger.info( logger.info(
f"Performing Object Processing on file: {fname}, " f"Performing Object Processing on file: {fname}, sliced by {name}"
f"sliced by {name}"
) )
with tempfile.TemporaryDirectory() as tmp_dir_name: with tempfile.TemporaryDirectory() as tmp_dir_name:
tmp_file = os.path.join(tmp_dir_name, fname) tmp_file = os.path.join(tmp_dir_name, fname)
@ -989,7 +996,7 @@ def process_objects(file_path: str, slicer: BaseSlicer, name: str) -> bool:
shutil.move(tmp_file, file_path) shutil.move(tmp_file, file_path)
return True return True
def get_slicer(file_path: str) -> Tuple[BaseSlicer, Dict[str, str]]: def get_slicer(file_path: str) -> BaseSlicer:
header_data = footer_data = "" header_data = footer_data = ""
slicer: Optional[BaseSlicer] = None slicer: Optional[BaseSlicer] = None
size = os.path.getsize(file_path) size = os.path.getsize(file_path)
@ -999,12 +1006,10 @@ def get_slicer(file_path: str) -> Tuple[BaseSlicer, Dict[str, str]]:
header_data = f.read(READ_SIZE) header_data = f.read(READ_SIZE)
for impl in SUPPORTED_SLICERS: for impl in SUPPORTED_SLICERS:
slicer = impl(file_path) slicer = impl(file_path)
ident = slicer.check_identity(header_data) if slicer.check_identity(header_data):
if ident is not None:
break break
else: else:
slicer = UnknownSlicer(file_path) slicer = UnknownSlicer(file_path)
ident = slicer.check_identity(header_data)
if size > READ_SIZE * 2: if size > READ_SIZE * 2:
f.seek(size - READ_SIZE) f.seek(size - READ_SIZE)
footer_data = f.read() footer_data = f.read()
@ -1014,23 +1019,91 @@ def get_slicer(file_path: str) -> Tuple[BaseSlicer, Dict[str, str]]:
else: else:
footer_data = header_data footer_data = header_data
slicer.set_data(header_data, footer_data, size) slicer.set_data(header_data, footer_data, size)
if ident is None: return slicer
ident = {"slicer": "unknown"}
return slicer, ident def run_gcode_processors(
gc_file_path: str, slicer: BaseSlicer, processors: List[Dict[str, Any]]
) -> Tuple[List[str], bool]:
reload_slicer_data: bool = False
finished_procs: List[str] = []
short_name = os.path.basename(gc_file_path)
for proc_cfg in processors:
name: str = "Unknown"
try:
name = proc_cfg["name"]
version = proc_cfg.get("version", "v?")
ident: Dict[str, Any] = proc_cfg.get("ident", {})
if ident:
regex: str = ident["regex"]
loc: str = ident["location"]
data = slicer.check_gcode_processor(regex, loc)
if data is not None:
ver = data.get("version", "v?")
logger.info(
f"File {short_name} previously processed by {name} {ver}"
)
finished_procs.append(name)
continue
if not proc_cfg.get("enabled", True):
logger.info(f"Processor {name} is disabled")
continue
arglist: List[str] = []
command = proc_cfg["command"]
if callable(command):
# Local file processor (preprocess_cancellation)
if command(gc_file_path, slicer):
finished_procs.append(name)
reload_slicer_data = True
continue
elif isinstance(command, str):
arglist = shlex.split(command)
else:
arglist = command
assert isinstance(arglist, list)
for idx, arg in enumerate(arglist):
assert isinstance(arg, str)
if arg == "{gcode_file_path}":
arglist[idx] = gc_file_path
timeout: float = proc_cfg.get("timeout", 120.)
assert isinstance(timeout, (int, float)) and timeout > 0.
logger.info(
f"Running processor {name} {version} on file {short_name}..."
)
ret = subprocess.run(arglist, capture_output=True, timeout=timeout)
except Exception:
logger.info(f"Processor {name} failed with error")
logger.info(traceback.format_exc())
continue
if ret.returncode != 0:
logger.info(f"File processor {name} failed with code {ret.returncode}")
stdout = ret.stdout.decode(errors="ignore")
stderr = ret.stderr.decode(errors="ignore")
if stdout:
logger.info(stdout)
if stderr:
logger.info(stderr)
else:
logger.info(f"File processor {name} successfully complete")
finished_procs.append(name)
reload_slicer_data = True
return finished_procs, reload_slicer_data
def extract_metadata( def extract_metadata(
file_path: str, check_objects: bool file_path: str, processors: List[Dict[str, Any]]
) -> Dict[str, Any]: ) -> Dict[str, Any]:
metadata: Dict[str, Any] = {} metadata: Dict[str, Any] = {}
slicer, ident = get_slicer(file_path) proc_list: List[str] = []
if check_objects and slicer.has_objects(): slicer = get_slicer(file_path)
name = ident.get("slicer", "unknown") if processors:
if process_objects(file_path, slicer, name): proc_list, reload = run_gcode_processors(file_path, slicer, processors)
slicer, ident = get_slicer(file_path) if reload:
metadata['size'] = os.path.getsize(file_path) slicer = get_slicer(file_path)
metadata['modified'] = os.path.getmtime(file_path) metadata["size"] = os.path.getsize(file_path)
metadata['uuid'] = str(uuid.uuid4()) metadata["modified"] = os.path.getmtime(file_path)
metadata.update(ident) metadata["uuid"] = str(uuid.uuid4())
metadata["file_processors"] = proc_list
metadata["slicer"] = slicer.slicer_name
metadata["slicer_version"] = slicer.slicer_version
for key in SUPPORTED_DATA: for key in SUPPORTED_DATA:
func = getattr(slicer, "parse_" + key) func = getattr(slicer, "parse_" + key)
result = func() result = func()
@ -1070,12 +1143,23 @@ def extract_ufp(ufp_path: str, dest_path: str) -> None:
except Exception: except Exception:
logger.info(f"Error removing ufp file: {ufp_path}") logger.info(f"Error removing ufp file: {ufp_path}")
def main(path: str, def main(config: Dict[str, Any]) -> None:
filename: str, gc_path: str = config["gcode_dir"]
ufp: Optional[str], filename: str = config["filename"]
check_objects: bool file_path = os.path.join(gc_path, filename)
) -> None: processors: List[Dict[str, Any]] = config.get("processors", [])
file_path = os.path.join(path, filename) processors.append(
{
"name": "preprocess_cancellation",
"command": process_objects,
"enabled": config.get("check_objects", False),
"ident": {
"regex": PPC_REGEX,
"location": "header"
}
}
)
ufp = config.get("ufp_path")
if ufp is not None: if ufp is not None:
extract_ufp(ufp, file_path) extract_ufp(ufp, file_path)
metadata: Dict[str, Any] = {} metadata: Dict[str, Any] = {}
@ -1083,7 +1167,7 @@ def main(path: str,
logger.info(f"File Not Found: {file_path}") logger.info(f"File Not Found: {file_path}")
sys.exit(-1) sys.exit(-1)
try: try:
metadata = extract_metadata(file_path, check_objects) metadata = extract_metadata(file_path, processors)
except Exception: except Exception:
logger.info(traceback.format_exc()) logger.info(traceback.format_exc())
sys.exit(-1) sys.exit(-1)
@ -1103,12 +1187,15 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="GCode Metadata Extraction Utility") description="GCode Metadata Extraction Utility")
parser.add_argument( parser.add_argument(
"-f", "--filename", metavar='<filename>', "-c", "--config", metavar='<config_file>', default=None,
help="Optional json configuration file for metadata.py"
)
parser.add_argument(
"-f", "--filename", metavar='<filename>', default=None,
help="name gcode file to parse") help="name gcode file to parse")
parser.add_argument( parser.add_argument(
"-p", "--path", default=os.path.abspath(os.path.dirname(__file__)), "-p", "--path", metavar='<path>', default=None,
metavar='<path>', help="optional path to folder containing the file"
help="optional absolute path for file"
) )
parser.add_argument( parser.add_argument(
"-u", "--ufp", metavar="<ufp file>", default=None, "-u", "--ufp", metavar="<ufp file>", default=None,
@ -1118,7 +1205,29 @@ if __name__ == "__main__":
"-o", "--check-objects", dest='check_objects', action='store_true', "-o", "--check-objects", dest='check_objects', action='store_true',
help="process gcode file for exclude opbject functionality") help="process gcode file for exclude opbject functionality")
args = parser.parse_args() args = parser.parse_args()
check_objects = args.check_objects config: Dict[str, Any] = {}
enabled_msg = "enabled" if check_objects else "disabled" if args.config is None:
logger.info(f"Object Processing is {enabled_msg}") if args.filename is None:
main(args.path, args.filename, args.ufp, check_objects) logger.info(
"The '--filename' (-f) option must be specified when "
" --config is not set"
)
sys.exit(-1)
config["filename"] = args.filename
config["gcode_dir"] = args.path
config["ufp_path"] = args.ufp
config["check_objects"] = args.check_objects
else:
# Config file takes priority over command line options
try:
with open(args.config, "r") as f:
config = (json.load(f))
except Exception:
logger.info(traceback.format_exc())
sys.exit(-1)
if config.get("filename") is None:
logger.info("The 'filename' field must be present in the configuration")
sys.exit(-1)
if config.get("gcode_dir") is None:
config["gcode_dir"] = os.path.abspath(os.path.dirname(__file__))
main(config)