I have an AWS Glue job that uses Spark SQL to join two dataframes. The job ran correctly every week for six straight months with no issues, then the join suddenly started swapping values in the result dataset. The schemas of the two dataframes have not changed. The results have come back swapped on the weekly runs (always swapped in the same order).
This happens in all 7 of the different Spark joins in my Glue job.
A plain SparkSQL "select * from df_view" does not scramble the columns. Spark mixes up the join results both with SparkSQL and with the DataFrame API's .join method.
Looking at the query plan, the swap appears to happen at step (7), the HashAggregate. Does anyone know why this is happening and how to fix it?
counts_df.createOrReplaceTempView("counts_vw")
logger.info(counts_df.printSchema())
renamed_df2.createOrReplaceTempView("tmp_vw")
logger.info(renamed_df2.printSchema())
#pull in the var_index field
spark_df = spark.sql("""select tmp_vw.*, counts_vw.var_index
from tmp_vw, counts_vw
where tmp_vw.prod_id = counts_vw.prod_id
and tmp_vw.var_sku = counts_vw.var_sku
order by prod_id, var_sku, run_date
""")
logger.info(spark_df.explain('FORMATTED'))
Output:
root
|-- prod_id: string (nullable = true)
|-- var_id: string (nullable = true)
|-- var_sku: string (nullable = true)
|-- var_public_title: string (nullable = true)
|-- var_index: string (nullable = true)
root
|-- run_date: string (nullable = true)
|-- prod_id: string (nullable = true)
|-- prod_vendor: string (nullable = true)
|-- prod_type: string (nullable = true)
|-- var_id: string (nullable = true)
|-- var_price: string (nullable = true)
|-- var_name: string (nullable = true)
|-- var_public_title: string (nullable = true)
|-- var_sku: string (nullable = true)
|-- url: string (nullable = true)
== Physical Plan ==
AdaptiveSparkPlan (21)
+- Sort (20)
+- Exchange (19)
+- Project (18)
+- SortMergeJoin Inner (17)
:- Sort (11)
: +- Exchange (10)
: +- HashAggregate (9)
: +- Exchange (8)
: +- HashAggregate (7)
: +- Union (6)
: :- Filter (2)
: : +- Scan ExistingRDD (1)
: +- Project (5)
: +- Filter (4)
: +- Scan ExistingRDD (3)
+- Sort (16)
+- Exchange (15)
+- Project (14)
+- Filter (13)
+- Scan ExistingRDD (12)
(1) Scan ExistingRDD
Output [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Arguments: [run_date#202, prod_id#203, prod_vendor#204, prod_...
(2) Filter
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Condition : (isnotnull(prod_id#203) AND isnotnull(var_sku#210))
(3) Scan ExistingRDD
Output [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]
Arguments: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231], MapPartitionsRDD[78] at map at DynamicFrame.scala:384, ExistingRDD, UnknownPartitioning(0)
(4) Filter
Input [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]
Condition : (isnotnull(cast(prod_id#223L as string)) AND isnotnull(var_sku#230))
(5) Project
Output [10]: [run_date#222, cast(prod_id#223L as string) AS prod_id#253, prod_vendor#224, prod_type#225, cast(var_id#226L as string) AS var_id#254, cast(var_price#227 as string) AS var_price#255, var_name#228, var_public_title#229, var_sku#230, substring(url#231, 44, 200) AS url#242]
Input [10]: [run_date#222, prod_id#223L, prod_vendor#224, prod_type#225, var_id#226L, var_price#227, var_name#228, var_public_title#229, var_sku#230, url#231]
(6) Union
(7) HashAggregate
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#211]
Keys [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Functions: []
Aggregate Attributes: []
Results [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
(8) Exchange
Input [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Arguments: hashpartitioning(prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207, 8), ENSURE_REQUIREMENTS, [id=#723]
(9) HashAggregate
Input [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Keys [10]: [prod_id#203, var_public_title#209, prod_vendor#204, var_sku#210, url#211, run_date#202, var_id#206, prod_type#205, var_name#208, var_price#207]
Functions: []
Aggregate Attributes: []
Results [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, translate(url#211, ?,, ) AS url#266]
(10) Exchange
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266]
Arguments: hashpartitioning(prod_id#203, var_sku#210, 8), ENSURE_REQUIREMENTS, [id=#727]
(11) Sort
Input [10]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266]
Arguments: [prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST], false, 0
(12) Scan ExistingRDD
Output [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]
Arguments: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325], MapPartitionsRDD[137] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(13) Filter
Input [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]
Condition : (isnotnull(prod_id#321) AND isnotnull(var_sku#323))
(14) Project
Output [3]: [prod_id#321, var_sku#323, var_index#325]
Input [5]: [prod_id#321, var_id#322, var_sku#323, var_public_title#324, var_index#325]
(15) Exchange
Input [3]: [prod_id#321, var_sku#323, var_index#325]
Arguments: hashpartitioning(prod_id#321, var_sku#323, 8), ENSURE_REQUIREMENTS, [id=#728]
(16) Sort
Input [3]: [prod_id#321, var_sku#323, var_index#325]
Arguments: [prod_id#321 ASC NULLS FIRST, var_sku#323 ASC NULLS FIRST], false, 0
(17) SortMergeJoin
Left keys [2]: [prod_id#203, var_sku#210]
Right keys [2]: [prod_id#321, var_sku#323]
Join condition: None
(18) Project
Output [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Input [13]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, prod_id#321, var_sku#323, var_index#325]
(19) Exchange
Input [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: rangepartitioning(prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST, run_date#202 ASC NULLS FIRST, 8), ENSURE_REQUIREMENTS, [id=#734]
(20) Sort
Input [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: [prod_id#203 ASC NULLS FIRST, var_sku#210 ASC NULLS FIRST, run_date#202 ASC NULLS FIRST], true, 0
(21) AdaptiveSparkPlan
Output [11]: [run_date#202, prod_id#203, prod_vendor#204, prod_type#205, var_id#206, var_price#207, var_name#208, var_public_title#209, var_sku#210, url#266, var_index#325]
Arguments: isFinalPlan=false
The resulting dataframe has incorrect values (taken from other rows of the input tables) and mismatched fields.
It is a real mess. Can anyone help me understand what is going on? Thanks!
I tried the same join with both SparkSQL and the Spark join API - same result. Instead of using "select *", I typed out every column name by hand - same result. In AWS Glue, I cloned the job and changed the Glue version and the Spark version. Nothing helped.
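For reference, the DataFrame API variant I tried looked roughly like this (a minimal sketch reconstructed from memory; the selected columns mirror the SQL above, and renamed_df2/counts_df are the same dataframes registered as views earlier):

# Same join expressed with the DataFrame API instead of Spark SQL;
# joining on a list of column names keeps a single prod_id/var_sku in the output.
spark_df = (
    renamed_df2
    .join(
        counts_df.select("prod_id", "var_sku", "var_index"),
        on=["prod_id", "var_sku"],
        how="inner",
    )
    .orderBy("prod_id", "var_sku", "run_date")
)

This version produced the same scrambled values as the SQL above.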
----- UPDATE ----- I also found that if I use pandas instead of Spark, the columns still end up in the wrong positions.
I also found that the Project step of my query plan looks wrong:
(1) Scan ExistingRDD
Output [10]: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275]
Arguments: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275], MapPartitionsRDD[80] at map at DynamicFrame.scala:374, ExistingRDD, UnknownPartitioning(0)
(2) Project
Output [4]: [prod_id#267, var_id#270, var_sku#274, var_public_title#273]
Input [10]: [run_date#266, prod_id#267, prod_vendor#268, prod_type#269, var_id#270, var_price#271, var_name#272, var_public_title#273, var_sku#274, url#275]
FYI, this issue was solved by removing the "optimizePerformance": True setting from the CSV reader:
print("从 S3 提取中提取数据") s3_extract_dyf=glueContext.create_dynamic_frame.from_options(format_options={ "quoteChar": '"', "withHeader": True, "separator": ",", "optimizePerformance": True, <<<<<<<<<< remove this line! }, connection_type="s3", format="csv", connection_options={"paths": ["s3://xxxxxxxx/"], "recurse": True}, transformation_ctx="s3_extract_dyf", )