spark中前向填充和后向填充的时间复杂度是多少?

问题描述 投票:0回答:1

我的问题:需要了解spark中动态前向填充和回向填充的时间复杂度 你好,我有一个 scala 作业,它读取 Delta 表 A、转换数据帧并写入 Delta 表 B(空表)

该作业运行处理 94,356,726 条记录并在 16 分钟内完成。

但是,在向 Dataframe 转换添加动态填充逻辑后,作业执行了 2 小时 20 分钟。

性能差距有多大,回填的时间复杂度是多少?

详情 回填的目的

我有这样的数据:

id 版本 数据列
0 4
0 3 “测试2”
0 2
0 1 “测试1”

我想得到这样的数据:

id 版本 数据列
0 4 “测试 2”//从版本 3 填充
0 3 “测试2”
0 2 “测试 1”//从版本 2 填充
0 1 “测试1”

我的方法 这里使用以下方法:Pyspark:使用 DataFrame 的最后观察值进行前向填充

// before using backfilling, the job uses the "partitionWindow " for dedup
val partitionWindow = Window
  .partitionBy("id")
  .orderBy(desc("version"), desc("timestamp'))

val backfillWindow = partitionWindow
  .rowsBetween(0, Window.unboundedFollowing)

df.withColumn("data_filled", coalesce(first("data_column", true).over(backfillWindow), col("data_column")))

我的观察

  • 执行时间:它按预期工作,但性能大幅下降,从 16 分钟开始 至2小时20分钟
  • Spark阶段:在Spark历史服务器中,瓶颈是 “Delta Lake 感人文件:low shuffle merge”,差距1.8分钟 对比 1.1 小时
  • 物理计划:我使用winmerge来比较“Delta Lake moving files: low shuffle merge”阶段的物理计划,2个作业之间,没有太大区别
  • 我检查了pyspark ffill和bfill,它们应该具有相似的时间复杂度,文档没有提到时间复杂度,只是说:“避免对非常大的数据集使用这种方法。”

更多想法: 我正在做单向回填,它应该接近 O(Nlog(N)),对吧?也许双向填充(同时向前和向后)会有所不同

谢谢!

scala performance apache-spark pyspark data-processing
1个回答
0
投票

tl;博士

将窗口规格从

.rowsBetween(0, Window.unboundedFollowing)
更改为
.rowsBetween(Window.unboundedPreceding, 0)
,并反转 bfill/ffill 的逻辑。

请继续阅读

根据测试,当 bfill/ffill 代码指定为

.rowsBetween(0, Window.unboundedFollowing)
.rowsBetween(Window.unboundedPreceding, 0)
时,执行会恶化(并且无论执行回填操作
F.first
或前向填充操作
F.last
)。查询执行计划显示
WholeStageCodeGen 
前者比后者花费的时间要长得多,我不确定为什么,但我发现进行上述更改至少可以提高一个数量级,具体取决于数据集中的行数。 .

测试

设置

在 PySpark 中进行测试,让我们创建一些具有一些空值的合成数据。

import itertools as it

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession, Window

spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.version)

# Create synthetic data
data = list(it.product(range(30_000), ["ham", "eggs"]))
# create null data, every x % 3 == 1 for ham and every x % 3 == 2 for eggs
_data = []
for _ix, t in enumerate(data):
    if ((t[1] == "ham") and ((t[0] % 3) == 1)) or ((t[1] == "eggs") and ((t[0] % 3) == 2)):
        _data.append((None, t[1], _ix))
    else:
        _data.append((*t, _ix))

sdf = spark.createDataFrame(_data, ["value", "name", "timestamp"]).select(
    "name", "timestamp", "value"
)

sdf.limit(20).show()

创建以下 Spark 数据框:

3.4.1

+----+---------+-----+
|name|timestamp|value|
+----+---------+-----+
| ham|        0|    0|
|eggs|        1|    0|
| ham|        2| null|
|eggs|        3|    1|
| ham|        4|    2|
|eggs|        5| null|
| ham|        6|    3|
|eggs|        7|    3|
| ham|        8| null|
|eggs|        9|    4|
| ham|       10|    5|
|eggs|       11| null|
| ham|       12|    6|
|eggs|       13|    6|
| ham|       14| null|
|eggs|       15|    7|
| ham|       16|    8|
|eggs|       17| null|
| ham|       18|    9|
|eggs|       19|    9|
+----+---------+-----+

定义

我们定义了 2 个不同的回填操作,第一个使用

.rowsBetween(0, Window.unboundedFollowing)
,第二个使用
.rowsBetween(Window.unboundedPreceding, 0)

def bfill_unbounded_following():
    window = (
        Window.partitionBy(["name"])
        .orderBy(F.asc("timestamp"))
        .rowsBetween(0, Window.unboundedFollowing)
    )
    return (F.first("value", ignorenulls=True).over(window)).alias("value_bfill_following")


def bfill_unbounded_preceding():
    window = (
        Window.partitionBy(["name"])
        .orderBy(F.desc("timestamp"))
        .rowsBetween(Window.unboundedPreceding, 0)
    )
    return (F.last("value", ignorenulls=True).over(window)).alias("value_bfill_preceding")

结果

输出

首先我们检查两个函数产生相同的输出。为了更容易看到插补,我们显示

eggs
分区:

sdf_out = sdf.select(
    "*",
    bfill_unbounded_following(),
    bfill_unbounded_preceding(),
    (F.col("value_bfill_following") == F.col("value_bfill_preceding")).alias("check_equals"),
).sort("name", "timestamp")

assert sdf_out.where(~F.col("check_equals")).count() == 0
print("test passed")

sdf_out.limit(20).show()

这些函数产生相同的输出,并且测试已通过。

test passed
+----+---------+-----+---------------------+---------------------+------------+
|name|timestamp|value|value_bfill_following|value_bfill_preceding|check_equals|
+----+---------+-----+---------------------+---------------------+------------+
|eggs|        1|    0|                    0|                    0|        true|
|eggs|        3|    1|                    1|                    1|        true|
|eggs|        5| null|                    3|                    3|        true|
|eggs|        7|    3|                    3|                    3|        true|
|eggs|        9|    4|                    4|                    4|        true|
|eggs|       11| null|                    6|                    6|        true|
|eggs|       13|    6|                    6|                    6|        true|
|eggs|       15|    7|                    7|                    7|        true|
|eggs|       17| null|                    9|                    9|        true|
|eggs|       19|    9|                    9|                    9|        true|
|eggs|       21|   10|                   10|                   10|        true|
|eggs|       23| null|                   12|                   12|        true|
|eggs|       25|   12|                   12|                   12|        true|
|eggs|       27|   13|                   13|                   13|        true|
|eggs|       29| null|                   15|                   15|        true|
|eggs|       31|   15|                   15|                   15|        true|
|eggs|       33|   16|                   16|                   16|        true|
|eggs|       35| null|                   18|                   18|        true|
|eggs|       37|   18|                   18|                   18|        true|
|eggs|       39|   19|                   19|                   19|        true|
+----+---------+-----+---------------------+---------------------+------------+
执行时间

运行

bfill_unbounded_following()
bfill_unbounded_preceding()
慢100倍!

%%timeit -n 1

_ = sdf.select("*", bfill_unbounded_following()).collect()

22.1 s ± 289 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit -n 1

_ = sdf.select("*", bfill_unbounded_preceding()).collect()

191 ms ± 14.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
执行计划

分析

bfill_unbounded_following
函数的执行计划,我们可以看到瓶颈在哪里:
WholeStageCodeGen (2)
需要约 12 秒才能运行(未显示:
bfill_unbounded_preceding
的执行计划,但速度要快得多)。

bfill_unbounded_following

现在,我不太清楚为什么会有这么大的差异。更糟糕的是,将窗口指定为

.rowsBetween(0, Window.unboundedFollowing)
时的执行时间会随着行数非线性增长。对于 10k 数据集,速度慢 25 倍,对于 30k 数据集,速度慢 100 倍(参见测试)。

© www.soinside.com 2019 - 2024. All rights reserved.