Polars - 使用partition_by 和collect_all 加速

问题描述 投票:0回答:1

设置示例

警告:5GB内存 df 创建

import time

import numpy as np
import polars as pl

rng = np.random.default_rng(1)

nrows = 50_000_000
df = pl.DataFrame(
    dict(
        id=rng.integers(1, 50, nrows),
        id2=rng.integers(1, 500, nrows),
        v=rng.normal(0, 1, nrows),
        v1=rng.normal(0, 1, nrows),
        v2=rng.normal(0, 1, nrows),
        v3=rng.normal(0, 1, nrows),
        v4=rng.normal(0, 1, nrows),
        v5=rng.normal(0, 1, nrows),
        v6=rng.normal(0, 1, nrows),
        v7=rng.normal(0, 1, nrows),
        v8=rng.normal(0, 1, nrows),
        v9=rng.normal(0, 1, nrows),
        v10=rng.normal(0, 1, nrows),
    )
)

我手头有一个简单的任务如下。

start = time.perf_counter()
res = (
    df.lazy()
    .with_columns(
        pl.col(f"v{i}") - pl.col(f"v{i}").mean().over("id", "id2")
        for i in range(1, 11)
    )
    .group_by("id", "id2")
    .agg((pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11))
    .collect()
)
time.perf_counter() - start
# 9.85

上述任务在 16 核机器上大约 10 秒内完成。

但是,如果我首先将

df
拆分/分区为
id
,然后执行与上面相同的计算并在最后调用
collect_all
concat
,我可以获得近 2 倍的加速。

start = time.perf_counter()
res2 = pl.concat(
    pl.collect_all(
        dfi.lazy()
        .with_columns(
            pl.col(f"v{i}") - pl.col(f"v{i}").mean().over("id", "id2")
            for i in range(1, 11)
        )
        .group_by("id", "id2")
        .agg((pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11))
            for dfi in df.partition_by("id", maintain_order=False)
    )
)
time.perf_counter() - start
# 5.60

另外,如果我用

id2
而不是
id
来分区的话,时间会更快~4s。

我还注意到第二种方法(按

id
id2
分区)比第一种方法具有更好的CPU利用率。也许这就是第二种方法更快的原因。

我的问题是:

  1. 为什么第二种方法速度更快并且CPU利用率更高?
  2. 它们在性能方面不应该是相同的吗?因为我认为 window/group_by 操作将始终为每个窗口/组并行执行,并使用尽可能多的可用资源?
python python-polars
1个回答
1
投票

最初的免责声明,这有点推测性,因为我还没有查看来源,但我认为这是有根据的

我在多种模式下进行了这两项测试

测试 时间
测试1(原样) 20.44
测试2(原样) 16.07
test1 w/aggpow 22.41
test2 w/aggpow 19.99
test1 w/w/pow 52.08
test2 w/w/pow 21.81
test1 w/aggpow eager 47.90
test2 w/aggpow eager 61.49

在第一次运行时,我从 test1 到 test2 得到了 20.44 和 16.07,因此我正在复制与您相同的方向,但我的计算机上的严重程度较低。

我在两次测试中都运行了 htop。内存似乎在大致相同的使用情况下达到峰值,但不同的是,在 test1 中,核心没有完全加载,而在 test2 中,我可以看到所有核心都非常一致地接近顶部。

为了进一步探索这一点,我在 with_columns 中添加了

.pow(1.2).pow(0.3888)
,并在聚合中添加了另一轮。

在聚合中进行那些昂贵的操作(具体来说,是在

sum()
test1 取得了 22.41 而 test2 取得了 19.985 之后。

我将其从聚合中取出并放入 with_columns 中(特别是在

pl.col(f"v{i}")
上,而不是仅原始第一个的平均值)。由于那里的手术费用昂贵,差异确实令人震惊。测试 1 为 52.08,测试 2 为 21.81。

在测试 1 期间,我可以看到核心几乎空闲的间歇,而它可能正在执行更便宜的聚合。在 test2 期间,它们几乎一直处于最大状态。我还在 eager 模式下进行了这两项测试,结果却发生了逆转。在 Eager 模式下 test1 为 47.90,而 test2 为 61.49。

根据上述结果,我猜测在

collect_all
模式下,它将一帧交给每个核心,并且该帧仅由该核心处理,而不是每个表达式获得一个核心。因此,预分组操作比聚合操作贵得多并不重要。它会一直努力工作。

在单帧模式下,它无法提前知道操作的成本有多大,因此它只是根据它们的自然组对它们进行分组。结果,它采用第一个组,进行预组操作,然后将其带到聚合中。在聚合过程中,它的工作并不那么努力,我想,这就是为什么我看到核心在波浪中接近闲置的原因。直到聚合完成后,它才会开始下一组,并且核心的强度会再次增加。

我确信我的猜测并不完全正确,但我认为它很接近。

© www.soinside.com 2019 - 2024. All rights reserved.