Spark 3.0 无法将非空数据写入iceberg

问题描述 投票:0回答:1

我有一个 avro 文件,其中有一个名为 timeStamp 的字段,这是一个强制字段,没有任何默认值。这意味着没有机会将该字段设置为空。架构定义如下

    {"name": "timestamp","type": {"type": "long", "logicalType": "timestamp-millis"}}

我有一个冰山表,其中时间戳列是一个分区,其定义为 NOT NULL 列

timestamp TIMESTAMP NOT NULL)
USING iceberg
PARTITIONED BY (days(_timestamp))

当我使用 Spark 将数据写入该表时。即使我的源 avro 数据没有空值,它也会引发以下异常

Cannot write nullable values to non-null column 'timestamp'

我尝试了多种选择,例如

  1. 在 Spark 配置中添加 .config("spark.sql.iceberg.check-nullability", "false") -> 不起作用
  2. 尝试自定义模式映射以使时间戳在 Spark 内不可为空 -> 仍然无法工作 Spark 将其读取为可空

不知何故,使用所有这些选项 Spark 会将时间戳字段读取为可为空,这就是导致问题的原因。除了我尝试过的上述解决方案之外,如何让 Spark 理解它是一个不可为空的字段

我使用的是spark 3.3.0和Iceberg 1.2.0版本

这就是我使用分区覆盖将这些数据写入 Iceberg 的方式

    spark.read()
        .format("avro")
        .option("recursiveFileLookup", "true")
        .load(getS3aPath())
        .writeTo(getTableNameWithDB())
        .overwritePartitions();

提前致谢

apache-spark apache-spark-sql spark-streaming spark-structured-streaming apache-iceberg
1个回答
0
投票

我自己找到了答案;)发布它,以便它可以帮助其他人

此行为是设计使然。即使您在 avro 中将其定义为必填字段,Spark 也会将所有字段读取为 nullbale。你必须做到以下几点

模式推断和 RDD 转换:Spark 读取 Avro 数据并从文件中推断模式。当您使用 myStructType 将 DataFrame 转换为 RDD 并返回 DataFrame 时,您可以强制执行所需的架构,确保时间戳字段遵循您指定的可为空性规则。

代码如下所示

   StructType yourStucktType = getYourStucktType();
    Dataset<Row> avroDataSet = spark.read()
        .format("avro")
        .load(Path());
    Dataset<Row> convertedDataSet = spark.createDataFrame(avroDataSet.rdd(),
        yourStucktType); --> this does the magic :) 
    convertedDataSet.writeTo(getTableName())
        .overwritePartitions();
© www.soinside.com 2019 - 2024. All rights reserved.