我正在使用Kafka + Spark集成,在其中我要发送案例类对象(网站)并映射到spark中。
case class Website(id: Int, name: String)
implicit val productSchema = Encoders.product[Website]
val website = Website(1,"lokesh")
EmbeddedKafka.publishToKafka(topic, website.toString)(config,new StringSerializer)
val df:Dataset[Website] = spark
.readStream
.format("kafka")
.option("subscribe", topic)
.option("kafka.bootstrap.servers", "localhost:1244")
.option("startingoffsets", "earliest")
.load()
.select("value")
.as[Website]
我得到错误
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`id`' given input columns: [value];
tl; dr使用正确的序列化格式,例如JSON或Avro。
以下代码发出Website
案例类的文本表示形式。
EmbeddedKafka.publishToKafka(topic,website.toString)(config,new StringSerializer)
以下代码将文本表示为Array[Byte]
:
。select(“ value”)
因此,最好将值转换为字符串,然后简单地...解析出一个对象,例如.select($"value" cast "string")
。
这样,您最好发送website
对象的JSON表示形式,这样会使解析变得更加容易。您还可以使用逗号分隔的“序列化格式”,但是这将要求您的website
不包含任何带逗号的字段。