在 Spark 中跳过格式错误的 Avro 记录

问题描述 投票:0回答:1

我正在使用 Spark 2.4.7 从 Kafka 提取一些 Avro 记录。不幸的是,有时可能会出现格式错误的数据,这会导致我的工作失败。我正在使用 Scala API。

这是一些示例代码:

val data = spark.readStream.format("kafka")
  .options(config.kafkaOptions ++ Map("subscribe" -> inputTopic))
  .option("mode", "PERMISSIVE")
  .load()
---
val decodedColumn = org.apache.spark.sql.avro.from_avro($"value", schema) as "value"
---
val q = data
          .writeStream
          .option("checkpointLocation", checkpointLocation)
          .trigger(streaming.Trigger.ProcessingTime(outputTriggerTime))
          .foreachBatch { (df, id) => {
            val dataFrame : DataFrame = df
              .filter(
                row => {
                  if (row.schema.equals(configuredStructTypeSchema))
                    true
                  else
                    false
                }
              )
              .select(decodedColumn)
            val outputData = spark.createDataFrame(dataFrame.rdd, dataFrame.schema)
              .select("value.*")
            writer.write(outputData)
          }}.start    

我添加了过滤子句来尝试处理不良数据。对于我的测试,我发送一些乱码文本作为输入,而不是正确的 avro 记录。无论有没有过滤子句,我都会收到以下错误 -

org.apache.spark.SparkException:任务不可序列化 在 org.apache.spark.util.ClosureCleaner$.ensureSerialized(ClosureCleaner.scala:416)

...

引起:java.io.NotSerializedException:org.apache.spark.sql.Column 序列化堆栈: - 对象不可序列化(类:org.apache.spark.sql.Column,值:from_avro(value) AS

value

我如何确保我的代码仅处理符合配置架构的传入 Avro 数据,并丢弃其他所有内容?

apache-spark apache-spark-sql spark-avro
1个回答
0
投票

从文档中您的过滤器当前不太可能工作,因为数据框行将始终具有以下架构:

Column  Type
key (optional)  string or binary
value (required)    string or binary
headers (optional)  array
topic (*optional)   string
partition (optional)    int

您正在寻找的是

value
列的 avro 模式,女巫很可能是存储为二进制的 avro 消息。一种方法是尝试/捕获在过滤器中使用预期的 avro 架构对值进行反序列化。

© www.soinside.com 2019 - 2024. All rights reserved.