我是
asyncio
的新手。大多数 asyncio
代码示例显示了固定数量任务的并行处理:
tasks = [asyncio.ensure_future(download_one(url)) for url in urls]
await asyncio.gather(*tasks)
我需要下载大量的网址。我目前正在以上述方式使用
aiohttp + asyncio
来分批运行下载文件,比如说16个并发任务。
这种方法的问题是
asyncio
操作阻塞(没有双关语)直到整个批处理完成。
如何在任务完成后立即将新任务动态添加到队列中?
您可以使用
multiprocessing
并将所有并发任务放入循环中。然后通过len(multiprocessing.active_children())
.检查当前活动进程
首先,声明您要处理的最大并发任务:
max_processes = 4
定义您要处理的所有任务:
numberOfTasks = 16
假设,您将使用
printing()
函数执行该任务:
def printing(start, end):
print(f'\n{start} - {end}\n')
然后循环:
for i in range(numberOfTasks + 1):
if(len(multiprocessing.active_children()) < max_processes):
pool = multiprocessing.Pool(processes = 1)
pool.apply_async(printing, args=(0, 100))
else:
pool.close()
pool.join()
每次循环,程序都会创建一个有1个进程的pool,进程数等于
max_processes
,程序会等到pool完成,然后打开一个新任务。
这是我最初的解决方案(使用消费者-生产者设计模式):
import asyncio, random
urls = ['url1',....]
def get_url() -> str | None:
global urls
return urls.pop() if any(urls) else None
async def producer(queue: asyncio.Queue):
while True:
if queue.full():
print(f"queue full ({queue.qsize()}), sleeping...")
await asyncio.sleep(0.3)
continue
# get a url to fetch
url = get_url()
if not url:
break
print(f"PRODUCED: {url}")
await queue.put(url)
await asyncio.sleep(0.1)
async def consumer(queue: asyncio.Queue):
while True:
url = await queue.get()
# simulate I/O operation
await asyncio.sleep(random.randint(1, 3))
queue.task_done()
print(f"CONSUMED: {url}")
async def main():
concurrency = 3
queue: asyncio.Queue = asyncio.Queue(concurrency)
# fire up the both producers and consumers
consumers = [asyncio.create_task(consumer(queue)) for _ in range(concurrency)]
producers = [asyncio.create_task(producer(queue)) for _ in range(1)]
# with both producers and consumers running, wait for
# the producers to finish
await asyncio.gather(*producers)
print("---- done producing")
# wait for the remaining tasks to be processed
await queue.join()
# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
asyncio.run(main())
在我的例子中,由于要处理的 url 列表非常庞大(以百万计),生产者等待工作人员可用,然后再将另一个任务推送到队列中。