PySpark:如何从具有可变消息类型的 Kafka 消息反序列化原始有效负载

问题描述 投票:0回答:1

我正在尝试从包含具有不同 Proto 有效负载的消息的 Kafka 主题中读取数据。在 Kafka 消息中设置

messageName
key

但是当我尝试:

df = spark.readStream.format(constants.KAFKA_INPUT_FORMAT) \
        .options(**options) \
        .load()
df = df.selectExpr("CAST(key AS STRING)").alias('key')
df = df.select(from_protobuf('value', df.key, desc_file_path).alias('value'))

我收到

pyspark.errors.exceptions.base.PySparkTypeError: [NOT_ITERABLE] Column is not iterable
错误。

如何使用 Kafka 消息属性的

messageName
值动态设置
from_protobuf
函数的
key
参数?

apache-spark pyspark apache-kafka protocol-buffers streaming
1个回答
0
投票

要使用 Kafka 消息属性的键值动态设置 from_protobuf 函数的 messageName 参数,需要使用 UDF(用户定义函数)来处理基于键的反序列化。这是一个逐步解决方案:

  1. 定义UDF:创建UDF以根据密钥中指定的消息类型反序列化Proto负载。
  2. 从 Kafka 读取: 读取 Kafka 消息并提取键和值。
  3. 应用UDF:使用UDF根据key反序列化value。

这是示例代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, BinaryType
from google.protobuf import json_format
import my_protos_pb2  # Import your generated Proto classes

# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()

# Define a UDF to deserialize the Proto payload
def deserialize_proto(key, value):
    if key == "MessageType1":
        message = my_protos_pb2.MessageType1()
    elif key == "MessageType2":
        message = my_protos_pb2.MessageType2()
    else:
        return None
    message.ParseFromString(value)
    return json_format.MessageToJson(message)

deserialize_proto_udf = udf(deserialize_proto, StringType())

# Read from Kafka
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "my_topic") \
    .load()

# Extract key and value
df = df.selectExpr("CAST(key AS STRING) as key", "value")

# Apply the UDF to deserialize the Proto payload
df = df.withColumn("deserialized_value", deserialize_proto_udf(col("key"), col("value")))

# Write the output to console (or any other sink)
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

确保将 my_protos_pb2 替换为生成的 Proto 类的实际模块名称

© www.soinside.com 2019 - 2024. All rights reserved.