我创建了一个程序,用于处理来自设备的数据流并将其写入 Websocket 供 Web 应用程序使用。我编写的用于从所述设备读取的库及其计算值是一个同步(因此阻塞)库。我不想用异步重写它,所以我使用 asyncio
yield
函数在线程中运行它(准确地说:我使用龙卷风,因为该程序也会接受网络请求)。虽然代码有效,但我经常收到错误消息,指出从未检索到 Future 异常。此异常是与在执行器中运行阻塞函数的代码相关的超时错误(代码块下方的错误)。
注意,如果不将异步事件策略设置为run_in_executor
,我无法在执行器中正确运行该函数。如果我不这样做,我会不断收到“ThreadExecutor 中没有当前事件循环”的错误,并且我还没有找到解决方法。
tornado.platform.asyncio.AnyThreadEventLoopPolicy()
错误(删节):
class ClientHandler(tornado.websocket.WebSocketHandler):
clients = set()
def __init__(self, *args, **kwargs):
self.config = kwargs.pop("config")
super().__init__(*args, **kwargs)
self.started = False
def on_message(self, message):
if message == "data":
WebClientStreamHandler.clients.add(self)
self.start_client()
def write_data(self, message: str):
for client in WebClientStreamHandler.clients:
client.write_message(message)
def start_client(self):
if self.started:
return
asyncio.set_event_loop_policy(tornado.platform.asyncio.AnyThreadEventLoopPolicy())
sync_func = partial(run_client_forever, self.config)
loop = tornado.ioloop.IOLoop.current()
loop.run_in_executor(None, sync_func)
self.started = True
基本上,
ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=TimeoutError('timed out') created at /usr/lib/python3.11/asyncio/base_events.py:427>
Traceback:
...
File "/path/src/cps_demo/web_stream_handler.py", line 118, in start_client
loop.run_in_executor(None, sync_func)
File "/path/venv/lib/python3.11/site-packages/tornado/platform/asyncio.py", line 266, in run_in_executor
return self.asyncio_loop.run_in_executor(executor, func, *args)
File "/usr/lib/python3.11/asyncio/base_events.py", line 828, in run_in_executor
return futures.wrap_future(
File "/usr/lib/python3.11/asyncio/futures.py", line 417, in wrap_future
new_future = loop.create_future()
File "/usr/lib/python3.11/asyncio/base_events.py", line 427, in create_future
return futures.Future(loop=self)
TimeoutError: timed out
变量是一个实例变量,但由于tornado在每个连接上启动处理程序的新实例,因此
started
被设置为started
,导致客户端第二次启动。由于设备一次仅支持单个 TCP 连接,因此第二个客户端最终超时,导致异常。
将
False
与
started
一起作为类变量解决了问题。我可能应该将此函数包装在单例中。