如何使用 asyncio.run_coroutine_threadsafe() 和主线程循环从子线程调用异步函数?

问题描述 投票:0回答:1

我有一个到处使用 async/await 运行的应用程序。通常,它调用同步函数。这些函数有时必须再次调用异步函数。我试图通过使用线程中的

asyncio.run_coroutine_threadsafe()
,使用主线程的循环来完成这项工作,但等待
run_coroutine_threadsafe()
Future
通过调用
.result()
来完成似乎是挂起的。

以下是我在 Python 3.12 和 3.13 上重现问题的示例代码:

from asyncio import run_coroutine_threadsafe, get_running_loop, run, AbstractEventLoop
from collections.abc import Callable, Awaitable
from concurrent.futures.thread import ThreadPoolExecutor
from typing import TypeVar

_T = TypeVar("_T")


def _await_in_thread(loop: AbstractEventLoop, f: Callable[[], Awaitable[_T]]) -> _T:
    """
    Await something inside a thread, using the main thread's loop.
    """

    print("THIS IS PRINTED")
    return run_coroutine_threadsafe(f(), loop).result()


def _await_to_thread(pool: ThreadPoolExecutor, f: Callable[[], Awaitable[_T]]) -> _T:
    """
    Await something by moving it to a thread.
    """
    return pool.submit(_await_in_thread, get_running_loop(), f).result()


async def _async_main(pool: ThreadPoolExecutor) -> None:
    """
    Run the main application, which is asynchronous.
    """

    # Eventually, the application calls a function that due to its nature (maybe a third-party API) is synchronous.
    _some_sync_function(pool)


def _sync_main() -> None:
    with ThreadPoolExecutor() as pool:
        run(_async_main(pool))


def _some_sync_function(pool: ThreadPoolExecutor) -> None:
    # This synchronous function then has to call a function that is asynchronous.
    result = _await_to_thread(pool, _some_async_function)
    assert result == 123


async def _some_async_function() -> int:
    print("BUT THIS IS NOT PRINTED")
    return 123


if __name__ == "__main__":
    _sync_main()

虽然示例可以稍微简单一些,但我想包括

ThreadPoolExecutor
的使用以及返回值的传播,因为这是最终产品的两个要求。

此外,Python 兼容性为 3.11+,但如果没有其他解决方案,我可能会确信将其设为 3.12+。

这是 IRC 上的单个答案以及我在 Python 文档以及各种问答网站和博客中找到的非常简洁的

run_coroutine_threadsafe()
示例的结果。它似乎不是人们普遍知道或使用的功能。

我的问题是:我误解了什么?这应该如何运作?

python python-asyncio python-multithreading
1个回答
0
投票

您遇到了僵局。

从 asyncio 的事件循环中,您将一个函数提交到池中,同时也在等待结果。在等待期间,事件循环被阻塞(这在 asyncio 中不应该发生!)。它发生在这里:

return pool.submit(_await_in_thread, get_running_loop(), f).result()

相当于:

future = pool.sumbit(...)
return future.result()  # wait for the result synchronously!

这个提交的函数提交另一个函数,这次提交给 asyncio,并再次等待结果。与上面相同的模式,实际代码:

return run_coroutine_threadsafe(f(), loop).result()

但是 asyncio 无法执行任何工作,因为它如上所述被阻止。

要纠正问题,必须重新审视程序逻辑。

© www.soinside.com 2019 - 2024. All rights reserved.