Spark HashAggregate swapping columns in SparkSQL


I have an AWS Glue job that joins two dataframes using Spark SQL. The job ran correctly every week for 6 consecutive months with no issues, then the join suddenly started swapping values in the resulting dataset. The schemas of the two dataframes have not changed. During the weekly runs, the results for the past 6 months have been swapped (always in the same order).

This happens for all 7 of the different Spark joins in my Glue job.

A plain SparkSQL "select * from df_view" does not scramble the columns. Spark mixes up the join results whether I use SparkSQL or the DataFrame API's .join method.

Looking at the query plan, the swap appears to happen at step (7), the HashAggregate. Does anyone know why this happens and how to fix it?

counts_df.createOrReplaceTempView("counts_vw")
logger.info(counts_df.printSchema())
renamed_df2.createOrReplaceTempView("tmp_vw")
logger.info(renamed_df2.printSchema())

#pull in the var_index field
spark_df = spark.sql("""select tmp_vw.*, counts_vw.var_index
                        from tmp_vw, counts_vw
                        where tmp_vw.prod_id = counts_vw.prod_id
                            and tmp_vw.var_sku = counts_vw.var_sku
                        order by prod_id, var_sku, run_date
                    """)

logger.info(spark_df.explain('FORMATTED'))
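(Side note: printSchema() and explain() print to stdout and return None, so the logger.info(...) calls above actually log "None" while the real output goes to the driver's stdout. A sketch for capturing the output as a string for logging, assuming Spark 3.x:)

import contextlib
import io

# printSchema()/explain() return None; capture their stdout instead
buf = io.StringIO()
with contextlib.redirect_stdout(buf):
    spark_df.printSchema()
    spark_df.explain(mode="formatted")
logger.info(buf.getvalue())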

Output:

root
 |-- prod_id: string (nullable = true)
 |-- var_id: string (nullable = true)
 |-- var_sku: string (nullable = true)
 |-- var_public_title: string (nullable = true)
 |-- var_index: string (nullable = true)

root
 |-- run_date: string (nullable = true)
 |-- prod_id: string (nullable = true)
 |-- prod_vendor: string (nullable = true)
 |-- prod_type: string (nullable = true)
 |-- var_id: string (nullable = true)
 |-- var_price: string (nullable = true)
 |-- var_name: string (nullable = true)
 |-- var_public_title: string (nullable = true)
 |-- var_sku: string (nullable = true)
 |-- url: string (nullable = true)

== Physical Plan ==
AdaptiveSparkPlan (21)
+- Sort (20)
   +- Exchange (19)
      +- Project (18)
         +- SortMergeJoin Inner (17)
            :- Sort (11)
            :  +- Exchange (10)
            :     +- HashAggregate (9)
            :        +- Exchange (8)
            :           +- HashAggregate (7)
            :              +- Union (6)
            :                 :- Filter (2)
            :                 :  +- Scan ExistingRDD (1)
            :                 +- Project (5)
            :                    +- Filter (4)
            :                       +- Scan ExistingRDD (3)
            +- Sort (16)
               +- Exchange (15)
                  +- Project (14)
                     +- Filter (13)
                        +- Scan ExistingRDD (12)

(1) Scan ExistingRDD
Output [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Arguments: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211], ...

(2) Filter
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Condition : (isnotnull(prod_id#203) AND isnotnull(var_sku#210))

(3) Scan ExistingRDD
Output [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]
Arguments: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231], MapPartitionsRDD[78] at map at DynamicFrame.scala:384, ExistingRDD, UnknownPartitioning(0)

(4) Filter
Input [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]
Condition : (isnotnull(cast(prod_id#223L as string)) AND isnotnull(var_sku#230))

(5) Project
Output [10]: [run_date#222, cast(prod_id#223L as string) AS prod_id#253, prod_vendor#224, prod_type#225, cast(var_id#226L as string) AS var_id#254, cast(var_price#227 as string) AS var_price#255, var_name#228, var_public_title#229, var_sku#230, substring(url#231, 44, 200) AS url#242]
Input [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]

(6) Union

(7) HashAggregate
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Keys [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Functions: []
Aggregate Attributes: []
Results [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]

(8) Exchange
Input [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Arguments: hashpartitioning(prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207, 8), ENSURE_REQUIREMENTS, [id=#723]

(9) HashAggregate
Input [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Keys [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Functions: []
Aggregate Attributes: []
Results [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, translate(url#211, ?,, ) AS url#266]

(10) Exchange
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266]
Arguments: hashpartitioning(prod_id#203, var_sku#210, 8), ENSURE_REQUIREMENTS, [id=#727]

(11) Sort
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266]
Arguments: [prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST], false, 0

(12) Scan ExistingRDD
Output [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]
Arguments: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325], MapPartitionsRDD[137] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)

(13) Filter
Input [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]
Condition : (isnotnull(prod_id#321) AND isnotnull(var_sku#323))

(14) Project
Output [3]: [prod_id#321, var_sku#323, var_index#325]
Input [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]

(15) Exchange
Input [3]: [prod_id#321, var_sku#323, var_index#325]
Arguments: hashpartitioning(prod_id#321, var_sku#323, 8), ENSURE_REQUIREMENTS, [id=#728]

(16) Sort
Input [3]: [prod_id#321, var_sku#323, var_index#325]
Arguments: [prod_id#321 ASC NULLS FIRST, var_sku#323 ASC NULLS FIRST], false, 0

(17) SortMergeJoin
Left keys [2]: [prod_id#203, var_sku#210]
Right keys [2]: [prod_id#321, var_sku#323]
Join condition: None

(18) Project
Output [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Input [13]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, prod_id#321, var_sku#323, var_index#325]

(19) Exchange
Input [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: rangepartitioning(prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST, run_date#202 ASC NULLS FIRST, 8), ENSURE_REQUIREMENTS, [id=#734]

(20) Sort
Input [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: [prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST, run_date#202 ASC NULLS FIRST], true, 0

(21) AdaptiveSparkPlan
Output [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: isFinalPlan=false
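For what it's worth, a HashAggregate whose Keys list every column and whose Functions list is empty (as in steps (7) and (9) above) is typically what distinct()/dropDuplicates() compiles to, so that part of the plan is a dedup over the unioned scans rather than the join itself. A minimal sketch that reproduces this plan shape (illustrative names, not my actual job):

# union + dropDuplicates() compiles to the same shape as steps (6)-(9):
# partial/final HashAggregate (keys = all columns, Functions: []) around an Exchange
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

a = spark.createDataFrame([("p1", "sku1")], ["prod_id", "var_sku"])
b = spark.createDataFrame([("p1", "sku1")], ["prod_id", "var_sku"])

a.union(b).dropDuplicates().explain(mode="formatted")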

The resulting dataframe has incorrect values (taken from other rows of the input tables) and mismatched fields, as if each row's values were shifted into neighboring columns:

  1. For the incorrectly joined records, the run_date field holds the var_sku value
  2. The prod_id field holds the run_date value of the correct join
  3. The prod_vendor field holds values from some of the correctly joined fields
  4. The prod_type field holds the prod_id value of the correctly joined fields, and so on....

It's a real mess. Can anyone help me understand what is going on? Thanks!

I tried the same join with both SparkSQL and the Spark join API, with the same result. Instead of using "select *", I typed out each column name manually, with the same result. In AWS Glue, I cloned the job and changed the Glue version and Spark version. Nothing helped.
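For reference, the DataFrame-API variant of the join looks like this (a sketch equivalent to the SQL above, using the dataframes already defined):

# DataFrame-API equivalent of the Spark SQL join above (a sketch)
spark_df = (
    renamed_df2.join(
        counts_df.select("prod_id", "var_sku", "var_index"),
        on=["prod_id", "var_sku"],
        how="inner",
    )
    .orderBy("prod_id", "var_sku", "run_date")
)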

-----UPDATE----- I also found that if I use pandas instead of Spark, the columns still end up in the wrong positions.

I also found that the Project step of my query plan has a problem:

(1) Scan ExistingRDD
Output [10]: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275]
Arguments: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275], MapPartitionsRDD[80] at map at DynamicFrame.scala:374, ExistingRDD, UnknownPartitioning(0)

(2) Project
Output [4]: [prod_id#267, var_id#270, var_sku#274, var_public_title#273]
Input [10]: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275]

pyspark apache-spark-sql aws-glue
1 Answer

FYI, this was resolved by removing the "optimizePerformance": True setting in the CSV reader:

print("从 S3 提取中提取数据") s3_extract_dyf=glueContext.create_dynamic_frame.from_options(format_options={ "quoteChar": '"', "withHeader": True, "separator": ",", "optimizePerformance": True, <<<<<<<<<< remove this line! }, connection_type="s3", format="csv", connection_options={"paths": ["s3://xxxxxxxx/"], "recurse": True}, transformation_ctx="s3_extract_dyf", )
