This adds the framework for unit testing Moonraker via pytest. Initally only moonraker.py, klippy_connection.py, and confighelper.py have acceptable coverage. Coverage for other modules will be added on an incremental basis, when most of Moonraker's source is covered tests will be conducted via GitHub actions. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
272 lines
9.5 KiB
Python
272 lines
9.5 KiB
Python
from __future__ import annotations
|
|
import pytest
|
|
import asyncio
|
|
import pathlib
|
|
from typing import TYPE_CHECKING, Dict
|
|
from moonraker import ServerError
|
|
from klippy_connection import KlippyRequest
|
|
from mocks import MockReader, MockWriter
|
|
|
|
if TYPE_CHECKING:
|
|
from moonraker import Server
|
|
from conftest import KlippyProcess
|
|
|
|
@pytest.mark.usefixtures("klippy")
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_startup(full_server: Server):
|
|
evtloop = full_server.get_event_loop()
|
|
futs = [evtloop.create_future() for _ in range(3)]
|
|
events = {
|
|
"server:klippy_identified": lambda: futs[0].set_result("id"),
|
|
"server:klippy_started": lambda x: futs[1].set_result("started"),
|
|
"server:klippy_ready": lambda: futs[2].set_result("ready")
|
|
}
|
|
for name, func in events.items():
|
|
full_server.register_event_handler(name, func)
|
|
await full_server.start_server()
|
|
ret = await asyncio.wait_for(asyncio.gather(*futs), 4.)
|
|
assert (
|
|
ret == ["id", "started", "ready"] and
|
|
full_server.klippy_connection.is_connected()
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_gcode_response(ready_server: Server,
|
|
klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_gc_resp(resp: str):
|
|
if not fut.done():
|
|
fut.set_result(resp)
|
|
ready_server.register_event_handler("server:gcode_response", on_gc_resp)
|
|
klippy.send_gcode("M118 Moonraker Test")
|
|
await asyncio.wait_for(fut, 1.)
|
|
assert "Moonraker Test" in fut.result()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_shutdown(ready_server: Server, klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_shutdown():
|
|
if not fut.done():
|
|
fut.set_result("shutdown")
|
|
ready_server.register_event_handler("server:klippy_shutdown", on_shutdown)
|
|
klippy.send_gcode("M112")
|
|
await asyncio.wait_for(fut, 2.)
|
|
assert fut.result() == "shutdown"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_disconnect(ready_server: Server, klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_disconnect():
|
|
if not fut.done():
|
|
fut.set_result("disconnect")
|
|
ready_server.register_event_handler("server:klippy_disconnect",
|
|
on_disconnect)
|
|
klippy.stop()
|
|
await asyncio.wait_for(fut, 2.)
|
|
assert fut.result() == "disconnect"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_reconnect(ready_server: Server, klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_reconnect():
|
|
if not fut.done():
|
|
fut.set_result("test")
|
|
ready_server.register_event_handler("server:klippy_ready",
|
|
on_reconnect)
|
|
klippy.send_gcode("RESTART")
|
|
await asyncio.wait_for(fut, 4.)
|
|
assert fut.result() == "test"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_klippy_connection_error(full_server: Server):
|
|
await full_server.start_server()
|
|
with pytest.raises(ServerError):
|
|
kapis = full_server.klippy_connection.klippy_apis
|
|
await kapis.run_gcode("M115")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_update(ready_server: Server, klippy: KlippyProcess):
|
|
evtloop = ready_server.get_event_loop()
|
|
fut = evtloop.create_future()
|
|
|
|
def on_status_update(data):
|
|
if not fut.done():
|
|
fut.set_result(data)
|
|
ready_server.register_event_handler("server:status_update",
|
|
on_status_update)
|
|
kapis = ready_server.klippy_connection.klippy_apis
|
|
await kapis.subscribe_objects({"toolhead": None})
|
|
klippy.send_gcode("G28")
|
|
await asyncio.wait_for(fut, 2.)
|
|
assert isinstance(fut.result(), dict)
|
|
|
|
@pytest.mark.run_paths(printer_cfg="error_printer.cfg")
|
|
@pytest.mark.asyncio
|
|
async def test_klippy_error(ready_server: Server):
|
|
kconn = ready_server.klippy_connection
|
|
assert kconn.state == "error"
|
|
|
|
@pytest.mark.run_paths(printer_cfg="missing_reqs.cfg")
|
|
@pytest.mark.asyncio
|
|
async def test_missing_reqs(ready_server: Server):
|
|
mreqs = sorted(ready_server.klippy_connection.missing_requirements)
|
|
expected = ["display_status", "pause_resume", "virtual_sdcard"]
|
|
assert mreqs == expected
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_connection_close(full_server: Server):
|
|
await full_server.start_server()
|
|
# Test multiple close attempts, the second to enter
|
|
# should wait and exit
|
|
ret = full_server.klippy_connection.close(True)
|
|
ret2 = full_server.klippy_connection.close(True)
|
|
await asyncio.wait_for(asyncio.gather(ret, ret2), 4.)
|
|
kconn = full_server.klippy_connection
|
|
assert kconn.connection_task.cancelled()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_init_error(base_server: Server):
|
|
base_server.server_running = True
|
|
kconn = base_server.klippy_connection
|
|
|
|
def mock_is_connected():
|
|
return kconn.init_attempts < 3
|
|
kconn.is_connected = mock_is_connected
|
|
ret = await kconn._init_klippy_connection()
|
|
assert ret is False
|
|
|
|
def test_connect_fail(base_server: Server):
|
|
ret = base_server.klippy_connection.connect()
|
|
assert ret.result() is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_connect_fail(base_server: Server):
|
|
ret = await base_server.klippy_connection.wait_connected()
|
|
assert ret is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_uds(base_server: Server):
|
|
attempts = [1, 2, 3]
|
|
|
|
def mock_is_running():
|
|
attempts.pop(0)
|
|
return len(attempts) > 0
|
|
base_server.is_running = mock_is_running
|
|
ret = await base_server.klippy_connection._do_connect()
|
|
assert ret is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_no_uds_access(base_server: Server,
|
|
path_args: Dict[str, pathlib.Path]):
|
|
attempts = [1, 2, 3]
|
|
uds_path = path_args['klippy_uds_path']
|
|
uds_path.write_text("test")
|
|
uds_path.chmod(mode=222)
|
|
|
|
def mock_is_running():
|
|
attempts.pop(0)
|
|
return len(attempts) > 0
|
|
base_server.is_running = mock_is_running
|
|
ret = await base_server.klippy_connection._do_connect()
|
|
assert ret is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_write_not_connected(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
await kconn._write_request(req)
|
|
assert isinstance(req.response, ServerError)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_write_error(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
kconn.writer = MockWriter()
|
|
await kconn._write_request(req)
|
|
assert isinstance(req.response, ServerError)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_write_cancelled(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
kconn.writer = MockWriter(wait_drain=True)
|
|
task = base_server.event_loop.create_task(kconn._write_request(req))
|
|
base_server.event_loop.delay_callback(.01, task.cancel)
|
|
with pytest.raises(asyncio.CancelledError):
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_error(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
mock_reader = MockReader("raise_error")
|
|
kconn = base_server.klippy_connection
|
|
await kconn._read_stream(mock_reader)
|
|
assert "Klippy Stream Read Error" == caplog.messages[-1]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_cancelled(base_server: Server):
|
|
mock_reader = MockReader("wait")
|
|
kconn = base_server.klippy_connection
|
|
task = base_server.event_loop.create_task(
|
|
kconn._read_stream(mock_reader))
|
|
base_server.event_loop.delay_callback(.01, task.cancel)
|
|
with pytest.raises(asyncio.CancelledError):
|
|
await task
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_decode_error(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
mock_reader = MockReader()
|
|
kconn = base_server.klippy_connection
|
|
await kconn._read_stream(mock_reader)
|
|
assert "Error processing Klippy Host Response:" in caplog.messages[-1]
|
|
|
|
def test_process_unknown_method(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
cmd = {"method": "test_unknown"}
|
|
kconn = base_server.klippy_connection
|
|
kconn._process_command(cmd)
|
|
assert "Unknown method received: test_unknown" == caplog.messages[-1]
|
|
|
|
def test_process_unknown_request(base_server: Server,
|
|
caplog: pytest.LogCaptureFixture):
|
|
cmd = {"id": 4543}
|
|
kconn = base_server.klippy_connection
|
|
kconn._process_command(cmd)
|
|
expected = f"No request matching request ID: 4543, response: {cmd}"
|
|
assert expected == caplog.messages[-1]
|
|
|
|
def test_process_invalid_request(base_server: Server):
|
|
req = KlippyRequest("", {})
|
|
kconn = base_server.klippy_connection
|
|
kconn.pending_requests[req.id] = req
|
|
cmd = {"id": req.id}
|
|
kconn._process_command(cmd)
|
|
assert isinstance(req.response, ServerError)
|
|
|
|
# TODO: This can probably go in a class with test apis
|
|
@pytest.mark.asyncio
|
|
async def test_call_remote_method(base_server: Server,
|
|
klippy: KlippyProcess):
|
|
fut = base_server.get_event_loop().create_future()
|
|
|
|
def method_test(result):
|
|
fut.set_result(result)
|
|
base_server.register_remote_method("moonraker_test", method_test)
|
|
base_server.load_components()
|
|
await base_server.server_init()
|
|
ret = base_server.klippy_connection.wait_connected()
|
|
await asyncio.wait_for(ret, 4.)
|
|
klippy.send_gcode("TEST_REMOTE_METHOD")
|
|
await fut
|
|
await base_server._stop_server("terminate")
|
|
assert fut.result() == "test"
|