我收到这个错误
Can't pickle <class 'google.protobuf.pyext._message.CMessage'>: it's not found as google.protobuf.pyext._message.CMessage
当我尝试在 PySpark 中创建 UDF 时。显然,它使用 CloudPickle 来序列化命令,但是,我知道 protobuf 消息包含
C++
实现,这意味着它不能被腌制。
我已经尝试找到一种方法来覆盖
CloudPickleSerializer
,但是,我找不到方法。
这是我的示例代码:
from MyProject.Proto import MyProtoMessage
from google.protobuf.json_format import MessageToJson
import pyspark.sql.functions as F
def proto_deserialize(body):
msg = MyProtoMessage()
msg.ParseFromString(body)
return MessageToJson(msg)
from_proto = F.udf(lambda s: proto_deserialize(s))
base.withColumn("content", from_proto(F.col("body")))
提前致谢。
Spark 正在尝试序列化您的
MyProtoMessage
和 MessageToJson
并在 Spark 运行 UDF 时将其广播给每个执行者。但是,由于 C++ 实现的 protobuff 消息,驱动程序中的 python 无法 pickle 这个对象。
要解决此问题,您必须遵循 Spark Python 包管理指南 (https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#python-package-management):
当您想在集群上运行 PySpark 应用程序时,例如 YARN、Kubernetes、Mesos等,你需要确保你的代码 并且所有使用的库都可以在执行者上使用。
PySpark 允许上传 Python 文件 (.py)、压缩的 Python 包 (.zip) 和 Egg 文件 (.egg) 到 执行人由以下之一:
设置配置设置 spark.submit.pyFiles
在 Spark 脚本中设置 --py-files 选项
在应用中直接调用pyspark.SparkContext.addPyFile()
这是发送额外自定义 Python 代码的直接方法 到集群。您可以只添加单个文件或整个压缩 打包并上传。使用 pyspark.SparkContext.addPyFile() 即使在开始工作后也允许上传代码。
但是,它不允许添加构建为 Wheels 和 因此不允许包含与本机代码的依赖项。
你应该:
from google.protobuf.json_format import MessageToJson
import pyspark.sql.functions as F
def proto_deserialize(body):
from Proto import MyProtoMessage
msg = MyProtoMessage()
msg.ParseFromString(body)
return MessageToJson(msg)
from_proto = F.udf(lambda s: proto_deserialize(s))
base.withColumn("content", from_proto(F.col("body")))
您需要更新您的代码并将导入的模块放入您的 UDF 中。因此,驱动程序中没有对象的序列化。此外,当您提交您的 Spark 作业时,请确保您已添加
--py-files path/to/your/Proto.py
。如果您有多个依赖项,可以按照使用 .zip 的说明进行操作。