我在 Python 中有一个基本的 MQTTListener 类,它侦听有关某些主题的消息,并且应该启动或停止从另一个脚本导入的异步进程。这个过程永远运行,除非它被手动停止。让我们假设监听器看起来像这样:
import paho.mqtt.client as mqtt
import json
from python_file.py import async_forever_function
class MqttListener:
def __init__(self, host, port, client_id):
self.host = host
self.port = port
self.client_id = client_id
self.client = mqtt.Client(client_id=self.client_id)
self.client.connect(host=self.host, port=self.port)
def on_connect(self, client, userdata, flags, rc):
self.client.subscribe(topic=[("start", 1), ])
self.client.subscribe(topic=[("stop", 1), ])
logging.info(msg="MQTT - connected!")
def on_disconnect(client, userdata, rc):
logging.info(msg="MQTT - disconnected!")
def on_message(self, client, userdata, message, ):
print('PROCESSING MESSAGE', message.topic, message.payload.decode('utf-8'), )
if message.topic == 'start':
async_forever_function(param='start')
print('process started')
else:
async_forever_function(param='stop')
print('process removed')
def start(self):
self.client.on_connect = lambda client, userdata, flags, rc: self.on_connect(client, userdata, flags, rc)
self.client.on_message = lambda client, userdata, message: self.on_message(client, userdata, message)
self.client.on_disconnect = lambda client, userdata, rc: self.on_disconnect(client, userdata, rc)
self.client.loop_start()
def stop(self):
self.client.loop_stop()
现在,这适用于 starting 一个新的异步进程。也就是说,当在启动 MQTT 主题上发布消息时,会正确触发 async_function。然而,一旦这个异步进程启动,侦听器就不再能够接收/处理来自停止 MQTT 主题的消息,并且异步进程将永远继续运行,而实际上它应该被停止。
我的问题:我如何调整此类的代码,使其在后台运行活动异步进程时也可以处理消息?
您不能在
on_message()
回调中执行阻塞任务。
此回调在 MQTT 客户端线程(由
loop_start()
函数启动的线程)上运行。此线程处理所有网络流量和消息处理,如果您阻止它,则它无法执行任何操作。
如果您想从
on_message()
回调中调用长时间运行的任务,您需要为长时间运行的任务启动一个新线程,这样它就不会阻塞 MQTT 客户端循环。
正如 hardillb 所说,如果您阻止
on_message
您的客户端,它将无法接收更多传入消息。有两种选择:使用 https://pypi.org/project/asyncio-mqtt/ 编写代码或编写守护进程。第一个选择是异步行为(即协程),它比第二个更容易访问。我用 Python 写了一个例子。 要点.