我最近编写了一个python脚本(AWS Glue job)来处理来自S3的数百个JSON文件,并通过应用连接和过滤器等转换将它们转换为 Parquet 格式,然后写回S3并处理一些异常
但在生产系统中,我们预计将收到近1000000个文件(100万个文件)
需要帮助将下面的python代码转换为PySpark代码。如果其他人有类似的场景,这可能对他们有用
import json
import boto3
import pandas as pd
import awswrangler as wr
import io
#read json file and load into dataframe/parquet-file
# ETL job: for every JSON file under s3://my-bucket/lob1/, flatten the document
# (plus its optional 'subdocs' array) into a single all-string DataFrame, write it
# to the data-warehouse bucket as parquet, then archive the source file.
BUCKET = 'my-bucket'
# absolute prefix with trailing "/" to avoid matching other folders with a similar name
SOURCE_PREFIX = 'lob1/'
TARGET_PATH = 's3://my-dw-bucket/dataextract/lob1'
ARCHIVE_PREFIX = 'lob1_archive/'

s3 = boto3.client('s3')
s3_resource = boto3.resource('s3')


def _json_to_frame(raw):
    """Parse one JSON document (bytes) into a flattened, all-string DataFrame.

    If the document has a 'subdocs' array, each sub-document is joined onto the
    parent row via a constant 'keycol' (a deliberate cross join: every subdoc row
    is paired with the single parent row).
    """
    data = json.loads(raw)
    docdf = pd.DataFrame.from_dict(data, orient='index').transpose()
    docdf['keycol'] = '1'
    if 'subdocs' in docdf.columns:
        subdocdf = pd.DataFrame.from_dict(docdf['subdocs'][0])
        subdocdf['keycol'] = '1'
        alldocs = pd.merge(docdf, subdocdf, on='keycol').drop(['subdocs'], axis=1)
    else:
        alldocs = docdf
    # parquet needs consistent dtypes; force everything to pandas 'string'
    for col in alldocs.columns:
        alldocs[col] = alldocs[col].astype('string')
    return alldocs


def _archive(key_value):
    """Move a processed source file to the archive prefix (copy, then delete)."""
    archive_key = ARCHIVE_PREFIX + key_value[key_value.find('/') + 1:]
    copy_source = {'Bucket': BUCKET, 'Key': key_value}
    s3_resource.Object(BUCKET, archive_key).copy_from(CopySource=copy_source)
    s3_resource.Object(BUCKET, key_value).delete()


# list_objects caps at 1000 keys; with ~1M files a paginator over
# list_objects_v2 is required so every page of keys is processed.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=BUCKET, Prefix=SOURCE_PREFIX):
    for obj in page.get('Contents', []):
        key_value = obj['Key']
        # strip "lob1/" prefix and the 5-char ".json" suffix to get the table name
        tablename = key_value[key_value.find('/') + 1:-5]
        # skip the prefix placeholder key itself and empty objects
        if len(tablename) <= 1 or obj['Size'] == 0:
            continue
        response = s3.get_object(Bucket=BUCKET, Key=key_value)
        try:
            alldocsdef = _json_to_frame(response['Body'].read())
        except (ValueError, KeyError) as exc:
            # one bad file must not abort the whole job; report and move on
            print('failed to process ' + key_value + ': ' + str(exc))
            continue
        # write to s3 only if the dataframe is not empty
        if len(alldocsdef.index) != 0:
            wr.s3.to_parquet(df=alldocsdef, path=TARGET_PATH + '/' + tablename + '.parquet')
            print('written_to_parquet')
            # archive only files that were actually written
            _archive(key_value)
        else:
            print("skipped all")
目前没有回答
相关问题 更多 >
编程相关推荐