如何在循环运行时运行协程并等待同步函数的结果?

问题描述 投票:0回答:3

我有一个像下面这样的代码:

def render():
    loop = asyncio.get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    if loop.is_running():
        result = asyncio.ensure_future(test())
    else:
        result = loop.run_until_complete(test())

loop
未运行时,非常简单:只需使用
loop.run_until_complete
即可返回 coro 结果,但如果循环已经在运行(我的阻塞代码在已经运行循环的应用程序中运行)。我不能使用
loop.run_until_complete
因为它会引发异常;当我调用
asyncio.ensure_future
时,任务被安排并运行,但我想在那里等待结果,有人知道该怎么做吗?文档不太清楚如何执行此操作。

我尝试在 coro 内传递一个

concurrent.futures.Future
调用
set_result
,然后在我的阻塞代码上调用
Future.result()
,但它不起作用:它阻塞在那里并且不让其他任何东西运行。任何帮助将不胜感激。

python python-3.x python-asyncio
3个回答
12
投票

要使用建议的设计实现

runner
,您需要一种方法来从事件循环内部运行的回调单步执行事件循环。 Asyncio 明确禁止递归事件循环,因此这种方法是死胡同。

鉴于该限制,您有两种选择:

  1. 使
    render()
    本身成为协程;
  2. 在与运行异步事件循环的线程不同的线程中执行
    render()
    (及其调用者)。

假设#1 是不可能的,您可以像这样实现

render()
的 #2 变体:

def render():
    loop = _event_loop  # can't call get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    future = asyncio.run_coroutine_threadsafe(test(), loop)
    result = future.result()

请注意,您不能在

asyncio.get_event_loop()
中使用
render
,因为没有(也不应该)为该线程设置事件循环。相反,生成运行器线程的代码必须调用
asyncio.get_event_loop()
并将其发送到线程,或者只是将其保留在全局变量或共享结构中。


9
投票

同步等待异步协程

如果异步事件循环已通过调用

loop.run_forever
运行,它将阻塞执行线程,直到调用
loop.stop
[请参阅 docs]。因此,同步等待的唯一方法是在专用线程上运行事件循环,在循环上安排异步函数,并从另一个线程同步等待它。

为此,我按照 user4815162342 的 answer 编写了自己的最小解决方案。我还添加了在所有工作完成后清理循环的部件 [参见

loop.close
]。

下面代码中的

main
函数在专用线程上运行事件循环,在事件循环上调度多个任务,以及要同步等待其结果的任务。同步等待将阻塞,直到所需的结果准备就绪。最后,循环被关闭并与其线程一起被优雅地清理。

专用线程和函数

stop_loop
run_forever_safe
await_sync
可以封装在模块或类中。

有关线程安全注意事项,请参阅 asyncio 文档中的“并发和多线程”部分。

import asyncio
import threading
#----------------------------------------

def stop_loop(loop):
    ''' stops an event loop '''
    loop.stop()
    print (".: LOOP STOPPED:", loop.is_running())

def run_forever_safe(loop):
    ''' run a loop for ever and clean up after being stopped '''

    loop.run_forever()
    # NOTE: loop.run_forever returns after calling loop.stop

    #-- cancell all tasks and close the loop gracefully
    print(".: CLOSING LOOP...")
    # source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>

    loop_tasks_all = asyncio.Task.all_tasks(loop=loop)

    for task in loop_tasks_all: task.cancel()
    # NOTE: `cancel` does not guarantee that the Task will be cancelled

    for task in loop_tasks_all:
        if not (task.done() or task.cancelled()):
            try:
                # wait for task cancellations
                loop.run_until_complete(task)
            except asyncio.CancelledError: pass
    #END for
    print(".: ALL TASKS CANCELLED.")

    loop.close()
    print(".: LOOP CLOSED:", loop.is_closed())

def await_sync(task):
    ''' synchronously waits for a task '''
    while not task.done(): pass
    print(".: AWAITED TASK DONE")
    return task.result()
#----------------------------------------

async def asyncTask(loop, k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    await asyncio.sleep(3, loop=loop)
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key

def main():
    loop = asyncio.new_event_loop() # construct a new event loop

    #-- closures for running and stopping the event-loop
    run_loop_forever = lambda: run_forever_safe(loop)
    close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)

    #-- make dedicated thread for running the event loop
    thread = threading.Thread(target=run_loop_forever)

    #-- add some tasks along with my particular task
    myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
    otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
                  for i in range(1, 10)]

    #-- begin the thread to run the event-loop
    print(".: EVENT-LOOP THREAD START")
    thread.start()

    #-- _synchronously_ wait for the result of my task
    result = await_sync(myTask) # blocks until task is done
    print("* final result of my task:", result) 

    #... do lots of work ...
    print("*** ALL WORK DONE ***")
    #========================================

    # close the loop gracefully when everything is finished
    close_loop_safe()
    thread.join()
#----------------------------------------

main()

2
投票

这是我的情况,我的整个程序是异步的,但调用一些同步库,然后回调到我的异步函数。

关注用户4815162342的回答。

import asyncio

async def asyncTask(k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    # await asyncio.sleep(3, loop=loop)
    await asyncio.sleep(3)
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key


def my_callback():
    print("here i want to call my async func!")
    future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
    return future.result()

def sync_third_lib(cb):
    print("here will call back to your code...")
    cb()

async def main():
    print("main start...")

    print("call sync third lib ...")
    await asyncio.to_thread(sync_third_lib, my_callback)
    # await loop.run_in_executor(None, func=sync_third_lib)
    print("another work...keep async...")
    await asyncio.sleep(2)

    print("done!")


LOOP = asyncio.get_event_loop()
LOOP.run_until_complete(main())
© www.soinside.com 2019 - 2024. All rights reserved.