我正在尝试使用 Python 中的 Dask 分布式计算来处理大型数据集(大约 100 万个任务)。 (我正在从数据库获取数据来处理它,并且正在检索大约 1M 行)。这里我刚刚制作了一个更简单的代码版本:
每个任务都会模拟一些计算,我希望将这些任务有效地分配给多个工作人员,以最大限度地提高资源利用率。
这是我的代码的最小可重现示例:
from dask.distributed import Client, as_completed
from tqdm import tqdm
import time
import random
# Dummy computational function
def compute_task(data):
# Simulate some computation
time.sleep(random.uniform(0.01, 0.02)) # Simulate computation time
return data * data
# Function to process a chunk of data
def process_chunk(chunk):
results = []
for item in chunk:
result = compute_task(item)
results.append(result)
return results
def main(scheduler_address, num_tasks=1000000, chunk_size=100, max_concurrent_tasks=1000):
client = Client(scheduler_address)
print(f"Connected to Dask scheduler at {scheduler_address}")
try:
# Generate dummy data
data = list(range(num_tasks))
total_chunks = (num_tasks + chunk_size - 1) // chunk_size
# Create a generator for chunks
def chunk_generator():
for i in range(0, len(data), chunk_size):
yield data[i:i + chunk_size]
chunks = chunk_generator()
active_futures = []
# Initial submission of tasks
for _ in range(min(max_concurrent_tasks, total_chunks)):
try:
chunk = next(chunks)
future = client.submit(process_chunk, chunk)
active_futures.append(future)
except StopIteration:
break
completed_chunks = 0
with tqdm(total=total_chunks, desc="Processing data") as pbar:
for completed_future in as_completed(active_futures):
results = completed_future.result()
# Here we could do something with the results
pbar.update(1)
completed_chunks += 1
# Submit new tasks to keep the pipeline full
try:
chunk = next(chunks)
future = client.submit(process_chunk, chunk)
active_futures.append(future)
except StopIteration:
pass
# Remove completed future from the list
active_futures.remove(completed_future)
print("Processing complete.")
finally:
client.close()
print("Client closed.")
if __name__ == "__main__":
main(scheduler_address='tcp://localhost:8786')
说明:
问题:
尽管如此,工作人员很快就会耗尽任务并变得闲置。并且工作池被剥夺了任务。看来我提交和补充任务的逻辑并没有让工人充分忙碌,导致资源利用效率低下。工作人员处理任务的速度比提交新任务的速度快,导致他们在等待更多任务时变得空闲。
我的问题:
我怀疑我的任务提交和管理逻辑的开销导致了延迟。管理每个工作人员队列并在 client.submit 中指定工作人员可能会引入不必要的复杂性和延迟。考虑让 Dask 通过删除工人参数来处理工人分配,但我不确定如何相应地调整我的代码。
任何指导或建议将不胜感激!
以下行阻止了您的代码:
results = completed_future.result()
它强制主进程等待结果(如果工作分布在网络上,则包括通过网络传输)。从代码中你无论如何都没有对结果做任何有用的事情,所以删除该行应该是安全的。
如果在实际代码中您正在对结果执行一些有用的操作,那么最好将相关过程移至单独的函数中(或集成到现有函数中),以允许并行、非阻塞处理。