我有一个 avro 文件,其中有一个名为 timeStamp 的字段,它是必填字段,没有任何默认值。这意味着不可能将此字段设为空。模式定义如下...
我有一个名为 timeStamp 的 avro 文件,它是必填字段,没有任何默认值。这意味着不可能将此字段设为空。架构定义如下
{"name": "timestamp","type": {"type": "long", "logicalType": "timestamp-millis"}}
我有一张冰山表,其中这个时间戳列是一个分区,并且它被定义为 NOT NULL 列
timestamp TIMESTAMP NOT NULL)
USING iceberg
PARTITIONED BY (days(_timestamp))
当我使用 spark 将数据写入此表时,即使我的源 avro 数据没有空值,它也会引发以下异常
Cannot write nullable values to non-null column 'timestamp'
我尝试了多种选择,例如
不知何故,所有这些选项都会导致 spark 将时间戳字段读取为可空,从而导致问题。除了我尝试过的上述解决方案之外,如何让 spark 理解它是一个不可空字段
我使用的是 spark 3.3.0 和 Iceberg 1.2.0 版本
这就是我使用分区覆盖将数据写入冰山的方法
spark.read()
.format("avro")
.option("recursiveFileLookup", "true")
.load(getS3aPath())
.writeTo(getTableNameWithDB())
.overwritePartitions();
提前致谢
我自己找到了答案;)发布它以便可以帮助其他人
此行为是设计使然。即使您在 avro 中将其定义为必填字段,Spark 也会将所有字段读取为空值。您必须执行以下操作
模式推断和 RDD 转换:Spark 读取 Avro 数据并从文件中推断出模式。当您使用 myStructType 将 DataFrame 转换为 RDD 并转换回 DataFrame 时,您会强制执行所需的模式,确保时间戳字段遵循您指定的可空性规则。
代码如下所示
StructType yourStucktType = getYourStucktType();
Dataset<Row> avroDataSet = spark.read()
.format("avro")
.load(Path());
Dataset<Row> convertedDataSet = spark.createDataFrame(avroDataSet.rdd(),
yourStucktType); --> this does the magic :)
convertedDataSet.writeTo(getTableName())
.overwritePartitions();