I'm using PySpark and trying to send a partitioned DataFrame to Kafka with the confluent-kafka library's Producer. However, I'm running into a serialization problem with the Kafka producer on the Spark executors.
Here is my code:
import json
from confluent_kafka import Producer

broadcast_config = spark.sparkContext.broadcast((kafka_broker, kafka_topic))

def send_partition_to_kafka(partition):
    kafka_broker, kafka_topic = broadcast_config.value
    # One producer per partition; confluent-kafka's Producer takes a config dict
    producer = Producer({'bootstrap.servers': kafka_broker})
    for row in partition:
        producer.produce(kafka_topic, value=json.dumps(row.asDict()).encode('utf-8'))
    producer.flush()

grouped_df.foreachPartition(send_partition_to_kafka)
I keep getting this error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 812, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command
command = serializer._read_with_length(file)
File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 174, in _read_with_length
return self.loads(obj)
File "/opt/c2b_py_etl/spark-3.4.3-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 472, in loads
return cloudpickle.loads(obj, encoding=encoding)
AttributeError: type object 'Producer' has no attribute '__bool__'
The error persists even though the producer is instantiated inside each partition. How can I resolve this serialization issue?
Any help or pointers would be greatly appreciated. Thanks!
I think the problem is that you import the Producer class at global scope, so when Spark pickles your function to ship it to the executors, it tries to serialize the class along with the closure. You should do the import inside the function that runs on the executors instead, for example as below.
This assumes the confluent-kafka module is available as a dependency of the Spark session and can be imported on the executors.
broadcast_config = spark.sparkContext.broadcast((kafka_broker, kafka_topic))

def send_partition_to_kafka(partition):
    # Import on the executor, not at module scope on the driver,
    # so the Producer class is never captured in the pickled closure
    import json
    from confluent_kafka import Producer

    kafka_broker, kafka_topic = broadcast_config.value
    producer = Producer({'bootstrap.servers': kafka_broker})
    for row in partition:
        producer.produce(kafka_topic, value=json.dumps(row.asDict()).encode('utf-8'))
    producer.flush()

grouped_df.foreachPartition(send_partition_to_kafka)
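As a side note, produce() in confluent-kafka is asynchronous: messages sit in an internal queue until the client serves delivery callbacks. Here is a minimal sketch of the same per-partition loop with a delivery callback and a non-blocking poll(), so delivery failures actually surface in the executor logs (the delivery_report name and its error handling are just illustrative, not part of your code):

def send_partition_to_kafka(partition):
    import json
    from confluent_kafka import Producer

    def delivery_report(err, msg):
        # Invoked from poll()/flush() once per produced message
        if err is not None:
            print(f"Delivery to {msg.topic()} failed: {err}")

    kafka_broker, kafka_topic = broadcast_config.value
    producer = Producer({'bootstrap.servers': kafka_broker})
    for row in partition:
        producer.produce(kafka_topic,
                         value=json.dumps(row.asDict()).encode('utf-8'),
                         on_delivery=delivery_report)
        producer.poll(0)  # serve delivery callbacks without blocking
    producer.flush()  # block until all queued messages are delivered before the task ends

Calling poll(0) inside the loop also keeps the producer's local queue from filling up on large partitions, and flush() at the end ensures nothing is still in flight when the Spark task finishes.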