从Kafka读取时如何在Spark中对Case类对象进行反序列化?

问题描述 投票:0回答:1

我正在使用Kafka + Spark集成,在其中我要发送案例类对象(网站)并映射到spark中。

    case class Website(id: Int, name: String)

    implicit val productSchema = Encoders.product[Website]
    val website = Website(1,"lokesh")
    EmbeddedKafka.publishToKafka(topic, website.toString)(config,new StringSerializer)

    val df:Dataset[Website] = spark
          .readStream
          .format("kafka")
          .option("subscribe", topic)
          .option("kafka.bootstrap.servers", "localhost:1244")
          .option("startingoffsets", "earliest")
          .load()
          .select("value") 
          .as[Website]

我得到错误

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`id`' given input columns: [value];
apache-spark apache-kafka deserialization spark-structured-streaming encoder
1个回答
0
投票

tl; dr使用正确的序列化格式,例如JSON或Avro。


以下代码发出Website案例类的文本表示形式。

EmbeddedKafka.publishToKafka(topic,website.toString)(config,new StringSerializer)

以下代码将文本表示为Array[Byte]

。select(“ value”)

因此,最好将值转换为字符串,然后简单地...解析出一个对象,例如.select($"value" cast "string")

这样,您最好发送website对象的JSON表示形式,这样会使解析变得更加容易。您还可以使用逗号分隔的“序列化格式”,但是这将要求您的website不包含任何带逗号的字段。

© www.soinside.com 2019 - 2024. All rights reserved.