我有一个到处使用 async/await 运行的应用程序。通常,它调用同步函数。这些函数有时必须再次调用异步函数。我试图通过使用线程中的
asyncio.run_coroutine_threadsafe()
,使用主线程的循环来完成这项工作,但等待 run_coroutine_threadsafe()
的 Future
通过调用 .result()
来完成似乎是挂起的。
以下是我在 Python 3.12 和 3.13 上重现问题的示例代码:
from asyncio import run_coroutine_threadsafe, get_running_loop, run, AbstractEventLoop
from collections.abc import Callable, Awaitable
from concurrent.futures.thread import ThreadPoolExecutor
from typing import TypeVar
_T = TypeVar("_T")
def _await_in_thread(loop: AbstractEventLoop, f: Callable[[], Awaitable[_T]]) -> _T:
"""
Await something inside a thread, using the main thread's loop.
"""
print("THIS IS PRINTED")
return run_coroutine_threadsafe(f(), loop).result()
def _await_to_thread(pool: ThreadPoolExecutor, f: Callable[[], Awaitable[_T]]) -> _T:
"""
Await something by moving it to a thread.
"""
return pool.submit(_await_in_thread, get_running_loop(), f).result()
async def _async_main(pool: ThreadPoolExecutor) -> None:
"""
Run the main application, which is asynchronous.
"""
# Eventually, the application calls a function that due to its nature (maybe a third-party API) is synchronous.
_some_sync_function(pool)
def _sync_main() -> None:
with ThreadPoolExecutor() as pool:
run(_async_main(pool))
def _some_sync_function(pool: ThreadPoolExecutor) -> None:
# This synchronous function then has to call a function that is asynchronous.
result = _await_to_thread(pool, _some_async_function)
assert result == 123
async def _some_async_function() -> int:
print("BUT THIS IS NOT PRINTED")
return 123
if __name__ == "__main__":
_sync_main()
虽然示例可以稍微简单一些,但我想包括
ThreadPoolExecutor
的使用以及返回值的传播,因为这是最终产品的两个要求。
此外,Python 兼容性为 3.11+,但如果没有其他解决方案,我可能会确信将其设为 3.12+。
这是 IRC 上的单个答案以及我在 Python 文档以及各种问答网站和博客中找到的非常简洁的
run_coroutine_threadsafe()
示例的结果。它似乎不是人们普遍知道或使用的功能。
我的问题是:我误解了什么?这应该如何运作?
您遇到了僵局。
从 asyncio 的事件循环中,您将一个函数提交到池中,同时也在等待结果。在等待期间,事件循环被阻塞(这在 asyncio 中不应该发生!)。它发生在这里:
return pool.submit(_await_in_thread, get_running_loop(), f).result()
相当于:
future = pool.sumbit(...)
return future.result() # wait for the result synchronously!
这个提交的函数提交另一个函数,这次提交给 asyncio,并再次等待结果。与上面相同的模式,实际代码:
return run_coroutine_threadsafe(f(), loop).result()
但是 asyncio 无法执行任何工作,因为它如上所述被阻止。
要纠正问题,必须重新审视程序逻辑。