从csv到aws s3拼花的etl作业

s3-parquetifier的Python项目详细描述


S3 Parquetifier

Build StatusPyPI version fury.ioMIT license

s3 parquetifier是一个etl工具,它可以从s3 bucket中获取一个文件,将其转换为parquet格式并 把它存到另一个桶里。

s3 parquetifier支持以下文件类型

  • [X]CSV
  • []json
  • []TSV

说明

如何安装

要安装软件包,请运行以下命令

pipinstalls3-parquetifier

如何使用

s3 parquetifier需要一个aws帐户,该帐户至少具有目标bucket的读取权限 以及目标存储桶的读写权限。

您可以阅读以下关于如何设置s3角色和策略的文章here

运行脚本
froms3_parquetifierimportS3Parquetifier# Call the covertorS3Parquetifier(aws_access_key='<your aws access key>',aws_secret_key='<your aws secret key>',region='<the region>',verbose=True,# for verbosity or notsource_bucket='<the bucket'snamewheretheCSVsare>',target_bucket='<the bucket'snamewhereyouwanttheparquetfiletobesaved>',type='S3'# for now only S3).convert_from_s3(source_key='<the key of the object or the folder>',target_key='<the key of the object or the folder>',file_type='csv'# for now only CSV,chunk_size=100000,# The number of rows per parquetdtype=None,# A dictionary defining the types of the columnsskip_rows=None,# How many rows to skip per chunkcompression='gzip',# The compression typekeep_original_name_locally=True,# In order to keep the original filename or create a random when downloading the fileencoding='utf-8'# Set the encoding of the file)

添加自定义预处理功能

可以在源文件中添加自定义预处理函数。因为这个工具是为大文件设计的预处理 每一块都是单独发生的。如果预处理需要完整的文件,则需要在源文件中进行本地预处理。

在下面的示例中,我们将使用一些自定义值在块上添加自定义列。 我们将分别添加值为1, 2, 3的列test1, test2, test3

我们在下面定义了名为pre_process的函数,还定义了函数kwargs的参数。 Kwargs中不需要块数据帧,默认情况下采用它。您必须将函数作为参数传入 pre_process_chunkconvert_from_s3方法中kwargs中的参数。

froms3_parquetifierimportS3Parquetifier# Add three new columns with custom valuesdefpre_process(chunk,columns=None,values=None):forindex,columninenumerate(columns):chunk[column]=values[index]returnchunk# define the arguments for the pre-processorkwargs={'columns':['test1','test2','test3'],'values':[1,2,3]}# Call the covertorS3Parquetifier(aws_access_key='<your aws access key>',aws_secret_key='<your aws secret key>',region='<the region>',verbose=True,# for verbosity or notsource_bucket='<the bucket'snamewheretheCSVsare>',target_bucket='<the bucket'snamewhereyouwanttheparquetfiletobesaved>',type='S3'# for now only S3).convert_from_s3(source_key='<the key of the object or the folder>',target_key='<the key of the object or the folder>',file_type='csv'# for now only CSV,chunk_size=100000,# The number of rows per parquetdtype=None,# A dictionary defining the types of the columnsskip_rows=None,# How many rows to skip per chunkcompression='gzip',# The compression typekeep_original_name_locally=True,# In order to keep the original filename or create a random when downloading the fileencoding='utf-8',# Set the encoding of the filepre_process_chunk=pre_process,# A preprocessing function that will pre-process the each chunkkwargs=kwargs# potential extra arguments for the pre-preocess function)

待办事项

  • [X]添加对处理本地文件的支持
  • []添加对json的支持
  • []通过URL支持添加流媒体

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java使用McClickListener单击了什么元素   Java时间戳在Oracle时间戳中不同情况下存储12 PM的奇怪行为   java无法使用事件总线对运行在不同机器上的垂直体进行通信   java Mockserver:收到请求后进行回调   java无法将Json字符串转换为Map<string,Object>   java如何按升序排列输出?   java视图行,带有oracle键。jbo。在SrCategoryParentIterator中找不到键[300100120394155]   javafxmysql连接示例   java正在等待加载完成   java是否可以将同一个有状态会话bean实例注入多个其他会话bean?   java无法让万向节检测离开或进入区域   使用JavaCV和OpenCV的java提供了dyld:lazy符号绑定失败:找不到符号:__sincos_stret   xml解析无法使用Java读取xml文档   java无法更改工具栏的颜色   javaapachesshd和JSCH   java无法在firebase存储中检索图像url   java问题与executeUpdate   同一应用程序中不同活动之间的java SharedReference