PySpark and Protobuf deserialization UDF issue

Posted 2024-06-26 14:09:22


I get this error:

Can't pickle <class 'google.protobuf.pyext._message.CMessage'>: it's not found as google.protobuf.pyext._message.CMessage

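when I try to create a UDF in PySpark. Apparently PySpark uses CloudPickle to serialize the command, but as far as I know protobuf messages are backed by a C++ implementation, which means they cannot be pickled.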
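For context on the message itself: the standard `pickle` module (which CloudPickle builds on) stores a class by reference, recording its module and qualified name and expecting to find that same object at that path when it looks it up. The "it's not found as ..." text is what pickle reports when that lookup fails. The sketch below reproduces the same kind of failure using only the standard library. `CMessageStandIn` and `try_pickle` are hypothetical names, and renaming the class merely imitates a type whose advertised name its module does not export; it is an assumption for illustration, not a description of how protobuf actually defines `CMessage`.

```python
import pickle


class CMessageStandIn:
    """Hypothetical stdlib-only stand-in for a C-extension type like CMessage."""


# Make the class claim a name that its defining module does not export,
# so pickle's by-reference lookup (module + qualified name) cannot find it.
CMessageStandIn.__qualname__ = "CMessage"
CMessageStandIn.__name__ = "CMessage"


def try_pickle(obj):
    """Return None if obj can be pickled, otherwise the error message."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError) as exc:
        # Current CPython raises PicklingError for a failed class lookup;
        # AttributeError is caught defensively for related cases.
        return str(exc)
    return None


if __name__ == "__main__":
    print(try_pickle(CMessageStandIn))  # prints the pickling error message
    print(try_pickle(b"\x08\x96\x01"))  # None: raw serialized bytes pickle fine
```

The second call hints at the usual direction for a workaround: plain bytes cross the pickling boundary without trouble, so the difficulty lies in the class reference captured by the UDF rather than in the message data.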

I tried to find a way to override the CloudPickleSerializer, but I couldn't find one.

Here is my sample code:

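from MyProject.Proto import MyProtoMessage
from google.protobuf.json_format import MessageToJson
import pyspark.sql.functions as F

def proto_deserialize(body):
  # Parse the serialized protobuf bytes and render the message as a JSON string
  msg = MyProtoMessage()
  msg.ParseFromString(body)
  return MessageToJson(msg)

# Wrap the parser as a UDF (F.udf's return type defaults to StringType)
from_proto = F.udf(lambda s: proto_deserialize(s))

# base is an existing DataFrame whose "body" column holds the serialized messages
base.withColumn("content", from_proto(F.col("body")))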

Thanks in advance.

