从配置文件生成气流DAG

boundary-layer的Python项目详细描述


build statuscovercoverstatus

边界层

边界层是一个用于从人类友好的、结构化的、可维护的yaml配置中构建空气流的工具。它包括对Airflow本身未内置的各种可用性增强的一流支持:

  • 由dag中的气流创建和销毁的托管资源:例如,dataproc上的短暂dag范围的hadoop集群
  • 基于灵活的模式,对所有运算符的所有参数进行类型检查和自动预处理
  • 自动导入所需类
  • 在操作员组之前和之后进行区分,以便更容易管理在工作流开始或结束时采取的操作
  • dag剪枝,用于在保持依赖关系的同时提取或删除图形的部分

边界层还执行各种检查以查找仅在部署到气流实例时才可见的错误,例如DAG中的周期、重复的任务名称等。

边界层在etsy数据平台上被大量使用。我们平台上的每个dag都是由一个边界层配置来定义的,而不是用原始的python来定义的,这大大减少了我们的数据科学家和工程师开发dag的障碍,同时确保在生成的python代码中始终遵守最佳实践。边界层是我们完全自助部署过程的核心,在该过程中,DAG由我们的CI工具进行测试,并在允许DAG合并并部署到我们的气流实例之前显示错误。

此外,我们从Oozie到Airflow的迁移在很大程度上依赖于边界层的转换工具。

边界层是可插入的,通过使用pip>安装的插件支持自定义配置和扩展。核心包不包含任何特定于etsy的定制;相反,这些都是在内部分发的etsy插件包中定义的。

有关更多信息,请参阅我们在etsy上的文章代码为Craft博客。

支持的操作员和气流版本

边界层要求每个操作员都有一个配置文件来定义其模式、对应的python类等。这些配置文件存储在边界层默认插件中。我们目前包括一些常见气流操作器的配置(足以支持我们在etsy的需求,再加上一些),但我们知道我们缺少了许多满足常见气流用例所需的操作器。我们致力于继续增加对更多操作符的支持,我们还致力于支持对任何贡献的pull请求的快速周转时间,这些请求只增加了对其他操作符的支持。所以请提交一个请求如果g丢失,或者至少删除一个问题以通知我们。

此外,由于气流释放版本之间的操作器和传感器的某些差异,边界层和某些气流版本之间可能存在不兼容。众所周知,我们所有的操作符都使用气流释放版本1.9和1.10(尽管我们的模式验证了1.10的操作符参数,1.10是1.9的参数的超集——可能有一些参数是我们允许的,但1.9不能正确使用)。

安装

边界层通过pypi分发,可以使用pip安装。

pip install boundary-layer --upgrade

我们建议安装到虚拟环境中,但这取决于您。

现在您应该可以运行边界层并查看其帮助消息:

$ boundary-layer --help

如果安装成功,您将看到如下输出:

usage: boundary-layer [-h] {build-dag,prune-dag,parse-oozie} ...

positional arguments:
  {build-dag,prune-dag,parse-oozie}

optional arguments:
  -h, --help            show this help message and exit

边界层yaml配置

边界层的主要特性是它能够从简单、结构化的yaml文件构建python dag。

下面是一个简单的边界层yaml配置,用于在google cloud dataproc上运行hadoop作业:

name:my_dagdag_args:schedule_interval:'@daily'resources:-name:dataproc-clustertype:dataproc_clusterproperties:cluster_name:my-cluster-{{ execution_date.strftime('%s') }}num_workers:10region:us-central1default_task_args:owner:etsy-data-platformproject_id:my-project-idretries:2start_date:'2018-10-31'dataproc_hadoop_jars:-gs://my-bucket/my/path/to/my.jarbefore:-name:data-sensortype:gcs_object_sensorproperties:bucket:my-bucketobject:my/objectoperators:-name:my-jobtype:dataproc_hadooprequires_resources:-dataproc-clusterproperties:main_class:com.etsy.my.job.ClassNamedataproc_hadoop_properties:mapreduce.map.output.compress:'true'arguments:['--date','{{ds}}']

一些有趣的功能:

  • 配置的resources部分定义了hadoop作业所需的瞬态dataproc集群资源。边界层将在创建DAG时自动插入用于创建和删除此群集的运算符,以及作业和群集之间的依赖关系。
  • 配置的before部分定义将由边界层插入的传感器,作为DAG中所有下游操作的先决条件,包括创建瞬态DataProc群集。

要将上述yaml配置转换为python dag,请将其保存到一个文件中(为了方便起见,此dag已存储在示例目录中)并运行

$ boundary-layer build-dag readme_example.yaml > readme_example.py

而且,如果一切顺利,这将在文件readme-example.py中写入一个有效的Airflow DAG。你应该打开这个文件,看看它的内容,了解边界层在做什么。特别是,在文件顶部的一些注释之后,您应该看到如下内容:

importosfromairflowimportDAGimportdatetimefromairflow.operators.dummy_operatorimportDummyOperatorfromairflow.contrib.sensors.gcs_sensorimportGoogleCloudStorageObjectSensorfromairflow.contrib.operators.dataproc_operatorimportDataprocClusterDeleteOperator,DataProcHadoopOperator,DataprocClusterCreateOperatorDEFAULT_TASK_ARGS={'owner':'etsy-data-platform','retries':2,'project_id':'my-project-id','start_date':'2018-10-31','dataproc_hadoop_jars':['gs://my-bucket/my/path/to/my.jar'],}dag=DAG(schedule_interval='@daily',catchup=True,max_active_runs=1,dag_id='my_dag',default_args=DEFAULT_TASK_ARGS,)data_sensor=GoogleCloudStorageObjectSensor(dag=(dag),task_id='data_sensor',object='my/object',bucket='my-bucket',start_date=(datetime.datetime(2018,10,31,0,0)),)dataproc_cluster_create=DataprocClusterCreateOperator(dag=(dag),task_id='dataproc_cluster_create',num_workers=10,region='us-central1',cluster_name="my-cluster-{{ execution_date.strftime('%s') }}",start_date=(datetime.datetime(2018,10,31,0,0)),)dataproc_cluster_create.set_upstream(data_sensor)my_job=DataProcHadoopOperator(dag=(dag),task_id='my_job',dataproc_hadoop_properties={'mapreduce.map.output.compress':'true'},region='us-central1',start_date=(datetime.datetime(2018,10,31,0,0)),cluster_name="my-cluster-{{ execution_date.strftime('%s') }}",arguments=['--date','{{ ds }}'],main_class='com.etsy.my.job.ClassName',)my_job.set_upstream(dataproc_cluster_create)dataproc_cluster_destroy_sentinel=DummyOperator(dag=(dag),start_date=(datetime.datetime(2018,10,31,0,0)),task_id='dataproc_cluster_destroy_sentinel',)dataproc_cluster_destroy_sentinel.set_upstream(my_job)dataproc_cluster_destroy=DataprocClusterDeleteOperator(dag=(dag),task_id='dataproc_cluster_destroy',trigger_rule='all_done',region='us-central1',cluster_name="my-cluster-{{ execution_date.strftime('%s') }}",priority_weight=50,start_date=(datetime.datetime(2018,10,31,0,0)),)dataproc_cluster_destroy.set_upstream(my_job)

这个python dag现在可以直接摄取到正在运行的airlow实例中,遵循任何适合您的airlow部署的过程。

有几点需要注意:

  • 边界层开始日期参数从字符串转换为python日期时间对象。这是边界层参数预处理器特性的一个例子,它允许将配置参数指定为用户友好的字符串,并自动转换为必要的python数据结构。
  • 边界层与集群破坏节点并行添加了一个哨兵节点,该节点作为气流本身的指示器,用于指示DAG运行的最终结果。气流从DAG的叶节点确定DAG运行状态,通常群集销毁节点将始终执行(与上游故障无关),并可能成功。这将导致关键节点中出现故障的DAG被标记为成功,如果不是哨兵节点。sentinel节点只有在其所有上游依赖项成功时才会触发,否则将标记为"上游失败",这将导致DAG运行的失败状态。

oozie迁移工具

除了允许我们使用yaml配置定义气流工作流之外,边界层r还提供了一个模块,用于将oozie xml配置文件转换为边界层yaml配置,然后可用于创建气流dag。

不可否认,边界层的oozie支持目前是有限的:它只能构建将hadoop作业提交给dataproc的dag(例如,它不支持独立的hadoop集群),而且它确实支持。不支持oozie协调员。如果社区对Oozie的支持有需求,我们愿意致力于改善Oozie的支持,当然,我们也愿意为实现这一目标做出社区贡献。

以下命令将把示例oozie工作流转换为边界层dag,该dag将在gcp的us-east1区域中的64节点dataproc集群上执行,用于gcp项目my project id:

boundary-layer parse-oozie example \
  --local-workflow-base-path test/data/oozie-workflows/ \
  --cluster-project-id my-project-id \
  --cluster-region us-east1 \
  --cluster-num-workers 64

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java LineNumberReader。如果查询行为不正确,则返回readLine()   java包含了一个使用AndroidX的工具栏,这让我的应用程序崩溃了   JVM设置通过“java jar”运行应用程序的最佳实践   java如何获取ImageButton宽度   java Oracle SQLLDR实用程序无响应   列出Java获取对象的arrayList中最常见的元素   java使用带有FlowLayout的getContentpane对布局应用更改,但不起作用为什么?   在java中,我可以在画布上绘制画布吗?   编译游戏代码时发生java异常错误   从firestore获取java Webview失败   java将TableLayout中单元格的内容向右对齐   java无法在发布模式下启动活动(使用proguard安卓optimize配置)   java允许在线程期间进行GUI更新。睡觉   java如何对以变量为列表的列表进行排序   API URL上的java Google云端点异常