在 Python 中启动或恢复异步方法/协程而不等待它

问题描述 投票:0回答:1

在 JavaScript(和其他语言)中,

async
函数在调用时立即执行,直到遇到第一个
await
。 Python 中并非如此。来自
asyncio
文档

请注意,简单地调用协程不会安排其执行:

我觉得这很令人困惑,因为我的异步编程思维模型是这样的:

  • S尽快设置/初始化/排队异步任务(通常快速且非阻塞)
  • A同步任务在后台发生
  • 做其他事情
  • W等待任务尽可能晚地完成。也许它已经完成了,所以不会发生等待。
| S | AAAAAAAAAAAAAAAAAAAAA | W |
        | S | AAAAAAAAAAAAA | W |
           | S | AAAAAAAAAA | WWWWWW |

为了减少等待时间,应尽快执行安装。 但是如果该方法在Python中是

async
,那么“Setup”部分根本不会通过调用该方法来执行。只有使用了
await
(或
gather
等)才真正启动。这使得启动一堆协程并等待所有协程或批次完成变得非常困难,因为如果您使用
asyncio.gather
,它们都会在最后同时启动,这可能会浪费大量时间.

无需等待方法即可立即执行设置部分的一种方法是:

async def work():
    print("Setup...")
    await asyncio.sleep(1)

async def main():
    task = asyncio.create_task(work())
    await asyncio.sleep(0)
    # do something else
    await task

但这非常麻烦(而且丑陋),而且还让我在事件循环中随心所欲地实际选择新创建的任务,而不仅仅是换回

main
方法或完全不同的方法。

我设法想出了这个方法装饰器,它使用

send(None)
立即执行异步方法,并将
async
方法包装到返回
Awaitable
的普通方法中:

def js_like_await(coro):
    class JSLikeCoroutine:
        def __init__(self, first_return, coro_obj):
            self.first_return = first_return
            self.coro_obj = coro_obj
        def __await__(self):
            try:
                yield self.first_return
                while True:
                    yield self.coro_obj.send(None)
            except StopIteration as e:
                return e.value
    def coroutine_with_js_like_await(*args, **kwargs):
        coro_obj = coro(*args, **kwargs)
        first_return = None
        try:
            first_return = coro_obj.send(None)
            return JSLikeCoroutine(first_return, coro_obj)
        except StopIteration as e:
            result = asyncio.Future()
            result.set_result(e.value)
            return result
        
    return coroutine_with_js_like_await

但是要使其产生效果,还必须包装可能的内部异步函数调用,这对于方法的外部调用者来说可能无法修改。

我认为如果有一种方法可以“尝试”协程启动或恢复它,我可以使这变得简单得多。 因此,将控制流切换到协程(类似于

send()
的做法),但前提是异步函数当前没有等待某些内容。如果确实如此,请立即返回。

所以我想要一个方法

attempt
,可以像这样使用来控制执行流程:

def attempt(task):
    while not task.is_waiting:
        task.continue_to_next_await()

async def work():
    print("Setup")
    await asyncio.sleep(10)
    print("After first sleep")
    await asyncio.sleep(10)
    print("Done")
    return "Result"

task = work()  # Nothing is printed, because the coroutine is only created.
attempt(task)  # "Setup" is printed and method returns immediately
attempt(task)  # Nothing happens, because task is still sleeping
time.sleep(15) # Block CPU to wait for sleeping to finish
attempt(task)  # "After first sleep" is printed, because the sleep time is over
attempt(task)  # Nothing happens, because task is still sleeping
time.sleep(15) # Block CPU to wait for second sleeping to finish
attempt(task)  # "Done" is printed.
attempt(task)  # Nothing happens, because task is finished
r = await task # Get result.

有没有办法实现这样的目标?

我无法让

send(None)
多次工作,因为在第二个
send
之后,如果我丢弃从await wasn't used with future
返回的期货,它会因
send(None)
而崩溃:

async def work():
    print("Setup")
    await asyncio.sleep(10)
    print("Done")

async def main():
    task = work()
    _ = task.send(None) # Returns <Future pending> of type <class '_asyncio.Future'>
    _ = task.send(None) # RuntimeError: await wasn't used with future

我觉得我必须以某种方式将返回的 Future 返回到事件循环,但我不知道如何。如果我将

attempt
制作为
__await__
对象,而该对象只是
yield
Future
中的
send
,它将等待
await
并且
sleep
会阻塞。

注意,我知道“不制定你的方法

attempt”的明显解决方案。但可悲的是,为时已晚,无法让每个人都遵守这一点。


    

python asynchronous async-await python-asyncio coroutine
1个回答
0
投票
https://peps.python.org/pep-0525/

的组合,并在另一个事件循环上并行运行协程线程https://docs.python.org/3/library/asyncio-task.html#asyncio.run_coroutine_threadsafe。 虽然这是可行的,但您可能需要考虑不同的模式。 使用run_coroutine_threadsafe

代码

async

输出

import asyncio import time from threading import Thread global_loop = None def thread_for_event_loop(): global global_loop global_loop = asyncio.new_event_loop() asyncio.set_event_loop(global_loop) global_loop.run_forever() t = Thread(target=thread_for_event_loop) t.daemon = True t.start() time.sleep(1) # wait for thread to start old_print = print print = lambda *_: old_print(round(time.perf_counter(), 1), *_) def attempt(future): # doesn't actually do anything, only prints if task is done print(future.done()) async def work(): print("SETUP") await asyncio.sleep(2) print("MIDDLE") await asyncio.sleep(2) print("END") return "Result" async def main(): print("START", int(time.perf_counter())) task = asyncio.run_coroutine_threadsafe(work(), global_loop) attempt(task) attempt(task) print("before first sleep") time.sleep(3) print("after first sleep") attempt(task) attempt(task) print("before second sleep") time.sleep(3) # Block CPU to wait for second sleeping to finish print("after second sleep") attempt(task) attempt(task) print(await asyncio.wrap_future(task)) asyncio.run(main())

© www.soinside.com 2019 - 2024. All rights reserved.