用于处理AWS数据的公用皮带。
awswrangler的Python项目详细描述
AWS数据打包器(beta版)
Utility belt to handle data on AWS.
内容:Use CasesInstallationExamplesDiving Deep
用例
- 熊猫-镶木地板(S3)
- 熊猫->;csv(s3)
- 熊猫->;胶水目录
- 熊猫->;雅典娜
- 熊猫->;红移
- csv(s3)->;熊猫(一次性或批量)
- 雅典娜->;熊猫(一次性或批量)
- Pyspark->;红移
- 删除s3对象(并行:rocket:)
- 用kms密钥加密s3数据
安装
pip install awswrangler
仅与Python3.6及更高版本一起运行。
在任何地方运行(aws lambda、aws glue、emr、ec2、本地、本地等)。
p.s.lambda layer bundle和glue egg可用于download。它只是上传到你的帐户和运行!:火箭:
示例
将pandas数据帧写入s3+glue catalog
session=awswrangler.Session()session.pandas.to_parquet(dataframe=dataframe,database="database",path="s3://...",partition_cols=["col_name"],)
如果传递了glue数据库名称,则将在glue目录中创建所有元数据。否则,只会执行s3数据写入。
用kms密钥将pandas数据帧作为拼花加密写入s3
extra_args={"ServerSideEncryption":"aws:kms","SSEKMSKeyId":"YOUR_KMY_KEY_ARN"}session=awswrangler.Session(s3_additional_kwargs=extra_args)session.pandas.to_parquet(path="s3://...")
从aws雅典娜到熊猫的阅读
session=awswrangler.Session()dataframe=session.pandas.read_sql_athena(sql="select * from table",database="database")
从aws athena到pandas的分块阅读(内存限制)
session=awswrangler.Session()dataframe_iter=session.pandas.read_sql_athena(sql="select * from table",database="database",max_result_size=512_000_000# 512 MB)fordataframeindataframe_iter:print(dataframe)# Do whatever you want
从s3(csv)读取熊猫
session=awswrangler.Session()dataframe=session.pandas.read_csv(path="s3://...")
从s3(csv)到pandas的分块读取(内存限制)
session=awswrangler.Session()dataframe_iter=session.pandas.read_csv(path="s3://...",max_result_size=512_000_000# 512 MB)fordataframeindataframe_iter:print(dataframe)# Do whatever you want
典型熊猫ETL
importpandasimportawswranglerdf=pandas.read_...# Read from anywhere# Typical Pandas, Numpy or Pyarrow transformation HERE!session=awswrangler.Session()session.pandas.to_parquet(# Storing the data and metadata to Data Lakedataframe=dataframe,database="database",path="s3://...",partition_cols=["col_name"],)
将pyspark数据帧加载到redshift
session=awswrangler.Session(spark_session=spark)session.spark.to_redshift(dataframe=df,path="s3://...",connection=conn,schema="public",table="table",iam_role="IAM_ROLE_ARN",mode="append",)
删除一组s3对象
session=awswrangler.Session()session.s3.delete_objects(path="s3://...")