我正在使用 Spark 2.4.7 从 Kafka 提取一些 Avro 记录。不幸的是,有时可能会出现格式错误的数据,这会导致我的工作失败。我正在使用 Scala API。
这是一些示例代码:
val data = spark.readStream.format("kafka")
.options(config.kafkaOptions ++ Map("subscribe" -> inputTopic))
.option("mode", "PERMISSIVE")
.load()
---
val decodedColumn = org.apache.spark.sql.avro.from_avro($"value", schema) as "value"
---
val q = data
.writeStream
.option("checkpointLocation", checkpointLocation)
.trigger(streaming.Trigger.ProcessingTime(outputTriggerTime))
.foreachBatch { (df, id) => {
val dataFrame : DataFrame = df
.filter(
row => {
if (row.schema.equals(configuredStructTypeSchema))
true
else
false
}
)
.select(decodedColumn)
val outputData = spark.createDataFrame(dataFrame.rdd, dataFrame.schema)
.select("value.*")
writer.write(outputData)
}}.start
我添加了过滤子句来尝试处理不良数据。对于我的测试,我发送一些乱码文本作为输入,而不是正确的 avro 记录。无论有没有过滤子句,我都会收到以下错误 -
org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerialized(ClosureCleaner.scala:416)
...
引起:java.io.NotSerializedException:org.apache.spark.sql.Column 序列化堆栈: - 对象不可序列化(类:org.apache.spark.sql.Column,值:from_avro(value) AS
)value
我如何确保我的代码仅处理符合配置架构的传入 Avro 数据,并丢弃其他所有内容?
从文档中您的过滤器当前不太可能工作,因为数据框行将始终具有以下架构:
Column Type
key (optional) string or binary
value (required) string or binary
headers (optional) array
topic (*optional) string
partition (optional) int
您正在寻找的是
value
列的 avro 模式,女巫很可能是存储为二进制的 avro 消息。一种方法是尝试/捕获在过滤器中使用预期的 avro 架构对值进行反序列化。