常规MQTT和websocket MQTT如何通信?

问题描述 投票:0回答:2

我有一个主应用程序,它初始化fastAPI并使用fastAPI MQTT。现在,该文件正在事件循环中运行,并正在寻找常规 MQTT 端口。

我有第二个应用程序,它使用 MQTT Websocket 并在事件循环中运行。现在,我希望这两个应用程序能够相互交换信息。我怎样才能让他们交换信息。一种是使用 websocket MQTT,另一种是使用快速 API MQTT。有什么办法可以让他们互相交谈。这是

main.py - 运行 fastapi 和 mqtt - 常规的。

from fastapi import FastAPI
from fastapi_mqtt import FastMQTT, MQTTConfig
from pydantic import BaseModel
from ipaddress import IPv4Address
import jsonpickle

app = FastAPI()


class Nmap(BaseModel):
    host: IPv4Address
    portRange: str

    class Config:
        schema_extra = {
            "example" : {
                "host": "10.0.2.15",
                 "portRange": "22-80",
                 "description": "Scan the port from 22 to 80 of the ip address 10.0.2.15"
            }
        }


## for docker - compose using mqtt - broker name from docker compose.
## for normal leave that blank to fall off to localhost.
mqtt_config = MQTTConfig(host = "mqtt")

mqtt = FastMQTT(config=mqtt_config)

mqtt.init_app(app)

@mqtt.on_connect()
def connect(client, flags, rc, properties):
    mqtt.client.subscribe("/mqtt/toModel/#") # subscribing mqtt topic wildcard- multi-level
    print("connected: ", client, flags, rc, properties)

@mqtt.on_message()
async def message(client, topic, payload, qos, properties):
    print("received message: ", topic, jsonpickle.decode(payload.decode()), qos, properties)
    return 0 


@mqtt.on_disconnect()
def disconnect(client, packet, exc=None):
    print("Disconnected")

@mqtt.on_subscribe()
def subscribe(client, mid, qos, properties):
    print("subscribed", client, mid, qos, properties)

@app.get("/")
async def func():
    mqtt.client.publish("/mqtt", "Hello from fastApi") 
    return {"result": True, "message": "Published"}

@app.post("/scan/{host}")
async def scan_host_port(nmap_details : Nmap):
    results = {"got_val" : nmap_details}
    print(type(nmap_details))
    mqtt.client.publish("/mqtt/fromModel/nmap", jsonpickle.encode(nmap_details)) 
    return results

这是基于 websocket 的 mqtt。

import paho.mqtt.client as paho
import time
broker="10.0.2.15"
#port= 80
#port=1883
port= 9001
sub_topic="ws/pythonApp"
def on_subscribe(client, userdata, mid, granted_qos):   #create function for callback
   print("subscribed with qos",granted_qos, "\n")
   pass
def on_message(client, userdata, message):
    print("message received  "  ,str(message.payload.decode("utf-8")))
    
def on_publish(client,userdata,mid):   #create function for callback
   print("data published mid=",mid, "\n")
   pass

def on_disconnect(client, userdata, rc):
   print("client disconnected ok") 
client= paho.Client("client-socks",transport='websockets')       #create client object
#client= paho.Client("control1")
client.on_subscribe = on_subscribe       #assign function to callback
client.on_publish = on_publish        #assign function to callback
client.on_message = on_message        #assign function to callback
client.on_disconnect = on_disconnect
print("connecting to broker ",broker,"on port ",port)
client.connect(broker,port)           #establish connection
client.loop_start()
print("subscribing to ",sub_topic)
client.subscribe(sub_topic)
time.sleep(3)
client.publish("ws/jsclient","on")    #publish
time.sleep(4)

client.disconnect()
python-3.x websocket mqtt fastapi phpmqtt
2个回答
1
投票

您无需执行任何操作,MQTT 代理会将两种输入协议(本机 MQTT 和 WebSocket 上的 MQTT)桥接到同一主题空间。

发布的任何内容都将发布给任何订阅者,无论订阅者是通过本机 MQTT 还是通过 WebSockets 的 MQTT 连接。


0
投票

MQTT 是一种发布/订阅协议,而不是端点协议。 客户如何连接到经纪人并不重要。您无需在客户端之间显式创建任何连接或路径。这就是 MQTT 的全部意义! 唯一允许他们交换数据的是主题名称。 因此,如果客户端 1 向主题“myTopic”发布消息,并且客户端 2 订阅了该主题(或过滤该主题的通配符主题),则客户端 2 将收到该消息。就是这样。

© www.soinside.com 2019 - 2024. All rights reserved.