vanilla Spark 中 delta 表的模式演变

问题描述 投票:0回答:1

我正在尝试将原始版本的 Spark 中的增量表与

合并
if not DeltaTable.isDeltaTable(spark,delta_table_path):
        df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(delta_table_path)
else:
        target=DeltaTable.forPath(spark, delta_table_path)
        matchOncondition=matchOnKey(key)
        matchOncondition=matchOncondition+""" AND target.is_Current='Y'AND target.isDeletedInSource='N' """
        
        target.alias("target").merge(df.filter(df.rowNum==1).alias("source"),matchOncondition)\
        .whenMatchedUpdate( set={"is_Current":lit("N"),"end_date":lit(current_timestamp())} ).execute()
        df.select('*').write.mode("Append").option("mergeSchema", "true").option("overwriteSchema", "true").format("delta").save(delta_table_path)

spark = SparkSession.builder \
    .appName("Delta Lake Example") \
    .master("local[*]") \
    .config('spark.jars.packages','io.delta:delta-core_2.12:2.4.0')\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("delta.enableTypeWidening", "true") \
    .getOrCreate()

现在我的目标模式具有 double 类型的列之一,但源将其作为字符串,并且它给了我以下错误

Failed to merge fields 'customerNotificationId' and 'customerNotificationId'. Failed to merge incompatible data types DoubleType and StringType

我相信 mergeSchema 选项为 true 且 .config("delta.enableTypeWidening", "true") 为 true 时,它应该发展模式。我不想手动进化它。

pyspark 版本 - 3.4 和 delta-2.4

apache-spark pyspark delta-lake delta
1个回答
0
投票

发展架构并不意味着您可以拥有不兼容的类型,只是可以添加新的可为空列,请参阅此处了解更多详细信息:https://stackoverflow.com/a/67114160/1028537并为该答案投票。

© www.soinside.com 2019 - 2024. All rights reserved.