我想实现 paho-mqtt,其中它应该异步处理传入消息。
我已经使用 asyncio 实现了 gmqtt,它运行得很好,但据我了解 paho-mqtt 比 gmqtt 更好用(链接:https://www.emqx.io/blog/comparision-of-python- mqtt-客户端)。
gmqtt 带异步:
def assign_callbacks_to_client(self, client):
""" Helper function which sets up client's callbacks. """
client.on_connect = self.on_connect
client.on_message = self.on_message
client.on_disconnect = self.on_disconnect
client.on_subscribe = self.on_subscribe
async def subscriber(self, mqtt_name):
""" Connect to mqtt-broker. """
sub_client = MQTTClient(mqtt_name)
self.assign_callbacks_to_client(sub_client)
logging.info("connecting")
await sub_client.connect(host=config.MQTT_HOST, port=int(config.MQTT_PORT))
return sub_client
您能让我知道如何使用 asyncio 库实现 paho 吗? 将异步使用loop_start,据我了解,每次执行它都会在后台启动一个新线程。
loop_start()
仅创建一个运行所有回调的后台线程,您不应该直接在这些回调中执行长时间运行的任务,因为它们会阻止客户端的所有其他操作。
如果您想在不阻塞的情况下处理传入消息,那么您需要实现自己的线程池,并只需使用
on_message()
回调将消息推送到该线程池。
看看aiomqtt。 aiomqtt 将经过时间验证的 paho-mqtt 库的稳定性与直观、惯用的异步界面相结合。