Python Asyncio - 等待一个特定的后台任务完成,然后再开始下一个

问题描述 投票:0回答:1

我有一个可以批量收集数据的应用程序。每次收集完一批数据后,它都会保存数据。为了加快速度,我一直在使用 asyncio,这样它就可以继续收集数据,同时保存前一批数据,而不是在保存时停止程序。

但是我遇到了一个问题:

假设我的应用程序收集批次 A 并开始保存它。然而,Batch B 确实很小,因此程序在完成保存 Batch A 之前就开始保存 Batch B。这把事情搞砸了,它一次应该只保存一个批次。以下是正在发生的情况的一个大概示例:

import asyncio
import random
async def save_data():
    print("I'm saving a batch")
    await asyncio.sleep(2)
    print("I'm done saving")
    
async def collect_data():
    event_loop = asyncio.get_event_loop()
    while True:
        print("I'm collecting data")
        await asyncio.sleep(random.randint(1, 5))
        event_loop.create_task(save_data())

asyncio.run(collect_data())

理想情况下,任何时候都只应运行一个

save_data()
实例。

如何确保在开始保存下一批之前保存上一批,同时在后台完成保存?

python asynchronous python-asyncio
1个回答
0
投票

理想情况下,任何时候都应该只运行一个 save_data() 实例。

那么你有两个选择:

  1. 您将阻止收集,直到上一批的保存完成。这很简单并且与双缓冲非常相似。缺点是,如果批次大小差异很大,并且您可能希望在保存大批次的同时收集多个小批次,则性能可能会受到影响。
async def collect_data():
    event_loop = asyncio.get_event_loop()
    last_save = None
    while True:
        print("I'm collecting data")
        await asyncio.sleep(random.randint(1, 5))
        if last_save: # don't let saving overlap
            await last_save
        last_save = event_loop.create_task(save_data())
    if last_save:
        await last_save
  1. 将所有批次放入队列中,以便按顺序保存。现在你必须小心,不要因为收集速度快于保存速度而压垮系统,从而耗尽你的内存。队列的
    maxsize
    参数有助于防止这种情况发生。
async def collect_data():
    event_loop = asyncio.get_event_loop()
    queue = asyncio.Queue(maxsize=16)
    async def save_all():
        while True:
            try:
                batch = await queue.get()
            except asyncio.CancelledError:
                return
            await save_data()
            queue.task_done()
    saving = event_loop.create_task(save_all())
    while True:
        print("I'm collecting data")
        await asyncio.sleep(random.randint(1, 5))
        batch = ...
        await queue.put(batch)
    await queue.join()
    saving.cancel()

(正确的异常处理留给读者作为练习)

© www.soinside.com 2019 - 2024. All rights reserved.