This adds the framework for unit testing Moonraker via pytest. Initally only moonraker.py, klippy_connection.py, and confighelper.py have acceptable coverage. Coverage for other modules will be added on an incremental basis, when most of Moonraker's source is covered tests will be conducted via GitHub actions. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
511 lines
18 KiB
Python
511 lines
18 KiB
Python
from __future__ import annotations
|
|
import pytest
|
|
import pytest_asyncio
|
|
import asyncio
|
|
import socket
|
|
import pathlib
|
|
from collections import namedtuple
|
|
|
|
from moonraker import CORE_COMPONENTS, Server
|
|
from moonraker import main as servermain
|
|
from eventloop import EventLoop
|
|
from utils import ServerError
|
|
from confighelper import ConfigError
|
|
from components.klippy_apis import KlippyAPI
|
|
from mocks import MockComponent, MockWebsocket
|
|
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
AsyncIterator,
|
|
Dict,
|
|
Optional
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from fixtures import HttpClient, WebsocketClient
|
|
|
|
MockArgs = namedtuple('MockArgs', ["logfile", "nologfile", "configfile"])
|
|
|
|
@pytest.mark.run_paths(moonraker_conf="invalid_config.conf")
|
|
def test_invalid_config(path_args: Dict[str, pathlib.Path]):
|
|
evtloop = EventLoop()
|
|
args = {
|
|
'config_file': str(path_args['moonraker.conf']),
|
|
'log_file': "",
|
|
'software_version': "moonraker-pytest"
|
|
}
|
|
with pytest.raises(ConfigError):
|
|
Server(args, None, evtloop)
|
|
|
|
def test_config_and_log_warnings(path_args: Dict[str, pathlib.Path]):
|
|
evtloop = EventLoop()
|
|
args = {
|
|
'config_file': str(path_args['moonraker.conf']),
|
|
'log_file': "",
|
|
'software_version': "moonraker-pytest",
|
|
'log_warning': "Log Warning Test",
|
|
'config_warning': "Config Warning Test"
|
|
}
|
|
expected = ["Log Warning Test", "Config Warning Test"]
|
|
server = Server(args, None, evtloop)
|
|
assert server.warnings == expected
|
|
|
|
@pytest.mark.run_paths(moonraker_conf="unparsed_server.conf")
|
|
@pytest.mark.asyncio
|
|
async def test_unparsed_config_items(full_server: Server):
|
|
expected_warnings = [
|
|
"Unparsed config section [machine unparsed] detected.",
|
|
"Unparsed config option 'unknown_option: True' detected "
|
|
"in section [server]."]
|
|
warn_cnt = 0
|
|
for warn in full_server.warnings:
|
|
for expected in expected_warnings:
|
|
if warn.startswith(expected):
|
|
warn_cnt += 1
|
|
assert warn_cnt == 2
|
|
|
|
@pytest.mark.run_paths(moonraker_log="moonraker.log")
|
|
@pytest.mark.asyncio
|
|
async def test_file_logger(base_server: Server,
|
|
path_args: Dict[str, pathlib.Path]):
|
|
log_path = path_args.get("moonraker.log", None)
|
|
assert log_path is not None and log_path.exists()
|
|
|
|
def test_signal_handler(base_server: Server,
|
|
event_loop: asyncio.AbstractEventLoop):
|
|
base_server._handle_term_signal()
|
|
event_loop.run_forever()
|
|
assert base_server.exit_reason == "terminate"
|
|
|
|
class TestInstantiation:
|
|
def test_running(self, base_server: Server):
|
|
assert base_server.is_running() is False
|
|
|
|
def test_app_args(self,
|
|
path_args: Dict[str, pathlib.Path],
|
|
base_server: Server):
|
|
args = {
|
|
'config_file': str(path_args['moonraker.conf']),
|
|
'log_file': str(path_args.get("moonlog", "")),
|
|
'software_version': "moonraker-pytest"
|
|
}
|
|
assert base_server.get_app_args() == args
|
|
|
|
def test_pending_tasks(self, base_server: Server):
|
|
loop = base_server.get_event_loop().aioloop
|
|
assert len(loop._ready) == 0
|
|
|
|
def test_klippy_info(self, base_server: Server):
|
|
assert base_server.get_klippy_info() == {}
|
|
|
|
def test_klippy_state(self, base_server: Server):
|
|
assert base_server.get_klippy_state() == "disconnected"
|
|
|
|
def test_host_info(self, base_server: Server):
|
|
hinfo = {
|
|
'hostname': socket.gethostname(),
|
|
'address': "0.0.0.0",
|
|
'port': 7010,
|
|
'ssl_port': 7011
|
|
}
|
|
assert base_server.get_host_info() == hinfo
|
|
|
|
def test_klippy_connection(self, base_server: Server):
|
|
assert base_server.klippy_connection.is_connected() is False
|
|
|
|
def test_components(self, base_server: Server):
|
|
key_list = sorted(list(base_server.components.keys()))
|
|
assert key_list == [
|
|
"application",
|
|
"internal_transport",
|
|
"klippy_connection",
|
|
"websockets",
|
|
]
|
|
|
|
def test_endpoint_registered(self, base_server: Server):
|
|
app = base_server.moonraker_app
|
|
assert "/server/info" in app.api_cache
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_notification(self, base_server: Server):
|
|
base_server.register_notification("test:test_event")
|
|
fut = base_server.event_loop.create_future()
|
|
wsm = base_server.lookup_component("websockets")
|
|
wsm.websockets[1] = MockWebsocket(fut)
|
|
base_server.send_event("test:test_event", "test")
|
|
ret = await fut
|
|
expected = {
|
|
'jsonrpc': "2.0",
|
|
'method': "notify_test_event",
|
|
'params': ["test"]
|
|
}
|
|
assert expected == ret
|
|
|
|
class TestLoadComponent:
|
|
def test_load_component_fail(self, base_server: Server):
|
|
with pytest.raises(ServerError):
|
|
base_server.load_component(
|
|
base_server.config, "invalid_component")
|
|
|
|
def test_failed_component_set(self, base_server: Server):
|
|
assert "invalid_component" in base_server.failed_components
|
|
|
|
def test_load_component_fail_with_default(self, base_server: Server):
|
|
comp = base_server.load_component(
|
|
base_server.config, "invalid_component", None)
|
|
assert comp is None
|
|
|
|
def test_lookup_failed(self, base_server: Server):
|
|
with pytest.raises(ServerError):
|
|
base_server.lookup_component("invalid_component")
|
|
|
|
def test_lookup_failed_with_default(self, base_server: Server):
|
|
comp = base_server.lookup_component("invalid_component", None)
|
|
assert comp is None
|
|
|
|
def test_load_component(self, base_server: Server):
|
|
comp = base_server.load_component(base_server.config, "klippy_apis")
|
|
assert isinstance(comp, KlippyAPI)
|
|
|
|
def test_lookup_component(self, base_server: Server):
|
|
comp = base_server.lookup_component('klippy_apis')
|
|
assert isinstance(comp, KlippyAPI)
|
|
|
|
def test_component_attr(self, base_server: Server):
|
|
key_list = sorted(list(base_server.components.keys()))
|
|
assert key_list == [
|
|
"application",
|
|
"internal_transport",
|
|
"klippy_apis",
|
|
"klippy_connection",
|
|
"websockets",
|
|
]
|
|
|
|
class TestCoreServer:
|
|
@pytest_asyncio.fixture(scope="class")
|
|
async def core_server(self, base_server: Server) -> AsyncIterator[Server]:
|
|
base_server.load_components()
|
|
yield base_server
|
|
await base_server._stop_server("terminate")
|
|
|
|
def test_running(self, core_server: Server):
|
|
assert core_server.is_running() is False
|
|
|
|
def test_http_servers(self, core_server: Server):
|
|
app = core_server.lookup_component("application")
|
|
assert (
|
|
app.http_server is None and
|
|
app.secure_server is None
|
|
)
|
|
|
|
def test_warnings(self, core_server: Server):
|
|
assert len(core_server.warnings) == 0
|
|
|
|
def test_failed_components(self, core_server: Server):
|
|
assert len(core_server.failed_components) == 0
|
|
|
|
def test_lookup_components(self, core_server: Server):
|
|
comps = []
|
|
for comp_name in CORE_COMPONENTS:
|
|
comps.append(core_server.lookup_component(comp_name, None))
|
|
assert None not in comps
|
|
|
|
def test_pending_tasks(self, core_server: Server):
|
|
loop = core_server.get_event_loop().aioloop
|
|
assert len(loop._ready) == 0
|
|
|
|
def test_register_component_fail(self, core_server: Server):
|
|
with pytest.raises(ServerError):
|
|
core_server.register_component("machine", object())
|
|
|
|
def test_register_remote_method(self, core_server: Server):
|
|
core_server.register_remote_method("moonraker_test", lambda: None)
|
|
kconn = core_server.klippy_connection
|
|
assert "moonraker_test" in kconn.remote_methods
|
|
|
|
def test_register_method_exists(self, core_server: Server):
|
|
with pytest.raises(ServerError):
|
|
core_server.register_remote_method(
|
|
"shutdown_machine", lambda: None)
|
|
|
|
class TestServerInit:
|
|
def test_running(self, full_server: Server):
|
|
assert full_server.is_running() is False
|
|
|
|
def test_http_servers(self, full_server: Server):
|
|
app = full_server.lookup_component("application")
|
|
assert (
|
|
app.http_server is None and
|
|
app.secure_server is None
|
|
)
|
|
|
|
def test_warnings(self, full_server: Server):
|
|
assert len(full_server.warnings) == 0
|
|
|
|
def test_failed_components(self, full_server: Server):
|
|
assert len(full_server.failed_components) == 0
|
|
|
|
def test_lookup_components(self, full_server: Server):
|
|
comps = []
|
|
for comp_name in CORE_COMPONENTS:
|
|
comps.append(full_server.lookup_component(comp_name, None))
|
|
assert None not in comps
|
|
|
|
def test_config_backup(self,
|
|
full_server: Server,
|
|
path_args: Dict[str, pathlib.Path]):
|
|
cfg = path_args["config_path"].joinpath(".moonraker.conf.bkp")
|
|
assert cfg.is_file()
|
|
|
|
class TestServerStart:
|
|
@pytest_asyncio.fixture(scope="class")
|
|
async def server(self, full_server: Server) -> Server:
|
|
await full_server.start_server(connect_to_klippy=False)
|
|
return full_server
|
|
|
|
def test_running(self, server: Server):
|
|
assert server.is_running() is True
|
|
|
|
def test_http_servers(self, server: Server):
|
|
app = server.lookup_component("application")
|
|
assert (
|
|
app.http_server is not None and
|
|
app.secure_server is None
|
|
)
|
|
|
|
@pytest.mark.run_paths(moonraker_conf="base_server_ssl.conf")
|
|
class TestSecureServerStart:
|
|
@pytest_asyncio.fixture(scope="class")
|
|
async def server(self, full_server: Server) -> Server:
|
|
await full_server.start_server(connect_to_klippy=False)
|
|
return full_server
|
|
|
|
def test_running(self, server: Server):
|
|
assert server.is_running() is True
|
|
|
|
def test_http_servers(self, server: Server):
|
|
app = server.lookup_component("application")
|
|
assert (
|
|
app.http_server is not None and
|
|
app.secure_server is not None
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_component_init_error(base_server: Server):
|
|
base_server.register_component("testcomp", MockComponent(err_init=True))
|
|
await base_server.server_init(False)
|
|
assert "testcomp" in base_server.failed_components
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_component_exit_error(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
base_server.register_component("testcomp", MockComponent(err_exit=True))
|
|
await base_server._stop_server("terminate")
|
|
expected = "Error executing 'on_exit()' for component: testcomp"
|
|
assert expected in caplog.messages
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_component_close_error(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
base_server.register_component("testcomp", MockComponent(err_close=True))
|
|
await base_server._stop_server("terminate")
|
|
expected = "Error executing 'close()' for component: testcomp"
|
|
assert expected in caplog.messages
|
|
|
|
def test_register_event(base_server: Server):
|
|
def test_func():
|
|
pass
|
|
base_server.register_event_handler("test:my_test", test_func)
|
|
assert base_server.events["test:my_test"] == [test_func]
|
|
|
|
def test_register_async_event(base_server: Server):
|
|
async def test_func():
|
|
pass
|
|
base_server.register_event_handler("test:my_test", test_func)
|
|
assert base_server.events["test:my_test"] == [test_func]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_event(full_server: Server):
|
|
evtloop = full_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def test_func(arg):
|
|
fut.set_result(arg)
|
|
full_server.register_event_handler("test:my_test", test_func)
|
|
full_server.send_event("test:my_test", "test")
|
|
result = await fut
|
|
assert result == "test"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_async_event(full_server: Server):
|
|
evtloop = full_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
async def test_func(arg):
|
|
fut.set_result(arg)
|
|
full_server.register_event_handler("test:my_test", test_func)
|
|
full_server.send_event("test:my_test", "test")
|
|
result = await fut
|
|
assert result == "test"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_register_remote_method_running(full_server: Server):
|
|
await full_server.start_server(connect_to_klippy=False)
|
|
with pytest.raises(ServerError):
|
|
full_server.register_remote_method(
|
|
"moonraker_test", lambda: None)
|
|
|
|
@pytest.mark.usefixtures("event_loop")
|
|
def test_main(path_args: Dict[str, pathlib.Path],
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
caplog: pytest.LogCaptureFixture):
|
|
tries = [1]
|
|
|
|
def mock_init(self: Server):
|
|
reason = "terminate"
|
|
if tries:
|
|
reason = "restart"
|
|
tries.pop(0)
|
|
self.event_loop.delay_callback(.01, self._stop_server, reason)
|
|
cfg_path = path_args["moonraker.conf"]
|
|
args = MockArgs("", True, str(cfg_path))
|
|
monkeypatch.setattr(Server, "server_init", mock_init)
|
|
code: Optional[int] = None
|
|
try:
|
|
servermain(args)
|
|
except SystemExit as e:
|
|
code = e.code
|
|
assert (
|
|
code == 0 and
|
|
"Attempting Server Restart..." in caplog.messages and
|
|
"Server Shutdown" == caplog.messages[-1]
|
|
)
|
|
|
|
@pytest.mark.run_paths(moonraker_conf="invalid_config.conf")
|
|
def test_main_config_error(path_args: Dict[str, pathlib.Path],
|
|
caplog: pytest.LogCaptureFixture):
|
|
cfg_path = path_args["moonraker.conf"]
|
|
args = MockArgs("", True, str(cfg_path))
|
|
try:
|
|
servermain(args)
|
|
except SystemExit as e:
|
|
code = e.code
|
|
assert code == 1 and "Server Config Error" in caplog.messages
|
|
|
|
@pytest.mark.run_paths(moonraker_conf="invalid_config.conf",
|
|
moonraker_bkp=".moonraker.conf.bkp")
|
|
@pytest.mark.usefixtures("event_loop")
|
|
def test_main_restore_config(path_args: Dict[str, pathlib.Path],
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
caplog: pytest.LogCaptureFixture):
|
|
def mock_init(self: Server):
|
|
reason = "terminate"
|
|
self.event_loop.delay_callback(.01, self._stop_server, reason)
|
|
|
|
cfg_path = path_args["moonraker.conf"]
|
|
args = MockArgs("", True, str(cfg_path))
|
|
monkeypatch.setattr(Server, "server_init", mock_init)
|
|
code: Optional[int] = None
|
|
try:
|
|
servermain(args)
|
|
except SystemExit as e:
|
|
code = e.code
|
|
assert (
|
|
code == 0 and
|
|
"Loaded server from most recent working configuration:" in caplog.text
|
|
)
|
|
|
|
class TestEndpoints:
|
|
@pytest_asyncio.fixture(scope="class")
|
|
async def server(self, full_server: Server):
|
|
await full_server.start_server()
|
|
yield full_server
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_server_info(self,
|
|
server: Server,
|
|
http_client: HttpClient):
|
|
ret = await http_client.get("/server/info")
|
|
comps = list(server.components.keys())
|
|
expected = {
|
|
'klippy_connected': False,
|
|
'klippy_state': "disconnected",
|
|
'components': comps,
|
|
'failed_components': [],
|
|
'registered_directories': ["config", "logs"],
|
|
'warnings': [],
|
|
'websocket_count': 0,
|
|
'moonraker_version': "moonraker-pytest",
|
|
'missing_klippy_requirements': []
|
|
}
|
|
assert ret["result"] == expected
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_http_server_config(self,
|
|
server: Server,
|
|
http_client: HttpClient):
|
|
cfg = server.config.get_parsed_config()
|
|
ret = await http_client.get("/server/config")
|
|
assert ret["result"]["config"] == cfg
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_websocket_server_info(self,
|
|
server: Server,
|
|
websocket_client: WebsocketClient):
|
|
ret = await websocket_client.request("server.info")
|
|
comps = list(server.components.keys())
|
|
expected = {
|
|
'klippy_connected': False,
|
|
'klippy_state': "disconnected",
|
|
'components': comps,
|
|
'failed_components': [],
|
|
'registered_directories': ["config", "logs"],
|
|
'warnings': [],
|
|
'websocket_count': 1,
|
|
'moonraker_version': "moonraker-pytest",
|
|
'missing_klippy_requirements': []
|
|
}
|
|
assert ret == expected
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_websocket_server_config(self,
|
|
server: Server,
|
|
websocket_client: WebsocketClient):
|
|
cfg = server.config.get_parsed_config()
|
|
ret = await websocket_client.request("server.config")
|
|
assert ret["config"] == cfg
|
|
|
|
def test_server_restart(base_server: Server,
|
|
http_client: HttpClient,
|
|
event_loop: asyncio.AbstractEventLoop):
|
|
result = {}
|
|
|
|
async def do_restart():
|
|
base_server.load_components()
|
|
await base_server.start_server()
|
|
ret = await http_client.post("/server/restart")
|
|
result.update(ret)
|
|
event_loop.create_task(do_restart())
|
|
event_loop.run_forever()
|
|
assert result["result"] == "ok" and base_server.exit_reason == "restart"
|
|
|
|
@pytest.mark.no_ws_connect
|
|
def test_websocket_restart(base_server: Server,
|
|
websocket_client: WebsocketClient,
|
|
event_loop: asyncio.AbstractEventLoop):
|
|
result = {}
|
|
|
|
async def do_restart():
|
|
base_server.load_components()
|
|
await base_server.start_server()
|
|
await websocket_client.connect()
|
|
ret = await websocket_client.request("server.restart")
|
|
result["result"] = ret
|
|
event_loop.create_task(do_restart())
|
|
event_loop.run_forever()
|
|
assert result["result"] == "ok" and base_server.exit_reason == "restart"
|
|
|
|
|
|
# TODO:
|
|
# test invalid cert, key (probably should do that in test_app.py)
|