从csv到aws s3拼花的etl作业

s3-parquetifier的Python项目详细描述


S3 Parquetifier

Build StatusPyPI version fury.ioMIT license

s3 parquetifier是一个etl工具,它可以从s3 bucket中获取一个文件,将其转换为parquet格式并 把它存到另一个桶里。

s3 parquetifier支持以下文件类型

  • [X]CSV
  • []json
  • []TSV

说明

如何安装

要安装软件包,请运行以下命令

pipinstalls3-parquetifier

如何使用

s3 parquetifier需要一个aws帐户,该帐户至少具有目标bucket的读取权限 以及目标存储桶的读写权限。

您可以阅读以下关于如何设置s3角色和策略的文章here

运行脚本
froms3_parquetifierimportS3Parquetifier# Call the covertorS3Parquetifier(aws_access_key='<your aws access key>',aws_secret_key='<your aws secret key>',region='<the region>',verbose=True,# for verbosity or notsource_bucket='<the bucket'snamewheretheCSVsare>',target_bucket='<the bucket'snamewhereyouwanttheparquetfiletobesaved>',type='S3'# for now only S3).convert_from_s3(source_key='<the key of the object or the folder>',target_key='<the key of the object or the folder>',file_type='csv'# for now only CSV,chunk_size=100000,# The number of rows per parquetdtype=None,# A dictionary defining the types of the columnsskip_rows=None,# How many rows to skip per chunkcompression='gzip',# The compression typekeep_original_name_locally=True,# In order to keep the original filename or create a random when downloading the fileencoding='utf-8'# Set the encoding of the file)

添加自定义预处理功能

可以在源文件中添加自定义预处理函数。因为这个工具是为大文件设计的预处理 每一块都是单独发生的。如果预处理需要完整的文件,则需要在源文件中进行本地预处理。

在下面的示例中,我们将使用一些自定义值在块上添加自定义列。 我们将分别添加值为1, 2, 3的列test1, test2, test3

我们在下面定义了名为pre_process的函数,还定义了函数kwargs的参数。 Kwargs中不需要块数据帧,默认情况下采用它。您必须将函数作为参数传入 pre_process_chunkconvert_from_s3方法中kwargs中的参数。

froms3_parquetifierimportS3Parquetifier# Add three new columns with custom valuesdefpre_process(chunk,columns=None,values=None):forindex,columninenumerate(columns):chunk[column]=values[index]returnchunk# define the arguments for the pre-processorkwargs={'columns':['test1','test2','test3'],'values':[1,2,3]}# Call the covertorS3Parquetifier(aws_access_key='<your aws access key>',aws_secret_key='<your aws secret key>',region='<the region>',verbose=True,# for verbosity or notsource_bucket='<the bucket'snamewheretheCSVsare>',target_bucket='<the bucket'snamewhereyouwanttheparquetfiletobesaved>',type='S3'# for now only S3).convert_from_s3(source_key='<the key of the object or the folder>',target_key='<the key of the object or the folder>',file_type='csv'# for now only CSV,chunk_size=100000,# The number of rows per parquetdtype=None,# A dictionary defining the types of the columnsskip_rows=None,# How many rows to skip per chunkcompression='gzip',# The compression typekeep_original_name_locally=True,# In order to keep the original filename or create a random when downloading the fileencoding='utf-8',# Set the encoding of the filepre_process_chunk=pre_process,# A preprocessing function that will pre-process the each chunkkwargs=kwargs# potential extra arguments for the pre-preocess function)

待办事项

  • [X]添加对处理本地文件的支持
  • []添加对json的支持
  • []通过URL支持添加流媒体

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
JavaSwing:实现TableModel还是扩展AbstractTableModel?   无法在连接了SQL的Java中获取数据抛出结果集   java在运行jar时访问jar外部的prop文件   java如何在使用xPath时选择通过检查的节点,而不是它们的父节点。评估()   java如何将文本文件中的整数值翻两番?   java Update Hibernate给出了一个错误   如何使用Java在类中实现调用mule中onCall方法的Singleton?   java如何修复Hibernate 5的映射未找到异常?   调用AlertDialog。按下后退按钮时Android片段的Java生成器   java基于进度更改JProgressBar中的ColorUIResource   java如何让这个测试通过?   java是否可以在TestNG中对参数化测试进行依赖?   java查询无法通过HQL执行   Spring java 11获取spel问题EL1005E:找不到类型   尝试获取JMH锁时发生java异常