我有一个Python函数
foo
,它实际上是一个由许多函数组成的层,并且定期回调我提供的Python函数bar
,例如foo(bar)
。我还有一个在主线程上运行的 asyncio 事件循环,但是 foo
的内部并不知道 asyncio。
现在我有另一个线程正在运行,它将事件发送到主线程的事件循环。我想使用回调
bar
来处理这些事件。在其他框架/语言中,例如 Qt 中的 processEvents
之类的函数可用于完成此任务,但我找不到 asyncio 的类似功能。
根据我对文档的印象,我尝试了一些我认为可行的方法,例如
def bar():
asyncio.gather(*asyncio.all_tasks())
或
def bar():
asyncio.sleep(0)
但是从另一个线程发布的事件仍然没有被处理。
免责声明:更好的软件设计可能是让
foo
使用asyncio,但我正在寻找一种解决方案来解决无法修改foo
代码的情况。
bar
需要是一个异步函数,它将在主线程的事件循环中运行,即正在等待的任务正在运行的事件循环中。
在您的代码中,您使用对
asyncio.all_tasks
的调用,它仅返回未完成的任务。不幸的是,根据定义,这将包括回调任务,因此回调任务将等待其自身完成,而您将永远等待。我建议您将要等待的任务添加到 tasks_to_be_awaited
列表中,如下面的代码所示,以便我们准确地知道应该等待的任务。
技巧是使用
asyncio.run_coroutine_threadsafe(coro, loop)
来调度协程 coro
在事件循环 loop
中运行。请参阅asyncio.run_coroutine_threadsafe)。
import asyncio
from collections import deque
tasks_to_be_awaited = []
def foo():
...
future = asyncio.run_coroutine_threadsafe(bar(), main_event_loop)
future.result() # Wait for callback to complete
async def bar():
while tasks_to_be_awaited:
task = tasks_to_be_awaited.pop()
if not task.done():
result = await task
print(task.get_name(), result)
async def my_task():
# So that it is still running when we do the callback
await asyncio.sleep(1)
return 17
async def main():
global main_event_loop
# Make our event loop accessible to all threads:
main_event_loop = asyncio.get_running_loop() # loop for this thread
# Create a task to be awaited by our callback bar:
task = asyncio.create_task(my_task())
tasks_to_be_awaited.append(task)
# Emulate another, non-async thread running:
future = main_event_loop.run_in_executor(None, foo)
await future # Wait for foo to complete
asyncio.run(main())
打印:
Task-2 17