我正在尝试将原始版本的 Spark 中的增量表与
合并if not DeltaTable.isDeltaTable(spark,delta_table_path):
df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(delta_table_path)
else:
target=DeltaTable.forPath(spark, delta_table_path)
matchOncondition=matchOnKey(key)
matchOncondition=matchOncondition+""" AND target.is_Current='Y'AND target.isDeletedInSource='N' """
target.alias("target").merge(df.filter(df.rowNum==1).alias("source"),matchOncondition)\
.whenMatchedUpdate( set={"is_Current":lit("N"),"end_date":lit(current_timestamp())} ).execute()
df.select('*').write.mode("Append").option("mergeSchema", "true").option("overwriteSchema", "true").format("delta").save(delta_table_path)
spark = SparkSession.builder \
.appName("Delta Lake Example") \
.master("local[*]") \
.config('spark.jars.packages','io.delta:delta-core_2.12:2.4.0')\
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("delta.enableTypeWidening", "true") \
.getOrCreate()
现在我的目标模式具有 double 类型的列之一,但源将其作为字符串,并且它给了我以下错误
Failed to merge fields 'customerNotificationId' and 'customerNotificationId'. Failed to merge incompatible data types DoubleType and StringType
我相信 mergeSchema 选项为 true 且 .config("delta.enableTypeWidening", "true") 为 true 时,它应该发展模式。我不想手动进化它。
pyspark 版本 - 3.4 和 delta-2.4
发展架构并不意味着您可以拥有不兼容的类型,只是可以添加新的可为空列,请参阅此处了解更多详细信息:https://stackoverflow.com/a/67114160/1028537并为该答案投票。