我在 Oracle 数据库中有 3 个表,我正在尝试加入这些表并在其上运行一些聚合:
orders: (3000 + rows)
order_line_items: (5000 + rows)
item_wms: (14 million rows)
当我在 PySpark 中运行以下代码时:
joined_df = (orders_df.alias("o")
.join(orders_line_item_df.alias("oli"), F.col("o.order_id") == F.col("oli.order_id"), how="inner")
.join(item_wms_df.alias("iw"), F.col("oli.item_id") == F.col("iw.item_id"), how="inner")
.filter(F.col("o.do_status").isin(["110"]))
)
display(joined_df.limit(100))
它在 40 秒内完成并生成以下查询计划:
== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
ResultQueryStage (14), Statistics(sizeInBytes=981.6 KiB, rowCount=100, ColumnStat: N/A, isRuntime=true)
+- CollectLimit (13)
+- BroadcastHashJoin Inner BuildLeft (12)
:- AQEShuffleRead (10)
: +- ShuffleQueryStage (9), Statistics(sizeInBytes=10.9 MiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
: +- Exchange (8)
: +- BroadcastHashJoin Inner BuildLeft (7)
: :- AQEShuffleRead (5)
: : +- ShuffleQueryStage (4), Statistics(sizeInBytes=4.9 MiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
: : +- Exchange (3)
: : +- Filter (2)
: : +- Scan JDBCRelation(orders) [numPartitions=1] (1)
: +- Scan JDBCRelation(order_line_item) [numPartitions=1] (6)
+- Scan JDBCRelation(item_wms) [numPartitions=1] (11)
但是当我添加带有
GROUP BY
聚合的 MIN
时,查询未完成:
SELECT
o.order_id,
MIN(
CASE
WHEN iw.spl_instr_code_4 IN(
'L', 'D'
)
THEN
'1-'
WHEN bill_facility_alias_id = '0000002290'
THEN '2-'
WHEN bill_facility_alias_id IN ('0000000514','0000000963')
THEN '3-'
WHEN bill_facility_alias_id IN('0000468976','0000531509')
THEN '4-'
WHEN bill_facility_alias_id ='0000505550'
THEN '5-'
WHEN bill_facility_alias_id IN('0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420',
'0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362', '0000473708')
THEN '6-' || 'NFS'
WHEN bill_facility_alias_id not IN
('0000002290','0000000514','0000000963','0000468976','0000004696','0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420','0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362','0000473708', '0000505550')
THEN '7-'
ELSE
'8-'
END
) new_priority
FROM
orders o
JOIN order_line_item oli ON oli.order_id = o.order_id
JOIN item_wms iw ON oli.item_id = iw.item_id
WHERE
o.do_status IN (
'110'
)
GROUP BY
o.order_id
生成的查询计划为:
== Physical Plan ==
AdaptiveSparkPlan (41)
+- == Current Plan ==
CollectLimit (22)
+- SortAggregate (21)
+- Sort (20)
+- ShuffleQueryStage (19), Statistics(sizeInBytes=2.02E+22 B, ColumnStat: N/A)
+- Exchange (18)
+- SortAggregate (17)
+- * Sort (16)
+- * Project (15)
+- * BroadcastHashJoin Inner BuildLeft (14)
:- AQEShuffleRead (12)
: +- ShuffleQueryStage (11), Statistics(sizeInBytes=94.3 KiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
: +- Exchange (10)
: +- * Project (9)
: +- * BroadcastHashJoin Inner BuildLeft (8)
: :- AQEShuffleRead (6)
: : +- ShuffleQueryStage (5), Statistics(sizeInBytes=52.8 KiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
: : +- Exchange (4)
: : +- * Project (3)
: : +- * Filter (2)
: : +- * Scan JDBCRelation(orders) [numPartitions=1] (1)
: +- * Scan JDBCRelation(order_line_item) [numPartitions=1] (7)
+- * Scan JDBCRelation(item_wms) [numPartitions=1] (13)
我的理解是:
CollectLimit
实际上限制了结果,它只能对 100 行进行连接并退出。order_id
上聚合。我说得对吗?
更多问题:
+
符号的含义是什么?这是否意味着这是一个不同的阶段或不同的工作?我问这个问题是因为我是 Spark 查询优化的初学者,需要知道如何解释查询计划。
集群信息: 4 核 32 GB 单节点。
我会尝试回答您的一些问题:
1.在第一个查询中,由于 limit(100),Spark 只处理一小部分数据。查询计划包含一个
CollectLimit
操作,这意味着 Spark 在收集到 100 行后立即限制数据。
2.在第二个查询中,
GROUP BY
操作通常需要对数据的所有分区进行排序或洗牌操作。在执行聚合之前,Spark 会执行 ShuffleQueryStage
来跨分区重新分配数据。这就是查询需要更长时间才能完成的原因。您将看到诸如 SortAggregate
、Exchange
和 ShuffleQueryStage
之类的阶段。这些阶段代表排序、洗牌等。
+
是什么意思?
查询计划中的+号并不代表不同的阶段或 工作机会。它们用于表示嵌套操作。阶段越深 树中的 + 符号越多。
当新的工作或舞台被创建时我应该如何解释?
您可以使用 Spark UI 可视化新阶段并识别内存 每个任务的使用情况等