This is what I'm trying to do:
Spark DataFrame -> groupby -> call a pandas UDF that uses boto3 to create some files in S3.
The pandas UDF:
# Pandas_UDF method
import boto3
import pandas as pd
from io import BytesIO
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, LongType

saveSNFile_schema = StructType([
    StructField('cloc', StringType(), True),      # chunk file location
    StructField('aggloc', StringType(), True),    # aggregated file location
    StructField('trackloc', StringType(), True),  # tracking file location
    StructField('sn', StringType(), True),        # serial number
    StructField('len', LongType(), True),         # number of rows saved
])

@pandas_udf(saveSNFile_schema, PandasUDFType.GROUPED_MAP)
def saveSNFile(sndf):
    """
    Save a dataframe as a parquet file in S3.
    parameters:
        sndf: pandas dataframe (one group per serial number)
    return:
        pandas dataframe matching saveSNFile_schema
    """
    fname = sndf.iloc[0]['field1']
    sn = "%s" % (sndf.iloc[0]['tmp_sn'])
    year = sndf.iloc[0]['partition_0']
    month = sndf.iloc[0]['partition_1']
    day = sndf.iloc[0]['partition_2']
    # get the file names in Tier-3 to save the dataframe (helper defined elsewhere)
    chunkFileLocation, aggFileLocation, trackingFileLocation = get_ChunkLocation(fname, sn, year, month, day)
    parquet_buffer = BytesIO()
    # convert the pandas dataframe to parquet format
    sndf.to_parquet(parquet_buffer, compression='gzip')
    # the UDF runs on worker nodes, so the S3 connection must be created here
    s3_resource = boto3.resource('s3')
    # save the file to S3
    s3_resource.Object("S3bucketName", chunkFileLocation).put(Body=parquet_buffer.getvalue())
    # create an empty tracking file
    s3_resource.Object("S3bucketName", trackingFileLocation).put()
    # create an empty file with the SN as the file name
    s3_resource.Object("S3bucketName", "%s/aggregated_sn/%s" % (T3TrackingPrefix, sn)).put()
    # delete the aggregated files since we have new files in the chunks location
    rval = s3_resource.Object("S3bucketName", aggFileLocation).delete()
    # return a pandas dataframe with additional information per the schema defined above
    rdf = pd.DataFrame([[chunkFileLocation, aggFileLocation, trackingFileLocation, sn, len(sndf)]],
                       columns=['cloc', 'aggloc', 'trackloc', 'sn', 'len'])
    return rdf
Calling the UDF:
rdf = dataFromS3Df.groupby("tmp_sn").apply(saveSNFile)
print(rdf.show())  # <---- without this print(rdf.show()), the UDF does not create any files in S3
With the print(rdf.show()) above, everything works as expected. Without print(rdf.show()), the UDF never sends any PUT requests to the S3 bucket.
This seemed really strange to me. I also talked to AWS support, and they had no clue.
I understand that the pandas_udf runs on the worker nodes, which is why I create the S3 connection inside the function. What puzzled me is that even after my notebook cell had run, no files were created in S3 until I performed some action on the dataframe (rdf).
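The cause is Spark's lazy evaluation: groupby(...).apply(...) with a GROUPED_MAP UDF is a transformation, so it only builds an execution plan. The UDF body, including its boto3 side effects, runs when an action such as show(), count(), or collect() materializes the result. A minimal sketch of that behaviour, assuming an existing SparkSession named spark and using a print as a stand-in for the S3 PUTs:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, LongType

demo_schema = StructType([
    StructField('sn', StringType(), True),
    StructField('len', LongType(), True),
])

@pandas_udf(demo_schema, PandasUDFType.GROUPED_MAP)
def side_effect_udf(pdf):
    print("side effect runs now")  # stand-in for the boto3 PUT calls
    return pd.DataFrame([[pdf.iloc[0]['tmp_sn'], len(pdf)]], columns=['sn', 'len'])

df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["tmp_sn", "v"])
out = df.groupby("tmp_sn").apply(side_effect_udf)  # transformation: nothing executes yet
out.count()  # action: only now does the UDF (and its side effect) run on the workers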
Calling rdf.collect() (an action) solved the problem.
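Any other action should work equally well; if you want to keep the bookkeeping rows the UDF returns, writing them out both triggers execution and persists them in one pass. A sketch, where the output path is a placeholder and not from the original post:

# Writing is also an action, so it runs the UDF and saves the returned
# (cloc, aggloc, trackloc, sn, len) rows at the same time.
rdf.write.mode("overwrite").parquet("s3://S3bucketName/udf_tracking_output/")  # placeholder path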