This adds the framework for unit testing Moonraker via pytest. Initally only moonraker.py, klippy_connection.py, and confighelper.py have acceptable coverage. Coverage for other modules will be added on an incremental basis, when most of Moonraker's source is covered tests will be conducted via GitHub actions. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
194 lines
6.2 KiB
Python
194 lines
6.2 KiB
Python
from __future__ import annotations
|
|
import os
|
|
import logging
|
|
from typing import Dict, Optional, List, Tuple
|
|
|
|
class GpioException(Exception):
|
|
pass
|
|
|
|
class MockGpiod:
|
|
LINE_REQ_DIR_OUT = 3
|
|
LINE_REQ_EV_BOTH_EDGES = 6
|
|
LINE_REQ_FLAG_ACTIVE_LOW = 1 << 2
|
|
LINE_REQ_FLAG_BIAS_DISABLE = 1 << 3
|
|
LINE_REQ_FLAG_BIAS_PULL_DOWN = 1 << 4
|
|
LINE_REQ_FLAG_BIAS_PULL_UP = 1 << 5
|
|
|
|
def __init__(self, version: str = "1.2") -> None:
|
|
self.version = version
|
|
self.Chip = MockChipWrapper(self)
|
|
self.LineEvent = MockLineEvent
|
|
self.chips: Dict[str, MockChip] = {}
|
|
|
|
def version_string(self) -> str:
|
|
return self.version
|
|
|
|
def version_tuple(self) -> Tuple[int, ...]:
|
|
return tuple([int(v) for v in self.version.split(".")])
|
|
|
|
def get_chip(self, chip_name) -> Optional[MockChip]:
|
|
return self.chips.get(chip_name, None)
|
|
|
|
def add_chip(self, chip: MockChip):
|
|
self.chips[chip.name] = chip
|
|
|
|
def pop_chip(self, name: str):
|
|
self.chips.pop(name, None)
|
|
|
|
def find_line(self, chip_id: str, pin_id: str) -> MockLine:
|
|
if chip_id not in self.chips:
|
|
raise GpioException(f"Unable to find chip {chip_id}")
|
|
return self.chips[chip_id].find_line(pin_id)
|
|
|
|
class MockChipWrapper:
|
|
OPEN_BY_NAME = 2
|
|
def __init__(self, gpiod: MockGpiod) -> None:
|
|
self.mock_gpiod = gpiod
|
|
|
|
def __call__(self, chip_name: str, flags: int) -> MockChip:
|
|
if chip_name in self.mock_gpiod.chips:
|
|
return self.mock_gpiod.chips[chip_name]
|
|
chip = MockChip(chip_name, flags, self.mock_gpiod)
|
|
self.mock_gpiod.add_chip(chip)
|
|
return chip
|
|
|
|
class MockChip:
|
|
def __init__(self,
|
|
chip_name: str,
|
|
flags: int,
|
|
mock_gpiod: MockGpiod
|
|
) -> None:
|
|
self.name = chip_name
|
|
self.flags = flags
|
|
self.mock_gpiod = mock_gpiod
|
|
self.requested_lines: Dict[str, MockLine] = {}
|
|
|
|
def get_line(self, pin_id: str) -> MockLine:
|
|
if pin_id in self.requested_lines:
|
|
raise GpioException(f"Line {pin_id} already reserved")
|
|
line = MockLine(self, pin_id, self.mock_gpiod)
|
|
self.requested_lines[pin_id] = line
|
|
return line
|
|
|
|
def find_line(self, pin_id: str) -> MockLine:
|
|
if pin_id not in self.requested_lines:
|
|
raise GpioException(f"Unable to find line {pin_id}")
|
|
return self.requested_lines[pin_id]
|
|
|
|
def pop_line(self, name: str) -> None:
|
|
self.requested_lines.pop(name, None)
|
|
|
|
def close(self) -> None:
|
|
for line in list(self.requested_lines.values()):
|
|
line.release()
|
|
self.requested_lines = {}
|
|
self.mock_gpiod.pop_chip(self.name)
|
|
|
|
class MockLine:
|
|
def __init__(self,
|
|
chip: MockChip,
|
|
name: str,
|
|
mock_gpiod: MockGpiod
|
|
) -> None:
|
|
self.mock_gpiod = mock_gpiod
|
|
self.chip = chip
|
|
self.name = name
|
|
self.consumer_name: str = ""
|
|
self.is_event = False
|
|
self.invert = False
|
|
self.value = 0
|
|
self.read_pipe: Optional[int] = None
|
|
self.write_pipe: Optional[int] = None
|
|
self.bias = "not_configured"
|
|
|
|
def request(self,
|
|
consumer: str,
|
|
type: int,
|
|
flags: int = 0,
|
|
default_vals: Optional[List[int]] = None,
|
|
default_val: Optional[int] = None
|
|
) -> None:
|
|
self.consumer_name = consumer
|
|
version = self.mock_gpiod.version_tuple()
|
|
if type == MockGpiod.LINE_REQ_DIR_OUT:
|
|
self.is_event = False
|
|
if default_vals is not None:
|
|
if version > (1, 2):
|
|
logging.warn("default_vals is deprecated in gpiod 1.3+")
|
|
self.value = default_vals[0]
|
|
elif default_val is not None:
|
|
if version < (1, 3):
|
|
raise GpioException(
|
|
"default_val not available in gpiod < 1.3")
|
|
self.value = default_val
|
|
elif type == MockGpiod.LINE_REQ_EV_BOTH_EDGES:
|
|
self.is_event = True
|
|
if version >= (1, 5):
|
|
if flags & MockGpiod.LINE_REQ_FLAG_BIAS_DISABLE:
|
|
self.bias = "disabled"
|
|
elif flags & MockGpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN:
|
|
self.bias = "pulldown"
|
|
elif flags & MockGpiod.LINE_REQ_FLAG_BIAS_PULL_UP:
|
|
self.bias = "pullup"
|
|
self.read_pipe, self.write_pipe = os.pipe2(os.O_NONBLOCK)
|
|
else:
|
|
raise GpioException("Unsupported GPIO Type")
|
|
if flags & MockGpiod.LINE_REQ_FLAG_ACTIVE_LOW:
|
|
self.invert = True
|
|
|
|
def release(self) -> None:
|
|
if self.read_pipe is not None:
|
|
try:
|
|
os.close(self.read_pipe)
|
|
except Exception:
|
|
pass
|
|
if self.write_pipe is not None:
|
|
try:
|
|
os.close(self.write_pipe)
|
|
except Exception:
|
|
pass
|
|
self.chip.pop_line(self.name)
|
|
|
|
def set_value(self, value: int) -> None:
|
|
if self.is_event:
|
|
raise GpioException("Cannot set the value for an input pin")
|
|
self.value = int(not not value)
|
|
|
|
def get_value(self) -> int:
|
|
return self.value
|
|
|
|
def event_read(self) -> MockLineEvent:
|
|
if self.read_pipe is None:
|
|
raise GpioException
|
|
try:
|
|
data = os.read(self.read_pipe, 64)
|
|
except Exception:
|
|
pass
|
|
else:
|
|
value = int(not not data[-1])
|
|
self.value = value
|
|
return MockLineEvent(self.value)
|
|
|
|
def event_get_fd(self) -> int:
|
|
if self.read_pipe is None:
|
|
raise GpioException("Event not configured")
|
|
return self.read_pipe
|
|
|
|
def simulate_line_event(self, value: int) -> None:
|
|
if self.write_pipe is None:
|
|
raise GpioException("Event not configured")
|
|
val = bytes([int(not not value)])
|
|
try:
|
|
os.write(self.write_pipe, val)
|
|
except Exception:
|
|
pass
|
|
|
|
class MockLineEvent:
|
|
RISING_EDGE = 1
|
|
FALLING_EDGE = 2
|
|
def __init__(self, value: int) -> None:
|
|
if value == 1:
|
|
self.type = self.RISING_EDGE
|
|
else:
|
|
self.type = self.FALLING_EDGE
|