Django + Dask 集成:使用情况和进展?

问题描述 投票:0回答:1

关于性能和最佳实践

注意,以下问题的完整代码已在 Github 上公开。 欢迎查看该项目! https://github.com/b-long/moose-dj-uv/pull/3

我正在尝试进行一个简单的 Django + Dask 集成,其中一个视图启动一个长时间运行的进程,另一个视图能够检查该工作的状态。 稍后,我可能会以

get_task_status
(或其他一些 Django 视图函数)能够返回工作输出的方式增强这一点。

我使用

time.sleep(2)
来有意模仿长时间运行的工作。 另外,重要的是要看到整体工作状态为
"running"
。 为此,我还在测试中使用了
time.sleep()
,这感觉很愚蠢。

查看代码如下:

from uuid import uuid4
from django.http import JsonResponse
from dask.distributed import Client
import time

# Initialize Dask client
client = Client(n_workers=8, threads_per_worker=2)

NUM_FAKE_TASKS = 25

# Dictionary to store futures with task_id as key
task_futures = {}


def long_running_process(work_list):
    def task_function(task):
        time.sleep(2)
        return task

    futures = [client.submit(task_function, task) for task in work_list]
    return futures


async def start_task(request):
    work_list = []

    for t in range(NUM_FAKE_TASKS):
        task_id = str(uuid4())  # Generate a unique ID for the task
        work_list.append(
            {"address": f"foo--{t}@example.com", "message": f"Mail task: {task_id}"}
        )

    futures = long_running_process(work_list)
    dask_task_id = futures[0].key  # Use the key of the first future as the task ID

    # Store the futures in the dictionary with task_id as key
    task_futures[dask_task_id] = futures

    return JsonResponse({"task_id": dask_task_id})


async def get_task_status(request, task_id):
    futures = task_futures.get(task_id)

    if futures:
        if not all(future.done() for future in futures):
            progress = 0
            return JsonResponse({"status": "running", "progress": progress})
        else:
            results = client.gather(futures, asynchronous=False)

            # Calculate progress, based on futures that are 'done'
            progress = int((sum(future.done() for future in futures) / len(futures)) * 100)

            return JsonResponse(
                {
                    "task_id": task_id,
                    "status": "completed",
                    "progress": progress,
                    "results": results,
                }
            )
    else:
        return JsonResponse({"status": "error", "message": "Task not found"})

我编写了一个测试,大约 5.5 秒内完成:

from django.test import Client
from django.urls import reverse
import time


def test_immediate_response_with_dask():
    client = Client()
    response = client.post(reverse("start_task_dask"), data={"data": "foo"})
    assert response.status_code == 200
    assert "task_id" in response.json()

    task_id = response.json()["task_id"]
    response2 = client.get(reverse("get_task_status_dask", kwargs={"task_id": task_id}))
    assert response2.status_code == 200
    r2_status = response2.json()["status"]
    assert r2_status == "running"

    attempts = 0
    max_attempts = 8

    while attempts < max_attempts:
        time.sleep(1)
        try:
            response3 = client.get(
                reverse("get_task_status_dask", kwargs={"task_id": task_id})
            )
            assert response3.status_code == 200

            r3_status = response3.json()["status"]
            r3_progress = response3.json()["progress"]

            assert r3_progress >= 99
            assert r3_status == "completed"
            break  # Exit the loop if successful
        except Exception:
            attempts += 1
            if attempts == max_attempts:
                raise  # Raise the last exception if all attempts failed

我的问题是,是否有更高效的方法来实现相同的 API? 如果

NUM_FAKE_TASKS = 10000
怎么办?

我在浪费周期吗?

编辑:如何查看进度百分比?

感谢 @GuillaumeEB 的提示

所以,我们知道以下内容是阻塞的:

client.gather(futures, asynchronous=False)

但是,这似乎也不符合预期:

client.gather(futures, asynchronous=True)

有什么方法可以使用

client.persist()
client.compute()
来查看增量进度吗?

我知道我无法坚持

list
<class 'distributed.client.Future'>
,并且使用
client.compute(futures)
似乎也表现不正确(将进度从
0
跳跃到
100
)。

python django concurrency dask
1个回答
0
投票

我认为您正在寻找的解决方案是 as_completed: https://docs.dask.org/en/latest/futures.html#waiting-on-futures

您还可以使用 as_completed 函数在 future 完成时对其进行迭代

© www.soinside.com 2019 - 2024. All rights reserved.