Converting a Python data pipeline into a PySpark pipeline

Posted 2024-10-02 02:39:50


I recently wrote a Python script (an AWS Glue job) that processes a few hundred JSON files from S3, converts them to Parquet by applying transformations such as joins and filters, writes the results back to S3, and handles some exceptions.

In production, however, we expect to receive close to 1,000,000 files (one million files).

I need help converting the Python code below into PySpark. It may also be useful to others with a similar scenario.

import io
import json

import boto3
import pandas as pd
import awswrangler as wr

# read each json file, load it into a dataframe, and write it out as a parquet file

s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')
my_bucket = s3_resource.Bucket('my-bucket')

# provide the full prefix ending in "/" to avoid reading other folders with a similar prefix
# note: list_objects returns at most 1000 keys per call
for key in s3.list_objects(Bucket='my-bucket', Prefix='lob1/')['Contents']:
    key_value = key['Key']
    tablename = key_value[key_value.find('/') + 1:-5]  # strip the prefix and the ".json" extension
    response = s3.get_object(Bucket='my-bucket', Key=key_value)
    # check the key is a file and not empty
    if len(tablename) > 1 and response['ContentLength'] != 0:
        s3_data = io.BytesIO(response.get('Body').read())
        data = json.load(s3_data)
        docdf = pd.DataFrame.from_dict(data, orient='index')
        docdf = docdf.transpose()
        docdf['keycol'] = '1'
        # expand the subdocs array (if present) into its own dataframe and cross-merge it with the parent row
        if 'subdocs' in docdf.columns:
            subdocdf = pd.DataFrame.from_dict(docdf['subdocs'][0])
            subdocdf['keycol'] = '1'
            alldocsdef = pd.merge(docdf, subdocdf, on='keycol')
            alldocsdef = alldocsdef.drop(['subdocs'], axis=1)
        else:
            subdocdf = []
            alldocsdef = docdf
        # convert dtype to string for all columns
        for col in alldocsdef.columns:
            alldocsdef[col] = alldocsdef[col].astype('string')
        # write a parquet file to s3 if the dataframe is not empty
        if len(alldocsdef.index) != 0:
            wr.s3.to_parquet(df=alldocsdef, path="s3://my-dw-bucket/dataextract/lob1/" + tablename + ".parquet")
            print('written_to_parquet')
        else:
            print("skipped all")
            continue
        current_file = 's3://my-bucket/' + key_value
        archive_file = 'lob1_archive/' + key_value[key_value.find('/') + 1:]
        # archive the processed file by copying it to another s3 folder and deleting the original
        copy_source = {'Bucket': 'my-bucket', 'Key': key_value}
        s3_resource.Object('my-bucket', archive_file).copy_from(CopySource=copy_source)
        s3_resource.Object('my-bucket', key_value).delete()
    else:
        continue
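
As a starting point, a rough PySpark sketch of the same flow might look like the code below. The source and destination paths are reused from the script above; everything else is an assumption about the data, in particular that each file holds a single JSON object and that subdocs, where present, is an array of structs whose field names do not clash with the top-level columns. Instead of looping over keys one at a time, Spark lists and parses the whole prefix in one distributed pass, which is usually a better fit for a million small files.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("lob1_json_to_parquet").getOrCreate()

# paths reused from the script above
src_path = "s3://my-bucket/lob1/"
dst_path = "s3://my-dw-bucket/dataextract/lob1/"

# read every JSON document under the prefix in one distributed pass;
# multiLine=True lets Spark parse files that contain a single (possibly
# pretty-printed) JSON object rather than one object per line
docs = (
    spark.read.option("multiLine", True).json(src_path)
         # keep the table name derived from the object key, mirroring the
         # per-file naming in the pandas version
         .withColumn("tablename",
                     F.regexp_extract(F.input_file_name(), r"lob1/(.+)\.json", 1))
)

# flatten the optional subdocs array: explode_outer keeps documents that have
# no sub-documents (the pandas "else" branch), and the parent columns are
# repeated on every exploded row, which is what the merge on keycol achieved
if "subdocs" in docs.columns:
    docs = (
        docs.withColumn("subdoc", F.explode_outer("subdocs"))
            .drop("subdocs")
            # promote the sub-document struct fields to top-level columns;
            # assumes their names do not clash with the parent columns
            .select("*", "subdoc.*")
            .drop("subdoc")
    )

# cast every column to string, as the pandas version did
docs = docs.select([F.col(c).cast("string").alias(c) for c in docs.columns])

# write one partitioned Parquet dataset instead of one small file per input;
# partitioning by tablename keeps a per-table layout in the output prefix
docs.write.mode("overwrite").partitionBy("tablename").parquet(dst_path)

The per-file archival step (copy to lob1_archive/ and delete) is not shown; in a Glue job that is usually handled by job bookmarks or by a separate boto3 cleanup pass over the processed keys after the write succeeds. Also note that with around a million small objects the read itself can still be a bottleneck, so consolidating the source files or using Glue's file-grouping options for S3 sources is worth considering.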

