我在 Python 中有一个生成器函数,它是 IO 绑定的。我想将其转换为异步生成器,其中生成器循环在单独的进程或线程中运行。最好,我想使用并发.futures 模块来允许在单独的进程与线程之间灵活选择。解决这个问题的最佳方法是什么?
import time
def blocking():
""" plain generator with blocking i/o """
for i in range(10):
time.sleep(1)
yield i
def consumer():
for i in blocking():
print(i)
是的,不幸的是,这并不简单。
下面的代码片段中的可选内容是“任务组”,但它有助于良好实践:
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor # (or Process*)
executor = ThreadPoolExecutor(10) # Put this wherever you like, with as many workers as you want.
def blocking_task(i):
time.sleep(1)
return i
async def async_gen():
""" plain generator with blocking i/o """
async with asyncio.TaskGroup() as tg:
loop = asyncio.get_running_loop()
async def taskwrapper(awaitable):
return await awaitable
tasks = {tg.create_task(taskwrapper(loop.run_in_executor(executor, blocking_task, i,))) for i in range(10)}
while tasks:
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
yield task.result()
tasks = pending
async def consumer():
async for i in async_gen():
print(i)
asyncio.run(consumer())