pyspark:dataframe上的avro反序列化函数按预期失败

2024-05-18 08:44:08 发布

您现在位置:Python中文网/ 问答频道 /正文

avro反序列化函数需要列表中的字节,但在数据帧上应用时失败。仅适用于collect(),但驱动程序/主程序的内存不足

在python 3.6.8中使用spark版本2.3.3 正在从包含avro序列化消息的配置单元表select创建数据帧。 然后我使用https://github.com/Landoop/python-serializers.git,因为它支持在python3.x上使用合流模式注册表进行avro反序列化

尝试对数据帧应用反序列化函数,但失败。 只有当我数据框收集(),并使用for循环反序列化每个条目,或者在将df转换为rdd.地图并反序列化每一行。 这两种情况都只能在test中工作,在prod中,它要么是OOM,要么是永远在运行在5node 30GB、8cpu服务器上的10GB配置单元数据上运行。你知道吗

spark = SparkSession \
    .builder \
....
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("SELECT * FROM table1")
unbase_df = df.select(unbase64(df.mycolumn1))

client = SchemaRegistryClient(url='1.2.3.4:1234')
serializer = MessageSerializer(client)

##attempt 1##FAILS##
decode_df = df.select(serializer.decode_message('mycolumn1'))
###->ERROR -> 
##attempt 2##FAILS##
decode_df_2 = df.select(serializer.decode_message(b'mycolumn1'))

##attempt 3##WORKS BUT OOM with high volume on master(drivermanager)##
unbase_collect = unbase_df.collect()
decode_list = [serializer.decode_message(msg.mycolumn1) for msg in unbase_collect]

##attempt 4##WORKS BUT RUNS FOR EVER##
def avrodecoder(row):
    decoded_row = serializer.decode_message(row['mycolumn1'])
    return decoded_row

decode_rdd = unbase_df.select("*").rdd.map(avrodecoder)

## After #3 or #4 works I convert back to dataframe with schema
schema = StructType([
    StructField("1column",  StructType([
.......
    StructField("ncolumn", StringType()])

decode_df = spark.createDataFrame(decode_rdd,schema)

尝试1时的错误消息

in decode_message(self, message)
    185             raise SerializerError("message is too small to decode")
    186
--> 187         with ContextBytesIO(message) as payload:
    188             magic, schema_id = struct.unpack('>bI', payload.read(5))
    189             if magic != MAGIC_BYTE:

TypeError: a bytes-like object is required, not 'str'```

Error message in case of #attempt 2
```.....python3.6/site-packages/datamountaineer/schemaregistry/serializers/MessageSerializer.py
in decode_message(self, message)
    188             magic, schema_id = struct.unpack('>bI', payload.read(5))
    189             if magic != MAGIC_BYTE:
--> 190                 raise SerializerError("message does not start with magic byte")
    191             decoder_func = self._get_decoder_func(schema_id, payload)
    192             return decoder_func(payload)

SerializerError: the message does not start with a magic byte ```
  1. 如何在数据帧上直接通过合流模式注册表反序列化avro
  2. 我如何确保所有的转换只在worker/executors上执行
  3. 我如何才能使它足够高效地工作,而不必为<;10GB的数据OOM或运行5,6个多小时
  4. 我不明白为什么“纱线挂起内存”图显示,在这两种工作情况下,它都会上升到7+TB甚至更高

Tags: 数据messagedf序列化schemawithmagicselect
1条回答
网友
1楼 · 发布于 2024-05-18 08:44:08

Column上应用普通Python函数之前,必须将其转换为用户定义的函数(UDF):

from pyspark.sql.functions import udf

@udf(decoded_row_schema)
def avrodecoder(row):
    decoded_row = serializer.decode_message(row['mycolumn1'])
    return decoded_row

其中decoded_row_schema描述返回对象的形状。你知道吗

但是,如果您使用的是当前的(>;=2.4.0)版本,则可能根本不需要这样做-Pyspark 2.4.0, read avro from kafka with read stream - Python

相关问题 更多 >