我有一个异步运行循环,并且从协程中调用同步函数,有什么方法可以在同步函数中调用异步函数并获取结果 尝试了下面的代码,它不起作用 想要在 i() 中打印 hel() 的输出而不将 i() 更改为异步函数 有可能吗,如果有的话怎么办?
import asyncio
async def hel():
return 4
def i():
loop = asyncio.get_running_loop()
x = asyncio.run_coroutine_threadsafe(hel(), loop) ## need to change
y = x.result() ## this lines
print(y)
async def h():
i()
asyncio.run(h())
这是这里最常见的问题类型之一。 执行此操作的工具位于标准库中,并且只需要几行设置代码。 然而,结果并不是 100% 稳健,需要谨慎使用。 这可能就是为什么它还不是高级函数的原因。
从同步函数运行异步函数的基本问题是异步函数包含等待表达式。 Await 表达式暂停当前任务的执行,并允许事件循环运行其他任务。 因此,异步函数(协程)具有特殊的属性,允许它们放弃控制并从中断处重新恢复。 同步功能无法做到这一点。 因此,当您的同步函数调用异步函数并且该函数遇到等待表达式时,应该会发生什么? 同步功能无法让出和恢复。
一个简单的解决方案是在另一个线程中运行异步函数,并使用自己的事件循环。 调用线程会阻塞,直到结果可用。 异步函数的行为与普通函数类似,返回一个值。 缺点是异步函数现在在另一个线程中运行,这可能会导致线程编程带来的所有众所周知的问题。 在许多情况下,这可能不是问题。
可以如下设置。 这是一个完整的脚本,可以导入到应用程序的任何位置。
if __name__ == "__main__"
块中运行的测试代码与原始问题中的代码几乎相同。
线程是延迟初始化的,因此在使用之前不会创建它。 它是一个守护线程,因此不会阻止您的程序退出。
该解决方案不关心主线程中是否有正在运行的事件循环。
import asyncio
import threading
_loop = asyncio.new_event_loop()
_thr = threading.Thread(target=_loop.run_forever, name="Async Runner",
daemon=True)
# This will block the calling thread until the coroutine is finished.
# Any exception that occurs in the coroutine is raised in the caller
def run_async(coro): # coro is a couroutine, see example
if not _thr.is_alive():
_thr.start()
future = asyncio.run_coroutine_threadsafe(coro, _loop)
return future.result()
if __name__ == "__main__":
async def hel():
await asyncio.sleep(0.1)
print("Running in thread", threading.current_thread())
return 4
def i():
y = run_async(hel())
print("Answer", y, threading.current_thread())
async def h():
i()
asyncio.run(h())
输出:
Running in thread <Thread(Async Runner, started daemon 28816)>
Answer 4 <_MainThread(MainThread, started 22100)>
为了从同步方法调用异步函数,您需要使用
asyncio.run
,但这应该是异步程序的单个入口点,因此 asyncio 确保您不会在每个程序中多次执行此操作,所以你不能这样做。
话虽这么说,这个项目 https://github.com/erdewit/nest_asyncio 修补了 asyncio 事件循环来做到这一点,所以使用它后,您应该能够在同步函数中调用
asyncio.run
。
我有点晚了,但我相信有一个比公认的解决方案更简单的解决方案:我们创建一个大小为 1 的可重用线程池,并向其提交带有参数协程的工作函数
asyncio.run
hel()
import asyncio
# We could also muse multiprocessing.pool.ThreadPool:
from concurrent.futures import ThreadPoolExecutor
_run_async_pool = None
def run_async(coro):
global _run_async_pool
if _run_async_pool is None:
_run_async_pool = ThreadPoolExecutor(1)
return _run_async_pool.submit(asyncio.run, coro).result()
async def hel():
return 4
def i():
y = run_async(hel())
print(y)
async def h():
i()
asyncio.run(h())
打印:
4