diff --git a/moonraker/components/mqtt.py b/moonraker/components/mqtt.py index 10501f2..976b86d 100644 --- a/moonraker/components/mqtt.py +++ b/moonraker/components/mqtt.py @@ -244,7 +244,7 @@ class AIOHelper: class MQTTClient(APITransport, Subscribable): def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - self.event_loop = self.server.get_event_loop() + self.eventloop = self.server.get_event_loop() self.klippy: Klippy = self.server.lookup_component("klippy_connection") self.address: str = config.get('address') self.port: int = config.getint('port', 1883) @@ -357,7 +357,7 @@ class MQTTClient(APITransport, Subscribable): payload=jsonw.dumps({'server': 'offline'}), qos=self.qos, retain=True) self.client.connect_async(self.address, self.port) - self.connect_task = self.event_loop.create_task( + self.connect_task = self.eventloop.create_task( self._do_reconnect(first=True) ) @@ -379,7 +379,7 @@ class MQTTClient(APITransport, Subscribable): if topic in self.subscribed_topics: cb_hdls = self.subscribed_topics[topic][1] for hdl in cb_hdls: - self.event_loop.register_callback( + self.eventloop.register_callback( hdl.callback, message.payload) else: logging.debug( @@ -401,7 +401,7 @@ class MQTTClient(APITransport, Subscribable): if subs: res, msg_id = client.subscribe(subs) if msg_id is not None: - sub_fut: asyncio.Future = asyncio.Future() + sub_fut: asyncio.Future = self.eventloop.create_future() topics = list(self.subscribed_topics.keys()) sub_fut.add_done_callback( BrokerAckLogger(topics, "subscribe")) @@ -475,7 +475,7 @@ class MQTTClient(APITransport, Subscribable): raise first = False try: - sock = await self.event_loop.create_socket_connection( + sock = await self.eventloop.create_socket_connection( (self.address, self.port), timeout=10 ) self.client.reconnect(sock) @@ -523,7 +523,7 @@ class MQTTClient(APITransport, Subscribable): if self.is_connected() and need_sub: res, msg_id = self.client.subscribe(topic, qos) if msg_id is not None: - sub_fut: asyncio.Future = asyncio.Future() + sub_fut: asyncio.Future = self.eventloop.create_future() sub_fut.add_done_callback( BrokerAckLogger([topic], "subscribe")) self.pending_acks[msg_id] = sub_fut @@ -541,7 +541,7 @@ class MQTTClient(APITransport, Subscribable): del self.subscribed_topics[topic] res, msg_id = self.client.unsubscribe(topic) if msg_id is not None: - unsub_fut: asyncio.Future = asyncio.Future() + unsub_fut: asyncio.Future = self.eventloop.create_future() unsub_fut.add_done_callback( BrokerAckLogger([topic], "unsubscribe")) self.pending_acks[msg_id] = unsub_fut @@ -555,7 +555,7 @@ class MQTTClient(APITransport, Subscribable): qos = qos or self.qos if qos > 2 or qos < 0: raise self.server.error("QOS must be between 0 and 2") - pub_fut: asyncio.Future = asyncio.Future() + pub_fut: asyncio.Future = self.eventloop.create_future() if isinstance(payload, (dict, list)): try: payload = jsonw.dumps(payload) @@ -602,7 +602,7 @@ class MQTTClient(APITransport, Subscribable): qos = qos or self.qos if qos > 2 or qos < 0: raise self.server.error("QOS must be between 0 and 2") - resp_fut: asyncio.Future = asyncio.Future() + resp_fut: asyncio.Future = self.eventloop.create_future() resp_hdl = self.subscribe_topic( response_topic, resp_fut.set_result, qos) self.pending_responses.append(resp_fut) @@ -644,7 +644,7 @@ class MQTTClient(APITransport, Subscribable): topic: str = web_request.get_str("topic") qos: int = web_request.get_int("qos", self.qos) timeout: Optional[float] = web_request.get_float('timeout', None) - resp: asyncio.Future = asyncio.Future() + resp: asyncio.Future = self.eventloop.create_future() hdl: Optional[SubscriptionHandle] = None try: hdl = self.subscribe_topic(topic, resp.set_result, qos)