使用 websockets 在另一个线程中发送消息时,如何避免 RuntimeWarning: coroutine 'WebSocketCommonProtocol.send' was never awaited

问题描述 投票:0回答:1

我尝试在另一个线程中使用 websocket 客户端以 json 字符串发送运动传感器数据,以避免 MotionSensor 类中的无限循环导致其余代码的执行阻塞。但显然

ws.send()
需要使用 await 关键字。如果我加上它,就会收到如下错误:

RuntimeWarning: coroutine 'MotionSensors.run' was never awaited
  self.run()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

并且它不会向服务器发送任何内容。

# motionSensor.py
import asyncio
import threading
import time

from client.ClientRequest import Request


class MotionSensors(threading.Thread):
    """Background thread that periodically pushes the motion state over a websocket.

    ``threading.Thread.start()`` calls ``run()`` as a plain function, so ``run``
    has to be synchronous: an ``async def run`` only creates a coroutine object
    that nobody awaits, and nothing is ever sent. ``run()`` therefore hands the
    ``SendData`` coroutine to an event loop and waits for it to finish.

    Args:
        ws: Connection object exposing an awaitable ``send(str)``.
        loop: Event loop on which ``SendData`` should run. When omitted, the
            loop running at construction time is captured; if there is none,
            ``run()`` falls back to a private loop created by ``asyncio.run``.
    """

    def __init__(self, ws, loop=None):
        # Daemon thread: it only waits on the event loop, so it must not keep
        # the process alive once the main program is done.
        threading.Thread.__init__(self, daemon=True)
        self.ws = ws
        self.open = True  # cleared by the owner to stop the send loop
        if loop is None:
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # Constructed outside of a running event loop.
                loop = None
        self.loop = loop

    def run(self):
        """Thread body: drive ``SendData`` to completion."""
        if self.loop is None:
            asyncio.run(self.SendData())
            return
        # Schedule the coroutine on the captured loop so that ws.send() is
        # awaited there, then block this thread until the coroutine finishes.
        pending = asyncio.run_coroutine_threadsafe(self.SendData(), self.loop)
        pending.result()

    async def SendData(self):
        """Send the motion state every two seconds while ``self.open`` is true."""
        while self.open:
            print("Sending motion state....")
            state = 1  # Motion state demo value
            request = Request()
            request.push("mcu/sensors/motion")
            request.addBody({
                "state_type": "single",
                "devices": {"state": state, "device_no": "DVC-876435"}
            })
            await self.ws.send(request.getAsJsonString())
            print("sleeping now for 2 seconds....")
            # asyncio.sleep yields to the event loop; time.sleep would block it.
            await asyncio.sleep(2)

这是我的主要代码 client.py:

# client.py
import settings
import asyncio
import websockets
from client.ClientHandler import Devices
from client.Rounte import Route
from ClientRequest import Request
from client.dbHandler import mcuConfig
from client.devices.motionSensor import MotionSensors

def ResponseMSG(request):
    """Route handler: write the received request to standard output."""
    print(request)

# Module-level router; ResponseMSG is registered for the "/response" route.
route = Route()
route.addRoute("/response", ResponseMSG)


def onMessage(request):
    """Hand an incoming websocket message to the module-level ``route``."""
    route.fireRequest(request)

async def WsClient():
    """Connect to ``settings.WS_URL``, start the device threads and dispatch messages.

    Every message received from the server is passed to ``onMessage``. The
    receive loop only ends through an exception (for example when the
    connection is closed); in that case the motion sensor sender is told to
    stop so it does not keep running against a dead connection.
    """
    uri = settings.WS_URL
    async with websockets.connect(uri) as websocket:
        # Initialize devices
        motion = MotionSensors(websocket)
        motion.start()
        try:
            while True:
                print("waiting to recieve......")
                message = await websocket.recv()
                onMessage(message)
        finally:
            # MotionSensors loops on ``while self.open``; clearing the flag
            # lets that loop end after the connection is gone.
            motion.open = False

# asyncio.run() creates the event loop, runs WsClient() until it finishes and
# closes the loop again. WsClient() never returns normally, so the former
# trailing loop.run_forever() could never do useful work, and calling
# asyncio.get_event_loop() without a running loop is deprecated.
asyncio.run(WsClient())

各位,我需要你们的帮助:如何在另一个线程中用 while 循环发送数据,既不阻塞其余代码的执行,也不出现上述错误?提前非常感谢!

python multithreading websocket python-asyncio
1个回答
1
投票

更新:2024 年

import asyncio
from websockets.server import serve
import threading
from threading import Event
import time

# Websocket connections currently known to the server (see register/unregister).
clients = set()
# Task registry: task id -> {"thread": Thread, "event": Event}, filled by addTask.
TASK_QUE = {}


async def register(websocket, path):
    """Remember *websocket* in the module-level ``clients`` set.

    ``path`` is accepted but not used.
    """
    clients.add(websocket)


async def unregister(websocket, path):
    """Forget *websocket*; a connection that was never registered is ignored.

    ``set.discard`` is used instead of ``set.remove`` so that an unknown
    connection does not raise ``KeyError`` (the previous emptiness check only
    covered the case of an empty set). ``path`` is accepted but not used.
    """
    clients.discard(websocket)


async def Task(ws, taskId, event: Event, interval: float = 3, steps: int = 100):
    """Run up to *steps* units of simulated work and report each one over *ws*.

    Args:
        ws: Connection object exposing an awaitable ``send(str)``.
        taskId: Identifier used in the progress output.
        event: Stop signal; once it is set, the task sends ``"Task stopped"``
            after the current unit of work and ends.
        interval: Seconds each simulated unit of work takes (default 3).
        steps: Maximum number of work units (default 100).
    """
    for i in range(steps):
        print(f"Task #{taskId} work {i} started")
        # Non-blocking sleep: keeps the event loop running this coroutine
        # responsive, unlike time.sleep().
        await asyncio.sleep(interval)
        print(f"Task #{taskId} work {i} completed")
        if event.is_set():
            # End Task
            await ws.send("Task stopped")
            break
        await ws.send("Task completed")


def Worker(ws, taskId, event):
    """Thread entry point: run the ``Task`` coroutine to completion via ``asyncio.run``.

    NOTE(review): ``ws`` is used here from an event loop other than the one
    serving the connection; confirm that the websockets library supports this.
    """
    job = Task(ws, taskId, event)
    asyncio.run(job)


async def NotifyListeners(message):
    """Send *message* to every connection registered in ``clients``.

    The set is copied before iterating: each ``await`` gives other coroutines
    a chance to register or unregister connections, and mutating a set while
    it is being iterated raises ``RuntimeError``.
    """
    for client in tuple(clients):
        await client.send(message)


def getTaskId(path):
    """Derive a task id from *path*: drop every ``/`` and lower-case the rest."""
    segments = str(path).split("/")
    return "".join(segments).lower()


def StopTask(taskId):
    """Signal the task registered under *taskId* to stop and drop it from ``TASK_QUE``.

    Unknown ids are ignored. The earlier version had the membership test
    inverted and looked up the literal key ``"taskId"`` instead of the
    argument, so it could never stop a registered task.
    """
    task = TASK_QUE.pop(taskId, None)
    if task is None:
        return
    task["event"].set()


def RemoveTask(taskId):
    """Drop the task registered under *taskId*, stopping its thread if it is running.

    Unknown ids are ignored. The earlier version had the membership test
    inverted and looked up the literal key ``"taskId"`` instead of the
    argument, so it could never remove a registered task.
    """
    task = TASK_QUE.pop(taskId, None)
    if task is None:
        return
    if task["thread"].is_alive():  # Stop the thread if it is still active
        task["event"].set()


def StartTask(taskId):
    """Start the thread of the task registered under *taskId* if it has never run.

    Unknown ids are ignored. The earlier version had the membership test
    inverted and looked up the literal key ``"taskId"`` instead of the
    argument. A ``threading.Thread`` can be started only once, so a thread
    that is running or has already finished is left alone.
    """
    task = TASK_QUE.get(taskId)
    if task is None:
        return
    thread = task["thread"]
    # ``ident`` stays None until start() has been called.
    if thread.ident is None:
        thread.start()


def addTask(ws, taskId, start=True):
    """Create the worker thread and stop event for *taskId* and record them.

    The thread targets ``Worker(ws, taskId, event)`` and is started right away
    unless *start* is false. The pair is stored in ``TASK_QUE[taskId]`` under
    the keys ``"thread"`` and ``"event"``.
    """
    stop_signal = threading.Event()
    worker = threading.Thread(target=Worker, args=(ws, taskId, stop_signal))
    if start:
        worker.start()
    TASK_QUE[taskId] = dict(thread=worker, event=stop_signal)


async def HandleClient(message, ws, path):
    """Broadcast *message* and make sure a task exists for the connection's path.

    The task id is derived from *path* with ``getTaskId``; a new task is
    created with ``addTask`` only when that id is not yet in ``TASK_QUE``.
    """
    # URL should have a Task or Session Id e.g ws://127.0.0.1:2625/77776565/
    taskId = getTaskId(path)
    await NotifyListeners(message)
    if taskId in TASK_QUE:
        return
    addTask(ws, taskId)
    # You can send Events to start and stop specific tasks with StartTask(taskId) and StopTask(taskId)  from client using JSON
    ## URL Path


async def WebSocketServer(websocket, path):
    """Connection handler: register the client, process its messages, unregister it.

    ``unregister`` runs in a ``finally`` clause, so it is also called when
    registration or message handling raises.
    """
    try:
        await register(websocket, path)
        async for incoming in websocket:
            await HandleClient(incoming, websocket, path)
    finally:
        await unregister(websocket, path)


async def Main(host="0.0.0.0", port=2625):
    """Serve ``WebSocketServer`` on *host*:*port* until the coroutine is cancelled.

    Args:
        host: Interface to bind to (default ``"0.0.0.0"``).
        port: TCP port to listen on (default ``2625``).
    """
    # Start server
    async with serve(WebSocketServer, host, port):
        print(f"WebSocketServer Server started at port {port}")
        await asyncio.Future()  # run forever


# Start the server: asyncio.run() drives Main() on a new event loop.
asyncio.run(Main())
© www.soinside.com 2019 - 2024. All rights reserved.