moonraker: move klippy connection logic to its own class

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Arksine 2020-08-15 15:22:17 -04:00
parent a3c0f06c5f
commit 76493215c5

View File

@ -41,7 +41,8 @@ class Server:
# Klippy Connection Handling # Klippy Connection Handling
self.klippy_address = config.get( self.klippy_address = config.get(
'klippy_uds_address', "/tmp/klippy_uds") 'klippy_uds_address', "/tmp/klippy_uds")
self.klippy_iostream = None self.klippy_connection = KlippyConnection(
self.process_command, self.on_connection_closed)
self.is_klippy_ready = False self.is_klippy_ready = False
self.gc_response_registered = False self.gc_response_registered = False
self.klippy_state = "disconnected" self.klippy_state = "disconnected"
@ -146,74 +147,45 @@ class Server:
# ***** Klippy Connection ***** # ***** Klippy Connection *****
async def _connect_klippy(self): async def _connect_klippy(self):
ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) ret = await self.klippy_connection.connect(self.klippy_address)
kstream = iostream.IOStream(ksock) if not ret:
try:
await kstream.connect(self.klippy_address)
except iostream.StreamClosedError:
# Klippy Socket Server not available
self.ioloop.call_later(1., self._connect_klippy) self.ioloop.call_later(1., self._connect_klippy)
return return
await gen.sleep(0.5)
if kstream.closed():
# Klippy Connection was rejected
self.ioloop.call_later(1., self._connect_klippy)
return
logging.info("Klippy Connection Established")
self.klippy_iostream = kstream
self.klippy_iostream.set_close_callback(
self._handle_stream_closed)
self.ioloop.spawn_callback(
self._read_klippy_stream, self.klippy_iostream)
# begin server iniialization # begin server iniialization
self.init_cb.start() self.init_cb.start()
async def _read_klippy_stream(self, stream): def process_command(self, cmd):
while not stream.closed(): method = cmd.get('method', None)
try: if method is not None:
data = await stream.read_until(b'\x03') # This is a remote method called from klippy
except iostream.StreamClosedError as e: cb = self.remote_methods.get(method, None)
return if cb is not None:
except Exception: params = cmd.get('params', {})
logging.exception("Klippy Stream Read Error") cb(**params)
continue else:
try: logging.info(f"Unknown method received: {method}")
decoded_cmd = json.loads(data[:-1]) return
method = decoded_cmd.get('method', None) # This is a response to a request, process
if method is not None: req_id = cmd.get('id', None)
# This is a remote method called from klippy request = self.pending_requests.pop(req_id, None)
cb = self.remote_methods.get(method, None) if request is None:
if cb is not None: logging.info(
params = decoded_cmd.get('params', {}) f"No request matching request ID: {req_id}, "
cb(**params) f"response: {cmd}")
else: return
logging.info(f"Unknown method received: {method}") if 'result' in cmd:
continue result = cmd['result']
# This is a response to a request, process if not result:
req_id = decoded_cmd.get('id', None) result = "ok"
request = self.pending_requests.pop(req_id, None) else:
if request is None: err = cmd.get('error', "Malformed Klippy Response")
logging.info( result = ServerError(err, 400)
f"No request matching request ID: {req_id}, " request.notify(result)
f"response: {decoded_cmd}")
continue
if 'result' in decoded_cmd:
result = decoded_cmd['result']
if not result:
result = "ok"
else:
err = decoded_cmd.get('error', "Malformed Klippy Response")
result = ServerError(err, 400)
request.notify(result)
except Exception:
logging.exception(
f"Error processing Klippy Host Response: {data.decode()}")
def _handle_stream_closed(self): def on_connection_closed(self):
self.is_klippy_ready = False self.is_klippy_ready = False
self.gc_response_registered = False self.gc_response_registered = False
self.klippy_state = "disconnected" self.klippy_state = "disconnected"
self.klippy_iostream = None
self.init_cb.stop() self.init_cb.stop()
for request in self.pending_requests.values(): for request in self.pending_requests.values():
request.notify(ServerError("Klippy Disconnected", 503)) request.notify(ServerError("Klippy Disconnected", 503))
@ -222,16 +194,6 @@ class Server:
self.send_event("server:klippy_state_changed", "disconnect") self.send_event("server:klippy_state_changed", "disconnect")
self.ioloop.call_later(1., self._connect_klippy) self.ioloop.call_later(1., self._connect_klippy)
async def send_klippy_request(self, request):
if self.klippy_iostream is None:
request.notify(ServerError("Klippy Host not connected", 503))
return
data = json.dumps(request.to_dict()).encode() + b"\x03"
try:
await self.klippy_iostream.write(data)
except iostream.StreamClosedError:
request.notify(ServerError("Klippy Host not connected", 503))
async def _initialize(self): async def _initialize(self):
await self._request_endpoints() await self._request_endpoints()
if not self.is_klippy_ready: if not self.is_klippy_ready:
@ -367,7 +329,7 @@ class Server:
base_request = BaseRequest(rpc_method, params) base_request = BaseRequest(rpc_method, params)
self.pending_requests[base_request.id] = base_request self.pending_requests[base_request.id] = base_request
self.ioloop.spawn_callback( self.ioloop.spawn_callback(
self.send_klippy_request, base_request) self.klippy_connection.send_request, base_request)
result = await base_request.wait() result = await base_request.wait()
return result return result
@ -380,14 +342,66 @@ class Server:
for plugin in self.plugins: for plugin in self.plugins:
if hasattr(plugin, "close"): if hasattr(plugin, "close"):
await plugin.close() await plugin.close()
if self.klippy_iostream is not None and \ self.klippy_connection.close()
not self.klippy_iostream.closed():
self.klippy_iostream.close()
if self.server_running: if self.server_running:
self.server_running = False self.server_running = False
await self.moonraker_app.close() await self.moonraker_app.close()
self.ioloop.stop() self.ioloop.stop()
class KlippyConnection:
def __init__(self, on_recd, on_close):
self.ioloop = IOLoop.current()
self.iostream = None
self.on_recd = on_recd
self.on_close = on_close
async def connect(self, address):
ksock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
kstream = iostream.IOStream(ksock)
try:
await kstream.connect(address)
except iostream.StreamClosedError:
return False
logging.info("Klippy Connection Established")
self.iostream = kstream
self.iostream.set_close_callback(self.on_close)
self.ioloop.spawn_callback(self._read_stream, self.iostream)
return True
async def _read_stream(self, stream):
while not stream.closed():
try:
data = await stream.read_until(b'\x03')
except iostream.StreamClosedError as e:
return
except Exception:
logging.exception("Klippy Stream Read Error")
continue
try:
decoded_cmd = json.loads(data[:-1])
self.on_recd(decoded_cmd)
except Exception:
logging.exception(
f"Error processing Klippy Host Response: {data.decode()}")
async def send_request(self, request):
if self.iostream is None:
request.notify(ServerError("Klippy Host not connected", 503))
return
data = json.dumps(request.to_dict()).encode() + b"\x03"
try:
await self.iostream.write(data)
except iostream.StreamClosedError:
request.notify(ServerError("Klippy Host not connected", 503))
def is_connected(self):
return self.iostream is not None and not self.iostream.closed()
def close(self):
if self.iostream is not None and \
not self.iostream.closed():
self.iostream.close()
# Basic WebRequest class, easily converted to dict for json encoding # Basic WebRequest class, easily converted to dict for json encoding
class BaseRequest: class BaseRequest:
def __init__(self, rpc_method, params): def __init__(self, rpc_method, params):