从csv到aws s3拼花的etl作业
s3-parquetifier的Python项目详细描述
S3 Parquetifier
s3 parquetifier是一个etl工具,它可以从s3 bucket中获取一个文件,将其转换为parquet格式并 把它存到另一个桶里。
s3 parquetifier支持以下文件类型
- [X]CSV
- []json
- []TSV
说明
如何安装
要安装软件包,请运行以下命令
pipinstalls3-parquetifier
如何使用
s3 parquetifier需要一个aws帐户,该帐户至少具有目标bucket的读取权限 以及目标存储桶的读写权限。
您可以阅读以下关于如何设置s3角色和策略的文章here
运行脚本
froms3_parquetifierimportS3Parquetifier# Call the covertorS3Parquetifier(aws_access_key='<your aws access key>',aws_secret_key='<your aws secret key>',region='<the region>',verbose=True,# for verbosity or notsource_bucket='<the bucket'snamewheretheCSVsare>',target_bucket='<the bucket'snamewhereyouwanttheparquetfiletobesaved>',type='S3'# for now only S3).convert_from_s3(source_key='<the key of the object or the folder>',target_key='<the key of the object or the folder>',file_type='csv'# for now only CSV,chunk_size=100000,# The number of rows per parquetdtype=None,# A dictionary defining the types of the columnsskip_rows=None,# How many rows to skip per chunkcompression='gzip',# The compression typekeep_original_name_locally=True,# In order to keep the original filename or create a random when downloading the fileencoding='utf-8'# Set the encoding of the file)
添加自定义预处理功能
可以在源文件中添加自定义预处理函数。因为这个工具是为大文件设计的预处理 每一块都是单独发生的。如果预处理需要完整的文件,则需要在源文件中进行本地预处理。
在下面的示例中,我们将使用一些自定义值在块上添加自定义列。
我们将分别添加值为1, 2, 3
的列test1, test2, test3
。
我们在下面定义了名为pre_process
的函数,还定义了函数kwargs
的参数。
Kwargs中不需要块数据帧,默认情况下采用它。您必须将函数作为参数传入
pre_process_chunk
和convert_from_s3
方法中kwargs
中的参数。
froms3_parquetifierimportS3Parquetifier# Add three new columns with custom valuesdefpre_process(chunk,columns=None,values=None):forindex,columninenumerate(columns):chunk[column]=values[index]returnchunk# define the arguments for the pre-processorkwargs={'columns':['test1','test2','test3'],'values':[1,2,3]}# Call the covertorS3Parquetifier(aws_access_key='<your aws access key>',aws_secret_key='<your aws secret key>',region='<the region>',verbose=True,# for verbosity or notsource_bucket='<the bucket'snamewheretheCSVsare>',target_bucket='<the bucket'snamewhereyouwanttheparquetfiletobesaved>',type='S3'# for now only S3).convert_from_s3(source_key='<the key of the object or the folder>',target_key='<the key of the object or the folder>',file_type='csv'# for now only CSV,chunk_size=100000,# The number of rows per parquetdtype=None,# A dictionary defining the types of the columnsskip_rows=None,# How many rows to skip per chunkcompression='gzip',# The compression typekeep_original_name_locally=True,# In order to keep the original filename or create a random when downloading the fileencoding='utf-8',# Set the encoding of the filepre_process_chunk=pre_process,# A preprocessing function that will pre-process the each chunkkwargs=kwargs# potential extra arguments for the pre-preocess function)
待办事项
- [X]添加对处理本地文件的支持
- []添加对json的支持
- []通过URL支持添加流媒体