我尝试在另一个线程中使用 websocket 客户端以 json 字符串发送运动传感器数据,以避免 MotionSensor 类中的无限循环导致其余代码的执行阻塞。但显然
ws.send()
需要等待关键字。如果我添加它,我会收到错误
RuntimeWarning:从未等待过协程“MotionSensors.run” 自我运行() RuntimeWarning:启用tracemalloc以获取对象分配回溯 并且它不会向服务器发送任何内容
# motionSensor.py
import threading
import time
from client.ClientRequest import Request
class MotionSensors(threading.Thread):
def __init__(self, ws):
threading.Thread.__init__(self)
self.ws = ws
self.open = True
async def run(self):
await self.SendData()
async def SendData(self):
while self.open:
print("Sending motion state....")
state = 1 # Motion state demo value
request = Request()
request.push("mcu/sensors/motion")
request.addBody({
"state_type": "single",
"devices": {"state": state, "device_no": "DVC-876435"}
})
await self.ws.send(request.getAsJsonString())
print("sleeping now for 2 seconds....")
time.sleep(2)
这是我的主要代码 客户端.py
# client.py
import settings
import asyncio
import websockets
from client.ClientHandler import Devices
from client.Rounte import Route
from ClientRequest import Request
from client.dbHandler import mcuConfig
from client.devices.motionSensor import MotionSensors
def ResponseMSG(request):
print(request)
route = Route()
route.addRoute("/response", ResponseMSG)
def onMessage(request):
route.fireRequest(request)
async def WsClient():
uri = settings.WS_URL
async with websockets.connect(uri) as websocket:
#####################################
###INITIALIZE DEVICES
motion = MotionSensors(websocket)
motion.start()
while True:
print("waiting to recieve......")
message = await websocket.recv()
onMessage(message)
loop = asyncio.get_event_loop()
loop.run_until_complete(WsClient())
loop.run_forever()
伙计们,我需要你们的帮助,用 while 循环在另一个线程中发送数据,而不会阻止代码的执行,也不会出现错误。提前非常感谢您
更新:2024 年
import asyncio
from websockets.server import serve
import threading
from threading import Event
import time
clients = set()
TASK_QUE = {}
async def register(websocket, path):
clients.add(websocket)
async def unregister(websocket, path):
if len(clients) > 0:
clients.remove(websocket)
async def Task(ws, taskId, event: Event):
for i in range(0, 100):
print(f"Task #{taskId} work {i} started")
time.sleep(3)
print(f"Task #{taskId} work {i} completed")
if event.is_set():
# End Task
await ws.send("Task stopped")
break
await ws.send("Task completed")
def Worker(ws, taskId, event):
asyncio.run(Task(ws, taskId, event))
async def NotifyListeners(message):
for client in clients:
await client.send(message)
def getTaskId(path):
return str(path).replace("/", "").lower()
def StopTask(taskId):
if taskId not in TASK_QUE:
task = TASK_QUE["taskId"]
_event = task["event"]
_event.set()
TASK_QUE.pop(taskId)
def RemoveTask(taskId):
if taskId not in TASK_QUE:
task = TASK_QUE["taskId"]
_event = task["event"]
_thread = task["thread"]
if _thread.is_alive(): # Stop the thread if it is still active
_event.set()
TASK_QUE.pop(taskId)
def StartTask(taskId):
if taskId not in TASK_QUE:
task = TASK_QUE["taskId"]
_thread = task["thread"]
if not _thread.is_alive():
_thread.start()
def addTask(ws, taskId, start=True):
event = threading.Event()
_thread = threading.Thread(target=Worker, args=(ws, taskId, event))
if start:
_thread.start()
TASK_QUE[taskId] = {"thread": _thread, "event": event}
async def HandleClient(message, ws, path):
taskId = getTaskId(
path
) # URL should have a Task or Session Id e.g ws://127.0.0.1:2625/77776565/
await NotifyListeners(message)
if taskId not in TASK_QUE:
addTask(ws, taskId)
# You can send Events to start and stop specific tasks with StartTask(taskId) and StopTask(taskId) from client using JSON
## URL Path
async def WebSocketServer(websocket, path):
try:
await register(websocket, path)
async for message in websocket:
await HandleClient(message, websocket, path)
finally:
await unregister(websocket, path)
async def Main():
# Start server
async with serve(WebSocketServer, "0.0.0.0", 2625):
print(f"WebSocketServer Server started at port {2625}")
await asyncio.Future() # run forever
asyncio.run(Main())