delayed() 函数的作用是什么(与 Python 中的 joblib 一起使用时)

问题描述 投票:0回答:5

我已经阅读了文档,但我不明白这是什么意思:

The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.

我使用它来迭代我想要操作的列表(allImages),如下所示:

def joblib_loop():
    Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages)

这会返回我的 HOG 功能,就像我想要的那样(并且使用我所有的 8 个核心获得速度增益),但我只是不确定它实际上在做什么。

我的Python知识充其量只是还可以,而且很可能我遗漏了一些基本的东西。任何正确方向的指示将不胜感激

python multiprocessing joblib
5个回答
183
投票

如果我们看看如果我们简单地写会发生什么,事情可能会变得更清楚

Parallel(n_jobs=8)(getHog(i) for i in allImages)

在这种情况下,可以更自然地表达为:

  1. 使用
    Parallel
     创建一个 
    n_jobs=8
  2. 实例
  3. 为列表创建一个生成器
    [getHog(i) for i in allImages]
  4. 将该生成器传递给
    Parallel
    实例

有什么问题吗?当列表传递给

Parallel
对象时,所有
getHog(i)
调用都已经返回 - 所以没有什么可以并行执行的了!所有工作都已在主线程中按顺序完成。

我们实际上想要的是告诉Python我们想要用什么参数来调用哪些函数,而不实际调用它们——换句话说,我们想要延迟执行。

这就是

delayed
方便我们做的事情,语法清晰。如果我们想告诉 Python 我们想稍后调用
foo(2, g=3)
,我们可以简单地写
delayed(foo)(2, g=3)
。返回的是元组
(foo, [2], {g: 3})
,包含:

  • 对我们要调用的函数的引用,例如
    foo
  • 所有参数(短“args”),不带关键字,例如
    2
  • 所有关键字参数(短“kwargs”),例如
    g=3

因此,通过写

Parallel(n_jobs=8)(delayed(getHog)(i) for i in allImages)
,而不是上面的顺序,现在会发生以下情况:

  1. 创建带有

    Parallel
    n_jobs=8
    实例

  2. 名单

     [delayed(getHog)(i) for i in allImages]
    

    被创建,评估为

     [(getHog, [img1], {}), (getHog, [img2], {}), ... ]
    
  3. 该列表被传递到

    Parallel
    实例

  4. Parallel
    实例创建8个线程并将列表中的元组分配给它们

  5. 最后,每个线程开始执行元组,即,它们调用第一个元素,并将第二个和第三个元素解压为参数

    tup[0](*tup[1], **tup[2])
    ,将元组返回到我们实际想要执行的调用,
    getHog(img2)
    .


3
投票

我们需要一个循环来测试不同模型配置的列表。这是驱动网格搜索过程的主函数,并将为每个模型配置调用 Score_model() 函数。我们可以通过并行评估模型配置来显着加快网格搜索过程。一种方法是使用 Joblib 库。我们可以定义一个包含要使用的核心数量的 Parallel 对象,并将其设置为在您的硬件中检测到的分数数量。

定义执行者

executor = Parallel(n_jobs=cpu_count(), backend= 'multiprocessing' )

然后创建一个要并行执行的任务列表,这将是对我们拥有的每个模型配置的 Score model() 函数的一次调用。

假设

def score_model(data, n_test, cfg):
                   ........................

定义任务列表

tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)

我们可以使用 Parallel 对象并行执行任务列表。

scores = executor(tasks)


0
投票

因此,您希望能够做的是堆积一组函数调用及其参数,以便您可以将它们有效地传递给调度程序/执行程序。 Delayed 是一个 decorator ,它接收一个函数及其参数并将它们包装到一个对象中,该对象可以放入列表中并根据需要弹出。 Dask 有同样的东西,它部分地用于将其输入到其图形调度程序中。


0
投票

LakshayAAAgrawal 的帖子很好地解释了它的工作原理。我想添加一些关于便捷实现的细节:

import joblib

@joblib.delayed
def getHog(image):
    """Some time-consuming function on an image"""
    ...

# Running this in parallel
with joblib.Parallel(backend="loky", n_jobs=8) as parallel:
    result = parallel(getHog(img) for img in allImages)

请注意,对

parallel(getHog(...)...)
的调用不需要
delayed()
调用,因为后者已作为 decorator 添加到函数定义中。


-5
投票

来自参考https://wiki.python.org/moin/ParallelProcessing Parallel 对象创建一个多处理池,在多个进程中分叉 Python 解释器来执行列表中的每个项目。延迟函数是一个简单的技巧,能够使用函数调用语法创建元组(函数、args、kwargs)。

我想建议的另一件事是,我们可以这样概括,而不是显式定义核心数:

import multiprocessing
num_core=multiprocessing.cpu_count()
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.