python 异步并行处理与动态任务队列

问题描述 投票:0回答:2

我是

asyncio
的新手。大多数
asyncio
代码示例显示了固定数量任务的并行处理:

tasks = [asyncio.ensure_future(download_one(url)) for url in urls]
await asyncio.gather(*tasks)

我需要下载大量的网址。我目前正在以上述方式使用

aiohttp + asyncio
来分批运行下载文件,比如说16个并发任务。

这种方法的问题是

asyncio
操作阻塞(没有双关语)直到整个批处理完成。

如何在任务完成后立即将新任务动态添加到队列中?

python asynchronous python-asyncio threadpool aiohttp
2个回答
0
投票

您可以使用

multiprocessing
并将所有并发任务放入循环中。然后通过
len(multiprocessing.active_children())
.

检查当前活动进程

首先,声明您要处理的最大并发任务:

max_processes = 4

定义您要处理的所有任务:

numberOfTasks = 16

假设,您将使用

printing()
函数执行该任务:

def printing(start, end):
  print(f'\n{start} - {end}\n')

然后循环:

for i in range(numberOfTasks + 1):
  if(len(multiprocessing.active_children()) < max_processes):
    pool = multiprocessing.Pool(processes = 1)
    pool.apply_async(printing, args=(0, 100))

  else:
    pool.close()
    pool.join()

每次循环,程序都会创建一个有1个进程的pool,进程数等于

max_processes
,程序会等到pool完成,然后打开一个新任务。


0
投票

这是我最初的解决方案(使用消费者-生产者设计模式):

import asyncio, random

urls = ['url1',....]

def get_url() -> str | None:
    global urls
    return urls.pop() if any(urls) else None


async def producer(queue: asyncio.Queue):
    while True:
        if queue.full():
            print(f"queue full ({queue.qsize()}), sleeping...")
            await asyncio.sleep(0.3)
            continue

        # get a url to fetch
        url = get_url()
        if not url:
            break
        print(f"PRODUCED: {url}")
        await queue.put(url)
        await asyncio.sleep(0.1)


async def consumer(queue: asyncio.Queue):
    while True:
        url = await queue.get()
        # simulate I/O operation
        await asyncio.sleep(random.randint(1, 3))
        queue.task_done()
        print(f"CONSUMED: {url}")


async def main():
    concurrency = 3
    queue: asyncio.Queue = asyncio.Queue(concurrency)

    # fire up the both producers and consumers
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(concurrency)]
    producers = [asyncio.create_task(producer(queue)) for _ in range(1)]

    # with both producers and consumers running, wait for
    # the producers to finish
    await asyncio.gather(*producers)
    print("---- done producing")

    # wait for the remaining tasks to be processed
    await queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()


asyncio.run(main())

在我的例子中,由于要处理的 url 列表非常庞大(以百万计),生产者等待工作人员可用,然后再将另一个任务推送到队列中。

© www.soinside.com 2019 - 2024. All rights reserved.