我有一个可以批量收集数据的应用程序。每次收集完一批数据后,它都会保存数据。为了加快速度,我一直在使用 asyncio,这样它就可以继续收集数据,同时保存前一批数据,而不是在保存时停止程序。
但是我遇到了一个问题:
假设我的应用程序收集批次 A 并开始保存它。然而,Batch B 确实很小,因此程序在完成保存 Batch A 之前就开始保存 Batch B。这把事情搞砸了,它一次应该只保存一个批次。以下是正在发生的情况的一个大概示例:
import asyncio
import random
async def save_data():
print("I'm saving a batch")
await asyncio.sleep(2)
print("I'm done saving")
async def collect_data():
event_loop = asyncio.get_event_loop()
while True:
print("I'm collecting data")
await asyncio.sleep(random.randint(1, 5))
event_loop.create_task(save_data())
asyncio.run(collect_data())
理想情况下,任何时候都只应运行一个
save_data()
实例。
如何确保在开始保存下一批之前保存上一批,同时在后台完成保存?
理想情况下,任何时候都应该只运行一个 save_data() 实例。
那么你有两个选择:
async def collect_data():
event_loop = asyncio.get_event_loop()
last_save = None
while True:
print("I'm collecting data")
await asyncio.sleep(random.randint(1, 5))
if last_save: # don't let saving overlap
await last_save
last_save = event_loop.create_task(save_data())
if last_save:
await last_save
maxsize
参数有助于防止这种情况发生。async def collect_data():
event_loop = asyncio.get_event_loop()
queue = asyncio.Queue(maxsize=16)
async def save_all():
while True:
try:
batch = await queue.get()
except asyncio.CancelledError:
return
await save_data()
queue.task_done()
saving = event_loop.create_task(save_all())
while True:
print("I'm collecting data")
await asyncio.sleep(random.randint(1, 5))
batch = ...
await queue.put(batch)
await queue.join()
saving.cancel()
(正确的异常处理留给读者作为练习)