我的问题:需要了解spark中动态前向填充和回向填充的时间复杂度 你好,我有一个 scala 作业,它读取 Delta 表 A、转换数据帧并写入 Delta 表 B(空表)
该作业运行处理 94,356,726 条记录并在 16 分钟内完成。
但是,在向 Dataframe 转换添加动态填充逻辑后,作业执行了 2 小时 20 分钟。
性能差距有多大,回填的时间复杂度是多少?
详情 回填的目的
我有这样的数据:
id | 版本 | 数据列 |
---|---|---|
0 | 4 | 空 |
0 | 3 | “测试2” |
0 | 2 | 空 |
0 | 1 | “测试1” |
我想得到这样的数据:
id | 版本 | 数据列 |
---|---|---|
0 | 4 | “测试 2”//从版本 3 填充 |
0 | 3 | “测试2” |
0 | 2 | “测试 1”//从版本 2 填充 |
0 | 1 | “测试1” |
我的方法 这里使用以下方法:Pyspark:使用 DataFrame 的最后观察值进行前向填充
// before using backfilling, the job uses the "partitionWindow " for dedup
val partitionWindow = Window
.partitionBy("id")
.orderBy(desc("version"), desc("timestamp'))
val backfillWindow = partitionWindow
.rowsBetween(0, Window.unboundedFollowing)
df.withColumn("data_filled", coalesce(first("data_column", true).over(backfillWindow), col("data_column")))
我的观察
更多想法: 我正在做单向回填,它应该接近 O(Nlog(N)),对吧?也许双向填充(同时向前和向后)会有所不同
谢谢!
将窗口规格从
.rowsBetween(0, Window.unboundedFollowing)
更改为 .rowsBetween(Window.unboundedPreceding, 0)
,并反转 bfill/ffill 的逻辑。
根据测试,当 bfill/ffill 代码指定为
.rowsBetween(0, Window.unboundedFollowing)
与 .rowsBetween(Window.unboundedPreceding, 0)
时,执行会恶化(并且无论执行回填操作 F.first
或前向填充操作 F.last
)。查询执行计划显示 WholeStageCodeGen
前者比后者花费的时间要长得多,我不确定为什么,但我发现进行上述更改至少可以提高一个数量级,具体取决于数据集中的行数。 .
在 PySpark 中进行测试,让我们创建一些具有一些空值的合成数据。
import itertools as it
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession, Window
spark = SparkSession.builder.master("local[*]").getOrCreate()
print(spark.version)
# Create synthetic data
data = list(it.product(range(30_000), ["ham", "eggs"]))
# create null data, every x % 3 == 1 for ham and every x % 3 == 2 for eggs
_data = []
for _ix, t in enumerate(data):
if ((t[1] == "ham") and ((t[0] % 3) == 1)) or ((t[1] == "eggs") and ((t[0] % 3) == 2)):
_data.append((None, t[1], _ix))
else:
_data.append((*t, _ix))
sdf = spark.createDataFrame(_data, ["value", "name", "timestamp"]).select(
"name", "timestamp", "value"
)
sdf.limit(20).show()
创建以下 Spark 数据框:
3.4.1
+----+---------+-----+
|name|timestamp|value|
+----+---------+-----+
| ham| 0| 0|
|eggs| 1| 0|
| ham| 2| null|
|eggs| 3| 1|
| ham| 4| 2|
|eggs| 5| null|
| ham| 6| 3|
|eggs| 7| 3|
| ham| 8| null|
|eggs| 9| 4|
| ham| 10| 5|
|eggs| 11| null|
| ham| 12| 6|
|eggs| 13| 6|
| ham| 14| null|
|eggs| 15| 7|
| ham| 16| 8|
|eggs| 17| null|
| ham| 18| 9|
|eggs| 19| 9|
+----+---------+-----+
我们定义了 2 个不同的回填操作,第一个使用
.rowsBetween(0, Window.unboundedFollowing)
,第二个使用 .rowsBetween(Window.unboundedPreceding, 0)
。
def bfill_unbounded_following():
window = (
Window.partitionBy(["name"])
.orderBy(F.asc("timestamp"))
.rowsBetween(0, Window.unboundedFollowing)
)
return (F.first("value", ignorenulls=True).over(window)).alias("value_bfill_following")
def bfill_unbounded_preceding():
window = (
Window.partitionBy(["name"])
.orderBy(F.desc("timestamp"))
.rowsBetween(Window.unboundedPreceding, 0)
)
return (F.last("value", ignorenulls=True).over(window)).alias("value_bfill_preceding")
首先我们检查两个函数产生相同的输出。为了更容易看到插补,我们显示
eggs
分区:
sdf_out = sdf.select(
"*",
bfill_unbounded_following(),
bfill_unbounded_preceding(),
(F.col("value_bfill_following") == F.col("value_bfill_preceding")).alias("check_equals"),
).sort("name", "timestamp")
assert sdf_out.where(~F.col("check_equals")).count() == 0
print("test passed")
sdf_out.limit(20).show()
这些函数产生相同的输出,并且测试已通过。
test passed
+----+---------+-----+---------------------+---------------------+------------+
|name|timestamp|value|value_bfill_following|value_bfill_preceding|check_equals|
+----+---------+-----+---------------------+---------------------+------------+
|eggs| 1| 0| 0| 0| true|
|eggs| 3| 1| 1| 1| true|
|eggs| 5| null| 3| 3| true|
|eggs| 7| 3| 3| 3| true|
|eggs| 9| 4| 4| 4| true|
|eggs| 11| null| 6| 6| true|
|eggs| 13| 6| 6| 6| true|
|eggs| 15| 7| 7| 7| true|
|eggs| 17| null| 9| 9| true|
|eggs| 19| 9| 9| 9| true|
|eggs| 21| 10| 10| 10| true|
|eggs| 23| null| 12| 12| true|
|eggs| 25| 12| 12| 12| true|
|eggs| 27| 13| 13| 13| true|
|eggs| 29| null| 15| 15| true|
|eggs| 31| 15| 15| 15| true|
|eggs| 33| 16| 16| 16| true|
|eggs| 35| null| 18| 18| true|
|eggs| 37| 18| 18| 18| true|
|eggs| 39| 19| 19| 19| true|
+----+---------+-----+---------------------+---------------------+------------+
运行
bfill_unbounded_following()
比bfill_unbounded_preceding()
慢100倍!
%%timeit -n 1
_ = sdf.select("*", bfill_unbounded_following()).collect()
22.1 s ± 289 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit -n 1
_ = sdf.select("*", bfill_unbounded_preceding()).collect()
191 ms ± 14.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
分析
bfill_unbounded_following
函数的执行计划,我们可以看到瓶颈在哪里:WholeStageCodeGen (2)
需要约 12 秒才能运行(未显示:bfill_unbounded_preceding
的执行计划,但速度要快得多)。
现在,我不太清楚为什么会有这么大的差异。更糟糕的是,将窗口指定为
.rowsBetween(0, Window.unboundedFollowing)
时的执行时间会随着行数非线性增长。对于 10k 数据集,速度慢 25 倍,对于 30k 数据集,速度慢 100 倍(参见测试)。