我有一个像下面这样的代码:
def render():
loop = asyncio.get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
if loop.is_running():
result = asyncio.ensure_future(test())
else:
result = loop.run_until_complete(test())
当
loop
未运行时,非常简单:只需使用 loop.run_until_complete
即可返回 coro 结果,但如果循环已经在运行(我的阻塞代码在已经运行循环的应用程序中运行)。我不能使用 loop.run_until_complete
因为它会引发异常;当我调用 asyncio.ensure_future
时,任务被安排并运行,但我想在那里等待结果,有人知道该怎么做吗?文档不太清楚如何执行此操作。
我尝试在 coro 内传递一个
concurrent.futures.Future
调用 set_result
,然后在我的阻塞代码上调用 Future.result()
,但它不起作用:它阻塞在那里并且不让其他任何东西运行。任何帮助将不胜感激。
要使用建议的设计实现
runner
,您需要一种方法来从事件循环内部运行的回调单步执行事件循环。 Asyncio 明确禁止递归事件循环,因此这种方法是死胡同。
鉴于该限制,您有两种选择:
render()
本身成为协程;render()
(及其调用者)。假设#1 是不可能的,您可以像这样实现
render()
的 #2 变体:
def render():
loop = _event_loop # can't call get_event_loop()
async def test():
await asyncio.sleep(2)
print("hi")
return 200
future = asyncio.run_coroutine_threadsafe(test(), loop)
result = future.result()
请注意,您不能在
asyncio.get_event_loop()
中使用 render
,因为没有(也不应该)为该线程设置事件循环。相反,生成运行器线程的代码必须调用 asyncio.get_event_loop()
并将其发送到线程,或者只是将其保留在全局变量或共享结构中。
同步等待异步协程
如果异步事件循环已通过调用
loop.run_forever
运行,它将阻塞执行线程,直到调用 loop.stop
[请参阅 docs]。因此,同步等待的唯一方法是在专用线程上运行事件循环,在循环上安排异步函数,并从另一个线程同步等待它。
为此,我按照 user4815162342 的 answer 编写了自己的最小解决方案。我还添加了在所有工作完成后清理循环的部件 [参见
loop.close
]。
下面代码中的
main
函数在专用线程上运行事件循环,在事件循环上调度多个任务,以及要同步等待其结果的任务。同步等待将阻塞,直到所需的结果准备就绪。最后,循环被关闭并与其线程一起被优雅地清理。
专用线程和函数
stop_loop
、run_forever_safe
和await_sync
可以封装在模块或类中。
有关线程安全注意事项,请参阅 asyncio 文档中的“并发和多线程”部分。
import asyncio
import threading
#----------------------------------------
def stop_loop(loop):
''' stops an event loop '''
loop.stop()
print (".: LOOP STOPPED:", loop.is_running())
def run_forever_safe(loop):
''' run a loop for ever and clean up after being stopped '''
loop.run_forever()
# NOTE: loop.run_forever returns after calling loop.stop
#-- cancell all tasks and close the loop gracefully
print(".: CLOSING LOOP...")
# source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>
loop_tasks_all = asyncio.Task.all_tasks(loop=loop)
for task in loop_tasks_all: task.cancel()
# NOTE: `cancel` does not guarantee that the Task will be cancelled
for task in loop_tasks_all:
if not (task.done() or task.cancelled()):
try:
# wait for task cancellations
loop.run_until_complete(task)
except asyncio.CancelledError: pass
#END for
print(".: ALL TASKS CANCELLED.")
loop.close()
print(".: LOOP CLOSED:", loop.is_closed())
def await_sync(task):
''' synchronously waits for a task '''
while not task.done(): pass
print(".: AWAITED TASK DONE")
return task.result()
#----------------------------------------
async def asyncTask(loop, k):
''' asynchronous task '''
print("--start async task %s" % k)
await asyncio.sleep(3, loop=loop)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key
def main():
loop = asyncio.new_event_loop() # construct a new event loop
#-- closures for running and stopping the event-loop
run_loop_forever = lambda: run_forever_safe(loop)
close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)
#-- make dedicated thread for running the event loop
thread = threading.Thread(target=run_loop_forever)
#-- add some tasks along with my particular task
myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
for i in range(1, 10)]
#-- begin the thread to run the event-loop
print(".: EVENT-LOOP THREAD START")
thread.start()
#-- _synchronously_ wait for the result of my task
result = await_sync(myTask) # blocks until task is done
print("* final result of my task:", result)
#... do lots of work ...
print("*** ALL WORK DONE ***")
#========================================
# close the loop gracefully when everything is finished
close_loop_safe()
thread.join()
#----------------------------------------
main()
这是我的情况,我的整个程序是异步的,但调用一些同步库,然后回调到我的异步函数。
关注用户4815162342的回答。
import asyncio
async def asyncTask(k):
''' asynchronous task '''
print("--start async task %s" % k)
# await asyncio.sleep(3, loop=loop)
await asyncio.sleep(3)
print("--end async task %s." % k)
key = "KEY#%s" % k
return key
def my_callback():
print("here i want to call my async func!")
future = asyncio.run_coroutine_threadsafe(asyncTask(1), LOOP)
return future.result()
def sync_third_lib(cb):
print("here will call back to your code...")
cb()
async def main():
print("main start...")
print("call sync third lib ...")
await asyncio.to_thread(sync_third_lib, my_callback)
# await loop.run_in_executor(None, func=sync_third_lib)
print("another work...keep async...")
await asyncio.sleep(2)
print("done!")
LOOP = asyncio.get_event_loop()
LOOP.run_until_complete(main())