使用以下代码,我可以强制 mqtt 客户端继续重新建立与消息代理的连接,即使它尚未启动。我想通过使用自定义 try/ except 来实现它,如下所示。
但是对我来说,似乎应该有一种本地方法来使其工作。
import time
import paho.mqtt.client as mqtt
def main():
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, reason_code, properties):
print(f"Connected with result code {reason_code}")
# Subscribing in on_connect() means that if we lose the connection and
# reconnect then subscriptions will be renewed.
client.subscribe("sensors")
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
def on_connect_fail(client, userdata, flags, reason_code, properties):
print(f"Connection failed with result code {reason_code}")
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, protocol=mqtt.MQTTv311)
# mqttc.enable_logger()
mqttc.on_connect = on_connect
mqttc.on_message = on_message
mqttc.on_connect_fail = on_connect_fail
mqttc.username_pw_set("username", "password")
mqttc.tls_set(ca_certs="ca.crt", certfile="client.crt", keyfile="client.key")
# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
try:
mqttc.connect("mqtt.client.server.com", port=8883, keepalive=60)
mqttc.loop_forever()
except (TimeoutError, ConnectionRefusedError) as e:
print("Error: ", e)
time.sleep(10)
print("Attempting again")
main()
if __name__ == "__main__":
main()
工作流程:
经纪商机器已停止
脚本已启动
经纪商机器启动
经纪商已启动
客户端连接成功
❯ poetry run paho
Warning: 'paho' is an entry point defined in pyproject.toml, but it's not installed as a script. You may get improper `sys.argv[0]`.
The support to run uninstalled scripts will be removed in a future release.
Run `poetry install` to resolve and get rid of this message.
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: timed out
Attempting again
0
Error: [Errno 61] Connection refused
Attempting again
0
Error: timed out
Attempting again
0
Connected with result code Success
我尝试将
loop_forever
设置为重试第一个连接为 true,但它不起作用。
mqttc.loop_forever(retry_first_connection=True)
您正在拨打
mqttc.connect("mqtt.client.server.com", port=8883, keepalive=60)
,并且根据文档 connect
:
是一个阻塞调用,用于建立底层连接并传输 CONNECT 数据包。
您看到的异常是由
connect
引发的,这意味着您传递给 retry_first_connection=True
的 loop_forever
是无关紧要的(因为该行未运行)。
最简单的解决方案是避免阻塞调用
connect
,并使用 connect_async
代替,即:
mqttc.connect_async("mqtt.client.server.com", port=1883, keepalive=60)
try:
mqttc.loop_forever(retry_first_connection=True)
connect_async
不会尝试直接连接,而是进行配置,以便在网络循环运行时建立连接(这意味着retry_first_connection
将按预期工作)。
注意:本期中有介绍。