我有一个主应用程序,它初始化fastAPI并使用fastAPI MQTT。现在,该文件正在事件循环中运行,并正在寻找常规 MQTT 端口。
我有第二个应用程序,它使用 MQTT Websocket 并在事件循环中运行。现在,我希望这两个应用程序能够相互交换信息。我怎样才能让他们交换信息。一种是使用 websocket MQTT,另一种是使用快速 API MQTT。有什么办法可以让他们互相交谈。这是
main.py - 运行 fastapi 和 mqtt - 常规的。
from fastapi import FastAPI
from fastapi_mqtt import FastMQTT, MQTTConfig
from pydantic import BaseModel
from ipaddress import IPv4Address
import jsonpickle
app = FastAPI()
class Nmap(BaseModel):
host: IPv4Address
portRange: str
class Config:
schema_extra = {
"example" : {
"host": "10.0.2.15",
"portRange": "22-80",
"description": "Scan the port from 22 to 80 of the ip address 10.0.2.15"
}
}
## for docker - compose using mqtt - broker name from docker compose.
## for normal leave that blank to fall off to localhost.
mqtt_config = MQTTConfig(host = "mqtt")
mqtt = FastMQTT(config=mqtt_config)
mqtt.init_app(app)
@mqtt.on_connect()
def connect(client, flags, rc, properties):
mqtt.client.subscribe("/mqtt/toModel/#") # subscribing mqtt topic wildcard- multi-level
print("connected: ", client, flags, rc, properties)
@mqtt.on_message()
async def message(client, topic, payload, qos, properties):
print("received message: ", topic, jsonpickle.decode(payload.decode()), qos, properties)
return 0
@mqtt.on_disconnect()
def disconnect(client, packet, exc=None):
print("Disconnected")
@mqtt.on_subscribe()
def subscribe(client, mid, qos, properties):
print("subscribed", client, mid, qos, properties)
@app.get("/")
async def func():
mqtt.client.publish("/mqtt", "Hello from fastApi")
return {"result": True, "message": "Published"}
@app.post("/scan/{host}")
async def scan_host_port(nmap_details : Nmap):
results = {"got_val" : nmap_details}
print(type(nmap_details))
mqtt.client.publish("/mqtt/fromModel/nmap", jsonpickle.encode(nmap_details))
return results
这是基于 websocket 的 mqtt。
import paho.mqtt.client as paho
import time
broker="10.0.2.15"
#port= 80
#port=1883
port= 9001
sub_topic="ws/pythonApp"
def on_subscribe(client, userdata, mid, granted_qos): #create function for callback
print("subscribed with qos",granted_qos, "\n")
pass
def on_message(client, userdata, message):
print("message received " ,str(message.payload.decode("utf-8")))
def on_publish(client,userdata,mid): #create function for callback
print("data published mid=",mid, "\n")
pass
def on_disconnect(client, userdata, rc):
print("client disconnected ok")
client= paho.Client("client-socks",transport='websockets') #create client object
#client= paho.Client("control1")
client.on_subscribe = on_subscribe #assign function to callback
client.on_publish = on_publish #assign function to callback
client.on_message = on_message #assign function to callback
client.on_disconnect = on_disconnect
print("connecting to broker ",broker,"on port ",port)
client.connect(broker,port) #establish connection
client.loop_start()
print("subscribing to ",sub_topic)
client.subscribe(sub_topic)
time.sleep(3)
client.publish("ws/jsclient","on") #publish
time.sleep(4)
client.disconnect()
您无需执行任何操作,MQTT 代理会将两种输入协议(本机 MQTT 和 WebSocket 上的 MQTT)桥接到同一主题空间。
发布的任何内容都将发布给任何订阅者,无论订阅者是通过本机 MQTT 还是通过 WebSockets 的 MQTT 连接。
MQTT 是一种发布/订阅协议,而不是端点协议。 客户如何连接到经纪人并不重要。您无需在客户端之间显式创建任何连接或路径。这就是 MQTT 的全部意义! 唯一允许他们交换数据的是主题名称。 因此,如果客户端 1 向主题“myTopic”发布消息,并且客户端 2 订阅了该主题(或过滤该主题的通配符主题),则客户端 2 将收到该消息。就是这样。