PySpark groupby应用程序将对象另存为文件问题

2024-10-03 13:23:39 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Windows电脑上运行PySpark 3.1,并在Jupyter笔记本上使用本地模式。我在Spark数据帧上称为“ApplyPandas”

下面的函数对输入数据帧应用一些数据转换,并训练一个SGBT模型。然后,它将经过训练的模型序列化为二进制文件,并将其作为对象保存到S3 bucket中。最后返回数据帧。我从最后一行中由两列组成的Spark数据帧调用此函数。我没有收到任何错误,返回的数据帧与输入的长度相同。返回每个组的数据

问题在于保存的模型对象。S3中只保存了两个组的对象,而每个组都应该有模型。没有丢失/错误的数据点会导致模型训练失败。(无论如何,我都会收到一个错误或警告。)到目前为止,我尝试的是:

  • 替换S3并保存到本地文件系统:结果相同
  • 将“pickle”替换为“joblib”和“BytesIO”:结果相同
  • 调用函数前重新分区:现在我为不同的组保存了更多的对象,但不是全部。[我是通过调用“val_large_df.coalesce(1.groupby('la…)在最后一行中)”来实现的。]

所以我怀疑这是关于并行性和分布的,但我想不出来。谢谢

def train_sgbt(pdf):      
       ##Some data transformations here##    
       #Train the model
       sgbt_mdl=GradientBoostingRegressor(--Params.--).fit(--Params.--)
       sgbt_mdl_b=pickle.dumps(sgbt_mdl) #Serialize
       #Initiate s3_client
       s3_client = boto3.client(--Params.--)
       #Put file in S3
       s3_client.put_object(Body=sgbt_mdl_b, Bucket='my-bucket-name', 
            Key="models/BT_"+str(pdf.latGroup_m[0])+"_"+str(pdf.lonGroup_m[0])+".mdl")    
       return pdf

dummy_df=val_large_df.groupby("latGroup_m","lonGroup_m").applyInPandas(train_sgbt, 
           schema="fcast_error double")
dummy_df.show()

Tags: 数据对象函数模型clientdfs3bucket
1条回答
网友
1楼 · 发布于 2024-10-03 13:23:39

Spark对dummy_df{a1}进行求值,因此train_sgbt将仅对完成Spark操作所需的组进行调用

这里的火花作用是show()。此操作仅打印前20行,因此仅对前20行中至少有一个元素的组调用train_sgbt。Spark可以评估更多的组,但不能保证

解决问题的一种方法是调用另一个操作,例如csv

相关问题 更多 >