pandas_udf function running in AWS Glue does not put objects into S3 without a print call

Posted 2024-05-17 23:35:13


Here is what I am trying to do:

Spark DataFrame -> groupby -> call a UDF method that uses boto3 to create some files in S3

The pandas_udf method:

# Pandas_UDF method
from io import BytesIO

import boto3
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, LongType

saveSNFile_schema = StructType([
    StructField('cloc', StringType(), True),     # chunk file location
    StructField('aggloc', StringType(), True),   # aggregated file location
    StructField('trackloc', StringType(), True), # tracking file location
    StructField('sn', StringType(), True),       # serial number
    StructField('len', LongType(), True),        # number of rows saved
])
@pandas_udf(saveSNFile_schema, PandasUDFType.GROUPED_MAP)
def saveSNFile(sndf):
    """
        Save dataframe as a parquet file in S3

    parameters:
        sndf: pandas dataframe 

    return:
        pandas dataframe
    """

    fname = sndf.iloc[0]['field1']
    sn = "%s" %(sndf.iloc[0]['tmp_sn'])
    year = sndf.iloc[0]['partition_0']
    month = sndf.iloc[0]['partition_1']
    day = sndf.iloc[0]['partition_2']

    # get the file names in Tier-3 to save the dataframe
    chunkFileLocation, aggFileLocation, trackingFileLocation = get_ChunkLocation(fname, sn, year, month, day)

    parquet_buffer = BytesIO()
    # convert the pandas dataframe to parquet format in memory
    sndf.to_parquet(parquet_buffer, compression='gzip')
    s3_resource = boto3.resource('s3')

    # save the file to S3
    s3_resource.Object("S3bucketName", chunkFileLocation).put(Body=parquet_buffer.getvalue())

    # create an empty tracking file
    s3_resource.Object("S3bucketName", trackingFileLocation).put()

    # create empty file with SN as file name
    s3_resource.Object("S3bucketName", "%s/aggregated_sn/%s" % (T3TrackingPrefix, sn)).put()

    # delete the aggregated files since we have new files in the chunks location
    rval = s3_resource.Object("S3bucketName", aggFileLocation).delete()

    # Return a pandas dataframe with the additional information per the schema defined above
    rdf = pd.DataFrame([[chunkFileLocation, aggFileLocation, trackingFileLocation, sn, len(sndf)]],
                       columns=['cloc', 'aggloc', 'trackloc', 'sn', 'len'])

    return rdf

Calling the UDF method:

rdf = dataFromS3Df.groupby("tmp_sn").apply(saveSNFile)
print(rdf.show()) # <---- without this print call, the UDF does not create any files in S3
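
As an aside, on Spark 3.x (which AWS Glue 3.0/4.0 runs) the GROUPED_MAP pandas_udf style is deprecated in favor of GroupedData.applyInPandas. A minimal sketch of the equivalent call, assuming saveSNFile is defined as a plain function without the @pandas_udf decorator:

# Spark 3.x equivalent; the schema moves from the decorator to the call
rdf = dataFromS3Df.groupby("tmp_sn").applyInPandas(saveSNFile, schema=saveSNFile_schema)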

With the print(rdf.show()) call above, everything works as expected.

Without print(rdf.show()), the UDF never issues any PUT requests to the S3 bucket.

This seems really strange to me. I also talked to AWS support, and they had no clue either.

I know the pandas_udf method runs on the worker nodes, so I have to create the S3 connection inside the function. But the strange part is that even after my notebook cell finishes, no files are created in S3 until I perform some action on the DataFrame (rdf).
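
For reference, this behavior matches Spark's lazy evaluation: groupby(...).apply(...) is a transformation, so saveSNFile only runs once an action consumes rdf, and show() simply happened to be the first action applied. A minimal sketch of forcing execution without printing, assuming lazy evaluation is indeed the cause:

# Any Spark action materializes rdf and runs saveSNFile on the workers.
# Pick one action: each action re-evaluates the plan (and repeats the S3
# PUTs) unless rdf is cached first with rdf.cache().
num_groups = rdf.count()  # cheap action that evaluates every group

# ...or pull the per-group bookkeeping rows back to the driver:
# for row in rdf.collect():
#     print(row['sn'], row['len'], row['cloc'])

Since the UDF is used purely for its side effects, whichever action runs first is what actually performs the writes.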


Tags: method, true, dataframe, pandas, s3, rdf, resource, file