我正在使用来自 kafka 主题的 pyspark readstream 以及一系列记录,例如 [ {}, {}, {} ]。 我能够使用 from_avro( F.col('value'), avro_schema ) 解析单个记录。 但是,该主题的实际数据是一组记录,我尝试在我的 avro 架构周围添加 [] 但不起作用。
单条记录的avro模式是
{
"type": "record",
"name": "data",
"fields": [
{
"name": "x",
"type": [
"double",
"null"
]
},
{
"name": "y",
"type": [
"double",
"null"
]
}
]
}
但我需要的是一个 avro 模式,它可以解析这个记录的数组 [{},{}]
我知道我可以使用 pandas UDF,但我只想知道是否有本地方法(使用 spark API)来做到这一点。
如果你的 Kafka 负载是一个数组,你的 Avro schema 需要像这样开始
{
"type": "array",
"items": {
"type": "record",
"name": "data",
"fields": [
...
您不能简单地在记录类型周围添加
[]
。
然后,Spark 应该为反序列化的值列返回一个 Struct 类型的数组
还值得一提 -
from_avro
,默认情况下,如果您的 Kafka 数据是使用 Confluent Schema Registry 生成的,则不起作用...