设置示例
警告:5GB内存 df 创建
import time
import numpy as np
import polars as pl
rng = np.random.default_rng(1)
nrows = 50_000_000
df = pl.DataFrame(
dict(
id=rng.integers(1, 50, nrows),
id2=rng.integers(1, 500, nrows),
v=rng.normal(0, 1, nrows),
v1=rng.normal(0, 1, nrows),
v2=rng.normal(0, 1, nrows),
v3=rng.normal(0, 1, nrows),
v4=rng.normal(0, 1, nrows),
v5=rng.normal(0, 1, nrows),
v6=rng.normal(0, 1, nrows),
v7=rng.normal(0, 1, nrows),
v8=rng.normal(0, 1, nrows),
v9=rng.normal(0, 1, nrows),
v10=rng.normal(0, 1, nrows),
)
)
我手头有一个简单的任务如下。
start = time.perf_counter()
res = (
df.lazy()
.with_columns(
pl.col(f"v{i}") - pl.col(f"v{i}").mean().over("id", "id2")
for i in range(1, 11)
)
.group_by("id", "id2")
.agg((pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11))
.collect()
)
time.perf_counter() - start
# 9.85
上述任务在 16 核机器上大约 10 秒内完成。
但是,如果我首先将
df
拆分/分区为 id
,然后执行与上面相同的计算并在最后调用 collect_all
和 concat
,我可以获得近 2 倍的加速。
start = time.perf_counter()
res2 = pl.concat(
pl.collect_all(
dfi.lazy()
.with_columns(
pl.col(f"v{i}") - pl.col(f"v{i}").mean().over("id", "id2")
for i in range(1, 11)
)
.group_by("id", "id2")
.agg((pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11))
for dfi in df.partition_by("id", maintain_order=False)
)
)
time.perf_counter() - start
# 5.60
另外,如果我用
id2
而不是id
来分区的话,时间会更快~4s。
我还注意到第二种方法(按
id
或id2
分区)比第一种方法具有更好的CPU利用率。也许这就是第二种方法更快的原因。
我的问题是:
最初的免责声明,这有点推测性,因为我还没有查看来源,但我认为这是有根据的
我在多种模式下进行了这两项测试
测试 | 时间 |
---|---|
测试1(原样) | 20.44 |
测试2(原样) | 16.07 |
test1 w/aggpow | 22.41 |
test2 w/aggpow | 19.99 |
test1 w/w/pow | 52.08 |
test2 w/w/pow | 21.81 |
test1 w/aggpow eager | 47.90 |
test2 w/aggpow eager | 61.49 |
在第一次运行时,我从 test1 到 test2 得到了 20.44 和 16.07,因此我正在复制与您相同的方向,但我的计算机上的严重程度较低。
我在两次测试中都运行了 htop。内存似乎在大致相同的使用情况下达到峰值,但不同的是,在 test1 中,核心没有完全加载,而在 test2 中,我可以看到所有核心都非常一致地接近顶部。
为了进一步探索这一点,我在 with_columns 中添加了
.pow(1.2).pow(0.3888)
,并在聚合中添加了另一轮。
在聚合中进行那些昂贵的操作(具体来说,是在
sum()
test1 取得了 22.41 而 test2 取得了 19.985 之后。
我将其从聚合中取出并放入 with_columns 中(特别是在
pl.col(f"v{i}")
上,而不是仅原始第一个的平均值)。由于那里的手术费用昂贵,差异确实令人震惊。测试 1 为 52.08,测试 2 为 21.81。
在测试 1 期间,我可以看到核心几乎空闲的间歇,而它可能正在执行更便宜的聚合。在 test2 期间,它们几乎一直处于最大状态。我还在 eager 模式下进行了这两项测试,结果却发生了逆转。在 Eager 模式下 test1 为 47.90,而 test2 为 61.49。
根据上述结果,我猜测在
collect_all
模式下,它将一帧交给每个核心,并且该帧仅由该核心处理,而不是每个表达式获得一个核心。因此,预分组操作比聚合操作贵得多并不重要。它会一直努力工作。
在单帧模式下,它无法提前知道操作的成本有多大,因此它只是根据它们的自然组对它们进行分组。结果,它采用第一个组,进行预组操作,然后将其带到聚合中。在聚合过程中,它的工作并不那么努力,我想,这就是为什么我看到核心在波浪中接近闲置的原因。直到聚合完成后,它才会开始下一组,并且核心的强度会再次增加。
我确信我的猜测并不完全正确,但我认为它很接近。