我正在尝试从包含具有不同 Proto 有效负载的消息的 Kafka 主题中读取数据。在 Kafka 消息中设置
messageName
key
。
但是当我尝试:
df = spark.readStream.format(constants.KAFKA_INPUT_FORMAT) \
.options(**options) \
.load()
df = df.selectExpr("CAST(key AS STRING)").alias('key')
df = df.select(from_protobuf('value', df.key, desc_file_path).alias('value'))
我收到
pyspark.errors.exceptions.base.PySparkTypeError: [NOT_ITERABLE] Column is not iterable
错误。
如何使用 Kafka 消息属性的
messageName
值动态设置 from_protobuf
函数的 key
参数?
要使用 Kafka 消息属性的键值动态设置 from_protobuf 函数的 messageName 参数,需要使用 UDF(用户定义函数)来处理基于键的反序列化。这是一个逐步解决方案:
这是示例代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType, BinaryType
from google.protobuf import json_format
import my_protos_pb2 # Import your generated Proto classes
# Initialize Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Define a UDF to deserialize the Proto payload
def deserialize_proto(key, value):
if key == "MessageType1":
message = my_protos_pb2.MessageType1()
elif key == "MessageType2":
message = my_protos_pb2.MessageType2()
else:
return None
message.ParseFromString(value)
return json_format.MessageToJson(message)
deserialize_proto_udf = udf(deserialize_proto, StringType())
# Read from Kafka
df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "my_topic") \
.load()
# Extract key and value
df = df.selectExpr("CAST(key AS STRING) as key", "value")
# Apply the UDF to deserialize the Proto payload
df = df.withColumn("deserialized_value", deserialize_proto_udf(col("key"), col("value")))
# Write the output to console (or any other sink)
query = df.writeStream \
.outputMode("append") \
.format("console") \
.start()
query.awaitTermination()
确保将 my_protos_pb2 替换为生成的 Proto 类的实际模块名称