一个气流插件,在presto上添加一个分区作为select(apas),使用glue data catalog作为配置单元元存储。

airflow-plugin-glue-presto-apas的Python项目详细描述


Airflow-Plugin-Glue_Presto_APAS

PyPi

在presto上,添加一个分区作为select(apas)的气流插件,它使用glue data catalog作为配置单元元存储。

用法

from datetime import timedelta

import airflow
from airflow.models import DAG

from airflow.operators.glue_add_partition import GlueAddPartitionOperator
from airflow.operators.glue_presto_apas import GluePrestoApasOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


dag = DAG(
    dag_id='example-dag',
    schedule_interval='0 0 * * *',
    default_args=args,
)

GluePrestoApasOperator(task_id='example-task-1',
                       db='example_db',
                       table='example_table',
                       sql='example.sql',
                       partition_kv={
                           'table_schema': 'example_db',
                           'table_name': 'example_table'
                       },
                       catalog_region_name='ap-northeast-1',
                       dag=dag,
                       )

GlueAddPartitionOperator(task_id='example-task-2',
                         db='example_db',
                         table='example_table',
                         partition_kv={
                             'table_schema': 'example_db',
                             'table_name': 'example_table'
                         },
                         catalog_region_name='ap-northeast-1',
                         dag=dag,
                         )

if __name__ == "__main__":
    dag.cli()

配置

glue_presto_apas.glueprestoapas运算符

  • db:用于分区的数据库名称(字符串,必需)
  • table:用于分区的表名(字符串,必需)
  • sql:用于选择数据的sql文件名(字符串,必需)
  • fmt:存储数据时的数据格式(字符串,默认值=parquet
  • 附加属性:用于创建表的附加属性。(dict[string,string],可选)
  • location:数据的位置(字符串,默认值=由配置单元可修复方式自动生成)
  • partition千伏:分区的键值(dict[string,string],必需)
  • 保存模式:存储数据时的模式(字符串,默认值为overwrite,可用值为skip_if_existserror_if_existsignoreoverwrite
  • catalog\u id:如果使用不同于帐户/区域默认目录的目录,则粘附数据目录id。(字符串,可选)
  • catalog_region_name:如果使用与帐户/区域默认目录不同的目录,请粘附数据目录区域。(字符串,US-EAST-1)
  • presto_conn_id:presto的连接id(字符串,默认值为'presto_default')
  • aws_conn_id:aws的连接id(string,default='aws_default')

模板可用于选项[dbtablesqllocationpartition\u kv]。

glue add分区。glueaddpartitionoperator

  • db:用于分区的数据库名称(字符串,必需)
  • table:用于分区的表名(字符串,必需)
  • location:数据的位置(字符串,默认值=由配置单元可修复方式自动生成)
  • partition千伏:分区的键值(dict[string,string],必需)
  • mode:存储数据时的模式(字符串,默认值为overwrite,可用值为skip_if_existserror_if_existsoverwrite
  • {STR 1 } $UpLeSuxe:跳过添加分区,如果位置不存在则删除分区。(布尔值,默认值=True
  • catalog\u id:如果使用不同于帐户/区域默认目录的目录,则粘附数据目录id。(字符串,可选)
  • catalog_region_name:如果使用与帐户/区域默认目录不同的目录,请粘附数据目录区域。(字符串,US-EAST-1)
  • aws_conn_id:aws的连接id(string,default='aws_default')

模板可用于选项[dbtablelocationpartition\u kv]。

开发

运行示例

PRESTO_HOST=${YOUR PRESTO HOST} PRESTO_PORT=${YOUR PRESTO PORT} ./run-example.sh

释放

poetry publish --build

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Hibernate:合并并删除,而不是从数据库中删除实体对象   java如何在WebFilter中有条件地从Mono返回?   java调用mysql c3p0函数   可执行jar Java jar文件加载错误:无法找到或加载主类。类导致的错误:java。lang.ClassNotFoundException:某些东西。班   java如何减少/更改爬网后的延迟?   从其他语言(如Java、PHP、Perl、Python等)调用C/C++代码的最佳方式是什么?   java如何模拟影响对象的void返回方法   当我试图在ubuntu上启动JavaScala时,它抛出了一个异常   java如何正确输出游戏   理解java和C++背景下的JavaScript原型   oracle如何将Java函数转换为postgresql函数   多线程为什么我的java服务器程序在超时后不退出?   java如何使listView中的按钮在单击时工作?   试图将这个嵌套的java forloop转换为python,但我不知道如何转换。有没有一种方法可以像这样为循环执行if语句?   java幂函数在计算器中的应用   如何在java中滚动浏览mysql数据库   在Spring Boot应用程序的JUnit测试中,java没有符合自动连线JPA存储库要求的bean   java如何使用扩展类的JPanel对象?