需要帮助理解为什么引入 GROUP BY 后 Spark 查询需要更长的时间来执行

问题描述 投票:0回答:1

我在 Oracle 数据库中有 3 个表,我正在尝试加入这些表并在其上运行一些聚合:

orders: (3000 + rows)
order_line_items: (5000 + rows)
item_wms: (14 million rows)

当我在 PySpark 中运行以下代码时:

joined_df = (orders_df.alias("o")
             .join(orders_line_item_df.alias("oli"), F.col("o.order_id") == F.col("oli.order_id"), how="inner")
             .join(item_wms_df.alias("iw"), F.col("oli.item_id") == F.col("iw.item_id"), how="inner")
             .filter(F.col("o.do_status").isin(["110"]))
)

display(joined_df.limit(100))

它在 40 秒内完成并生成以下查询计划:

== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
   ResultQueryStage (14), Statistics(sizeInBytes=981.6 KiB, rowCount=100, ColumnStat: N/A, isRuntime=true)
   +- CollectLimit (13)
      +- BroadcastHashJoin Inner BuildLeft (12)
         :- AQEShuffleRead (10)
         :  +- ShuffleQueryStage (9), Statistics(sizeInBytes=10.9 MiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
         :     +- Exchange (8)
         :        +- BroadcastHashJoin Inner BuildLeft (7)
         :           :- AQEShuffleRead (5)
         :           :  +- ShuffleQueryStage (4), Statistics(sizeInBytes=4.9 MiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
         :           :     +- Exchange (3)
         :           :        +- Filter (2)
         :           :           +- Scan JDBCRelation(orders) [numPartitions=1]  (1)
         :           +- Scan JDBCRelation(order_line_item) [numPartitions=1]  (6)
         +- Scan JDBCRelation(item_wms) [numPartitions=1]  (11)

但是当我添加带有

GROUP BY
聚合的
MIN
时,查询未完成:

SELECT
        o.order_id,
        MIN(
            CASE
                WHEN iw.spl_instr_code_4 IN(
                    'L', 'D' 
                )
                     THEN
                    '1-' 
                    WHEN bill_facility_alias_id = '0000002290'
                    THEN '2-' 
                WHEN bill_facility_alias_id  IN ('0000000514','0000000963')
                    THEN '3-' 
                WHEN bill_facility_alias_id IN('0000468976','0000531509')
                    THEN '4-' 
                WHEN bill_facility_alias_id ='0000505550'
                    THEN '5-' 
                WHEN bill_facility_alias_id  IN('0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420',
                '0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362', '0000473708')
                    THEN '6-' || 'NFS'
                WHEN bill_facility_alias_id not IN
                ('0000002290','0000000514','0000000963','0000468976','0000004696','0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420','0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362','0000473708', '0000505550')
                    THEN '7-' 
                ELSE
                    '8-' 
            END
        ) new_priority
    FROM
        orders            o
        JOIN order_line_item   oli ON oli.order_id = o.order_id
        JOIN item_wms          iw ON oli.item_id = iw.item_id
    WHERE
        o.do_status IN (
            '110'
        )
    GROUP BY
        o.order_id

生成的查询计划为:

== Physical Plan ==
AdaptiveSparkPlan (41)
+- == Current Plan ==
   CollectLimit (22)
   +- SortAggregate (21)
      +- Sort (20)
         +- ShuffleQueryStage (19), Statistics(sizeInBytes=2.02E+22 B, ColumnStat: N/A)
            +- Exchange (18)
               +- SortAggregate (17)
                  +- * Sort (16)
                     +- * Project (15)
                        +- * BroadcastHashJoin Inner BuildLeft (14)
                           :- AQEShuffleRead (12)
                           :  +- ShuffleQueryStage (11), Statistics(sizeInBytes=94.3 KiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
                           :     +- Exchange (10)
                           :        +- * Project (9)
                           :           +- * BroadcastHashJoin Inner BuildLeft (8)
                           :              :- AQEShuffleRead (6)
                           :              :  +- ShuffleQueryStage (5), Statistics(sizeInBytes=52.8 KiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
                           :              :     +- Exchange (4)
                           :              :        +- * Project (3)
                           :              :           +- * Filter (2)
                           :              :              +- * Scan JDBCRelation(orders) [numPartitions=1]  (1)
                           :              +- * Scan JDBCRelation(order_line_item) [numPartitions=1]  (7)
                           +- * Scan JDBCRelation(item_wms) [numPartitions=1]  (13)

我的理解是:

  • 我认为第一个查询提前完成的原因是因为它不需要完全连接。
    CollectLimit
    实际上限制了结果,它只能对 100 行进行连接并退出。
  • 第二个查询的情况并非如此。它需要对整个表进行排序,然后在
    order_id
    上聚合。

我说得对吗?

更多问题:

  • 查询计划中各种
    +
    符号的含义是什么?这是否意味着这是一个不同的阶段或不同的工作?
  • 当我阅读查询计划时,我应该如何解释何时创建新作业或阶段?

我问这个问题是因为我是 Spark 查询优化的初学者,需要知道如何解释查询计划。

集群信息: 4 核 32 GB 单节点。

apache-spark pyspark apache-spark-sql query-optimization database-performance
1个回答
0
投票

我会尝试回答您的一些问题:

1.在第一个查询中,由于 limit(100),Spark 只处理一小部分数据。查询计划包含一个

CollectLimit
操作,这意味着 Spark 在收集到 100 行后立即限制数据。

2.在第二个查询中,

GROUP BY
操作通常需要对数据的所有分区进行排序或洗牌操作。在执行聚合之前,Spark 会执行
ShuffleQueryStage
来跨分区重新分配数据。这就是查询需要更长时间才能完成的原因。您将看到诸如
SortAggregate
Exchange
ShuffleQueryStage
之类的阶段。这些阶段代表排序、洗牌等。

+
是什么意思?

查询计划中的+号并不代表不同的阶段或 工作机会。它们用于表示嵌套操作。阶段越深 树中的 + 符号越多。

当新的工作或舞台被创建时我应该如何解释?

您可以使用 Spark UI 可视化新阶段并识别内存 每个任务的使用情况等

© www.soinside.com 2019 - 2024. All rights reserved.