我有一个 avro 文件,其中有一个名为 timeStamp 的字段,这是一个强制字段,没有任何默认值。这意味着没有机会将该字段设置为空。架构定义如下
{"name": "timestamp","type": {"type": "long", "logicalType": "timestamp-millis"}}
我有一个冰山表,其中时间戳列是一个分区,其定义为 NOT NULL 列
timestamp TIMESTAMP NOT NULL)
USING iceberg
PARTITIONED BY (days(_timestamp))
当我使用 Spark 将数据写入该表时。即使我的源 avro 数据没有空值,它也会引发以下异常
Cannot write nullable values to non-null column 'timestamp'
我尝试了多种选择,例如
不知何故,使用所有这些选项 Spark 会将时间戳字段读取为可为空,这就是导致问题的原因。除了我尝试过的上述解决方案之外,如何让 Spark 理解它是一个不可为空的字段
我使用的是spark 3.3.0和Iceberg 1.2.0版本
这就是我使用分区覆盖将这些数据写入 Iceberg 的方式
spark.read()
.format("avro")
.option("recursiveFileLookup", "true")
.load(getS3aPath())
.writeTo(getTableNameWithDB())
.overwritePartitions();
提前致谢
我自己找到了答案;)发布它,以便它可以帮助其他人
此行为是设计使然。即使您在 avro 中将其定义为必填字段,Spark 也会将所有字段读取为 nullbale。你必须做到以下几点
模式推断和 RDD 转换:Spark 读取 Avro 数据并从文件中推断模式。当您使用 myStructType 将 DataFrame 转换为 RDD 并返回 DataFrame 时,您可以强制执行所需的架构,确保时间戳字段遵循您指定的可为空性规则。
代码如下所示
StructType yourStucktType = getYourStucktType();
Dataset<Row> avroDataSet = spark.read()
.format("avro")
.load(Path());
Dataset<Row> convertedDataSet = spark.createDataFrame(avroDataSet.rdd(),
yourStucktType); --> this does the magic :)
convertedDataSet.writeTo(getTableName())
.overwritePartitions();