如何让 Dask 工作人员在处理大型数据集时保持忙碌,以防止他们耗尽任务?

问题描述 投票:0回答:1

我正在尝试使用 Python 中的 Dask 分布式计算来处理大型数据集(大约 100 万个任务)。 (我正在从数据库获取数据来处理它,并且正在检索大约 1M 行)。这里我刚刚制作了一个更简单的代码版本:

每个任务都会模拟一些计算,我希望将这些任务有效地分配给多个工作人员,以最大限度地提高资源利用率。

这是我的代码的最小可重现示例:

from dask.distributed import Client, as_completed
from tqdm import tqdm
import time
import random

# Dummy computational function
def compute_task(data):
    # Simulate some computation
    time.sleep(random.uniform(0.01, 0.02))  # Simulate computation time
    return data * data

# Function to process a chunk of data
def process_chunk(chunk):
    results = []
    for item in chunk:
        result = compute_task(item)
        results.append(result)
    return results

def main(scheduler_address, num_tasks=1000000, chunk_size=100, max_concurrent_tasks=1000):
    client = Client(scheduler_address)
    print(f"Connected to Dask scheduler at {scheduler_address}")

    try:
        # Generate dummy data
        data = list(range(num_tasks))
        total_chunks = (num_tasks + chunk_size - 1) // chunk_size

        # Create a generator for chunks
        def chunk_generator():
            for i in range(0, len(data), chunk_size):
                yield data[i:i + chunk_size]

        chunks = chunk_generator()
        active_futures = []

        # Initial submission of tasks
        for _ in range(min(max_concurrent_tasks, total_chunks)):
            try:
                chunk = next(chunks)
                future = client.submit(process_chunk, chunk)
                active_futures.append(future)
            except StopIteration:
                break

        completed_chunks = 0
        with tqdm(total=total_chunks, desc="Processing data") as pbar:
            for completed_future in as_completed(active_futures):
                results = completed_future.result()
                # Here we could do something with the results
                pbar.update(1)
                completed_chunks += 1

                # Submit new tasks to keep the pipeline full
                try:
                    chunk = next(chunks)
                    future = client.submit(process_chunk, chunk)
                    active_futures.append(future)
                except StopIteration:
                    pass

                # Remove completed future from the list
                active_futures.remove(completed_future)

        print("Processing complete.")

    finally:
        client.close()
        print("Client closed.")

if __name__ == "__main__":
    main(scheduler_address='tcp://localhost:8786')

说明:

  • compute_task:一个虚拟函数,通过以下方式模拟计算工作 睡眠短暂的随机持续时间并返回的平方 输入数据。
  • process_chunk:将compute_task应用于块中的每个项目。
  • 主要功能:
    • 生成数字列表作为虚拟数据。
    • 将数据分割成块。
    • 向worker提交任务,旨在保留一定数量的任务 (tasks_per_worker) 每个工作人员排队。
    • 任务完成时处理结果并尝试补充 工人排队。

问题:

尽管如此,工作人员很快就会耗尽任务并变得闲置。并且工作池被剥夺了任务。看来我提交和补充任务的逻辑并没有让工人充分忙碌,导致资源利用效率低下。工作人员处理任务的速度比提交新任务的速度快,导致他们在等待更多任务时变得空闲。

我的问题:

  • 如何改进我的任务提交逻辑以确保 Dask 工作人员仍然忙碌,直到处理完所有数据?
  • 有没有更有效的方法来在工人之间分配任务 最大化吞吐量和资源利用率?

我怀疑我的任务提交和管理逻辑的开销导致了延迟。管理每个工作人员队列并在 client.submit 中指定工作人员可能会引入不必要的复杂性和延迟。考虑让 Dask 通过删除工人参数来处理工人分配,但我不确定如何相应地调整我的代码。

任何指导或建议将不胜感激!

python dask dask-distributed dask-delayed
1个回答
0
投票

以下行阻止了您的代码:

results = completed_future.result()

它强制主进程等待结果(如果工作分布在网络上,则包括通过网络传输)。从代码中你无论如何都没有对结果做任何有用的事情,所以删除该行应该是安全的。

如果在实际代码中您正在对结果执行一些有用的操作,那么最好将相关过程移至单独的函数中(或集成到现有函数中),以允许并行、非阻塞处理。

© www.soinside.com 2019 - 2024. All rights reserved.