将 paho MQTT 与 Python 中的另一个异步进程结合起来

问题描述 投票:0回答:2

我在 Python 中有一个基本的 MQTTListener 类,它侦听有关某些主题的消息,并且应该启动或停止从另一个脚本导入的异步进程。这个过程永远运行,除非它被手动停止。让我们假设监听器看起来像这样:

import paho.mqtt.client as mqtt
import json
from python_file.py import async_forever_function

class MqttListener:

    def __init__(self, host, port, client_id):
        self.host = host
        self.port = port
        self.client_id = client_id
        self.client = mqtt.Client(client_id=self.client_id)
        self.client.connect(host=self.host, port=self.port)

    def on_connect(self, client, userdata, flags, rc):
        self.client.subscribe(topic=[("start", 1), ])
        self.client.subscribe(topic=[("stop", 1), ])
        logging.info(msg="MQTT - connected!")

    def on_disconnect(client, userdata, rc):
        logging.info(msg="MQTT - disconnected!")

    def on_message(self, client, userdata, message, ):
        print('PROCESSING MESSAGE', message.topic, message.payload.decode('utf-8'), )

        if message.topic == 'start':
            async_forever_function(param='start')
            print('process started')
        else:
            async_forever_function(param='stop')
            print('process removed')

    def start(self):
        self.client.on_connect = lambda client, userdata, flags, rc: self.on_connect(client, userdata, flags, rc)
        self.client.on_message = lambda client, userdata, message: self.on_message(client, userdata, message)
        self.client.on_disconnect = lambda client, userdata, rc: self.on_disconnect(client, userdata, rc)

        self.client.loop_start()

    def stop(self):
        self.client.loop_stop()

现在,这适用于 starting 一个新的异步进程。也就是说,当在启动 MQTT 主题上发布消息时,会正确触发 async_function。然而,一旦这个异步进程启动,侦听器就不再能够接收/处理来自停止 MQTT 主题的消息,并且异步进程将永远继续运行,而实际上它应该被停止。

我的问题:我如何调整此类的代码,使其在后台运行活动异步进程时也可以处理消息?

python-3.x asynchronous mqtt
2个回答
0
投票

您不能在

on_message()
回调中执行阻塞任务。

此回调在 MQTT 客户端线程(由

loop_start()
函数启动的线程)上运行。此线程处理所有网络流量和消息处理,如果您阻止它,则它无法执行任何操作。

如果您想从

on_message()
回调中调用长时间运行的任务,您需要为长时间运行的任务启动一个新线程,这样它就不会阻塞 MQTT 客户端循环。


0
投票

正如 hardillb 所说,如果您阻止

on_message
您的客户端,它将无法接收更多传入消息。有两种选择:使用 https://pypi.org/project/asyncio-mqtt/ 编写代码或编写守护进程。第一个选择是异步行为(即协程),它比第二个更容易访问。我用 Python 写了一个例子。 要点.

© www.soinside.com 2019 - 2024. All rights reserved.