Twisted Reactor 未在树莓派上启动任务

问题描述 投票:0回答:1

我不知道为什么这段代码在我的 Windows 计算机上有效,但在我的树莓派上却被阻止。

from twisted.internet import reactor, ssl, threads
from autobahn.twisted.websocket import WebSocketClientFactory, WebSocketClientProtocol
import queue
from queue import Empty
import threading
import time

connection_signal = threading.Event()


class MyWebSocketClientProtocol(WebSocketClientProtocol):
    def onOpen(self):
        print("WebSocket connection opened.")
        self.factory.client_protocol = self

    def onConnect(self, response):
        print(f"Server connected: {response.peer}")
        connection_signal.set()

    def onClose(self, wasClean, code, reason):
        print(f"WebSocket connection closed: {reason}")

    def onMessage(self, payload, isBinary):
        print(f"Message received: {payload}")


class WebSocketManager:
    def __init__(self, queue):
        self.queue = queue
        self.client_protocol = None

    def connect(self, host, port):
        self.factory = WebSocketClientFactory(f"wss://{host}:{port}")
        self.factory.protocol = MyWebSocketClientProtocol
        context_factory = ssl.ClientContextFactory()
        reactor.connectSSL(host, port, self.factory, context_factory)
        connection_signal.wait()

    def disconnect(self):
        self.factory.client_protocol.sendCloseFrame()

    def send_message(self, message):
        self.factory.client_protocol.sendMessage(message.encode("utf-8"))


def reactor_thread(queue):
    manager = WebSocketManager(queue)

    def process_queue():
        while True:
            try:
                msg = queue.get(timeout=0.1)
                if msg:
                    if msg == "connect":
                        manager.connect("192.168.2.98", 8000)
                        time.sleep(0.1)  # allow the connection to be established
                    elif msg == "disconnect":
                        manager.disconnect()
                    else:
                        manager.send_message(msg)
            except Empty:
                pass

    reactor.callInThread(process_queue)
    reactor.run(installSignalHandlers=False)


def main():
    message_queue = queue.Queue()
    reactor_thread_instance = threading.Thread(
        target=reactor_thread, args=(message_queue,)
    )
    reactor_thread_instance.start()

    message_queue.put("connect")
    message_queue.put("Hello WebSocket!")
    message_queue.put("disconnect")
    time.sleep(5)
    # reactor.stop()
    # reactor_thread_instance.join()


if __name__ == "__main__":
    main()

我看到我的计算机运行 selectreactor,而树莓派运行 epollreactor,但我尝试在计算机上强制选择树莓派和 epoll,但没有任何变化。

我发现树莓派永远不会调用 MyWebSocketClientProtocol 类的 __init__ ,而计算机则会调用。

在 venv 中,库和 python 是相同的版本(autobahn == 23.6.2,twisted == 23.10.0,pyopenssl == 24.2.1,python3.12)

python python-3.x raspberry-pi twisted
1个回答
0
投票

这不是使用 Twisted 的正确方法。 您问题中的代码中的多个线程有时可能会意外工作,但正如您发现的那样,它并不可靠。 Twisted 是一个协作式多任务系统,(几乎)专门在单个线程中运行。 API 很大程度上不是线程安全的。 Twisted 为事件通知 (

Deferred
) 和基于时间的调度 (
IReactorTime
) 提供了自己的抽象,作为第一个近似值,您需要使用它们来代替
threading.Event
time.sleep

您问题中的代码有时会挂起,因为 Twisted 的 API 并不像您那样从多个线程调用。 由于时间上的意外,该程序有时可能会产生与您期望的结果类似的结果,但像这样构造的程序永远不会可靠地工作。

© www.soinside.com 2019 - 2024. All rights reserved.