从配置文件生成气流DAG
boundary-layer的Python项目详细描述
边界层
边界层
是一个用于从人类友好的、结构化的、可维护的yaml配置中构建空气流的工具。它包括对Airflow本身未内置的各种可用性增强的一流支持:
- 由dag中的气流创建和销毁的托管资源:例如,dataproc上的短暂dag范围的hadoop集群
- 基于灵活的模式,对所有运算符的所有参数进行类型检查和自动预处理
- 自动导入所需类
- 在操作员组之前和之后进行区分,以便更容易管理在工作流开始或结束时采取的操作
- dag剪枝,用于在保持依赖关系的同时提取或删除图形的部分
边界层还执行各种检查以查找仅在部署到气流实例时才可见的错误,例如DAG中的周期、重复的任务名称等。
边界层在etsy数据平台上被大量使用。我们平台上的每个dag都是由一个边界层
配置来定义的,而不是用原始的python来定义的,这大大减少了我们的数据科学家和工程师开发dag的障碍,同时确保在生成的python代码中始终遵守最佳实践。边界层
是我们完全自助部署过程的核心,在该过程中,DAG由我们的CI工具进行测试,并在允许DAG合并并部署到我们的气流实例之前显示错误。
此外,我们从Oozie到Airflow的迁移在很大程度上依赖于边界层的转换工具。
边界层是可插入的,通过使用
pip>安装的插件支持自定义配置和扩展。核心包不包含任何特定于etsy的定制;相反,这些都是在内部分发的etsy插件包中定义的。
有关更多信息,请参阅我们在etsy上的文章代码为Craft博客。
支持的操作员和气流版本
边界层
要求每个操作员都有一个配置文件来定义其模式、对应的python类等。这些配置文件存储在边界层默认插件中。我们目前包括一些常见气流操作器的配置(足以支持我们在etsy的需求,再加上一些),但我们知道我们缺少了许多满足常见气流用例所需的操作器。我们致力于继续增加对更多操作符的支持,我们还致力于支持对任何贡献的pull请求的快速周转时间,这些请求只增加了对其他操作符的支持。所以请提交一个请求如果g丢失,或者至少删除一个问题以通知我们。
此外,由于气流释放版本之间的操作器和传感器的某些差异,边界层和某些气流版本之间可能存在不兼容。众所周知,我们所有的操作符都使用气流释放版本1.9和1.10(尽管我们的模式验证了1.10的操作符参数,1.10是1.9的参数的超集——可能有一些参数是我们允许的,但1.9不能正确使用)。
安装
边界层
通过pypi分发,可以使用pip安装。
pip install boundary-layer --upgrade
我们建议安装到虚拟环境中,但这取决于您。
现在您应该可以运行边界层并查看其帮助消息:
$ boundary-layer --help
如果安装成功,您将看到如下输出:
usage: boundary-layer [-h] {build-dag,prune-dag,parse-oozie} ...
positional arguments:
{build-dag,prune-dag,parse-oozie}
optional arguments:
-h, --help show this help message and exit
边界层yaml配置
边界层的主要特性是它能够从简单、结构化的yaml文件构建python dag。
下面是一个简单的边界层yaml配置,用于在google cloud dataproc上运行hadoop作业:
name:my_dagdag_args:schedule_interval:'@daily'resources:-name:dataproc-clustertype:dataproc_clusterproperties:cluster_name:my-cluster-{{ execution_date.strftime('%s') }}num_workers:10region:us-central1default_task_args:owner:etsy-data-platformproject_id:my-project-idretries:2start_date:'2018-10-31'dataproc_hadoop_jars:-gs://my-bucket/my/path/to/my.jarbefore:-name:data-sensortype:gcs_object_sensorproperties:bucket:my-bucketobject:my/objectoperators:-name:my-jobtype:dataproc_hadooprequires_resources:-dataproc-clusterproperties:main_class:com.etsy.my.job.ClassNamedataproc_hadoop_properties:mapreduce.map.output.compress:'true'arguments:['--date','{{ds}}']
一些有趣的功能:
- 配置的
resources
部分定义了hadoop作业所需的瞬态dataproc
集群资源。边界层
将在创建DAG时自动插入用于创建和删除此群集的运算符,以及作业和群集之间的依赖关系。 - 配置的
before
部分定义将由边界层插入的传感器,作为DAG中所有下游操作的先决条件,包括创建瞬态DataProc群集。
要将上述yaml配置转换为python dag,请将其保存到一个文件中(为了方便起见,此dag已存储在示例目录中)并运行
$ boundary-layer build-dag readme_example.yaml > readme_example.py
而且,如果一切顺利,这将在文件readme-example.py
中写入一个有效的Airflow DAG。你应该打开这个文件,看看它的内容,了解边界层在做什么。特别是,在文件顶部的一些注释之后,您应该看到如下内容:
importosfromairflowimportDAGimportdatetimefromairflow.operators.dummy_operatorimportDummyOperatorfromairflow.contrib.sensors.gcs_sensorimportGoogleCloudStorageObjectSensorfromairflow.contrib.operators.dataproc_operatorimportDataprocClusterDeleteOperator,DataProcHadoopOperator,DataprocClusterCreateOperatorDEFAULT_TASK_ARGS={'owner':'etsy-data-platform','retries':2,'project_id':'my-project-id','start_date':'2018-10-31','dataproc_hadoop_jars':['gs://my-bucket/my/path/to/my.jar'],}dag=DAG(schedule_interval='@daily',catchup=True,max_active_runs=1,dag_id='my_dag',default_args=DEFAULT_TASK_ARGS,)data_sensor=GoogleCloudStorageObjectSensor(dag=(dag),task_id='data_sensor',object='my/object',bucket='my-bucket',start_date=(datetime.datetime(2018,10,31,0,0)),)dataproc_cluster_create=DataprocClusterCreateOperator(dag=(dag),task_id='dataproc_cluster_create',num_workers=10,region='us-central1',cluster_name="my-cluster-{{ execution_date.strftime('%s') }}",start_date=(datetime.datetime(2018,10,31,0,0)),)dataproc_cluster_create.set_upstream(data_sensor)my_job=DataProcHadoopOperator(dag=(dag),task_id='my_job',dataproc_hadoop_properties={'mapreduce.map.output.compress':'true'},region='us-central1',start_date=(datetime.datetime(2018,10,31,0,0)),cluster_name="my-cluster-{{ execution_date.strftime('%s') }}",arguments=['--date','{{ ds }}'],main_class='com.etsy.my.job.ClassName',)my_job.set_upstream(dataproc_cluster_create)dataproc_cluster_destroy_sentinel=DummyOperator(dag=(dag),start_date=(datetime.datetime(2018,10,31,0,0)),task_id='dataproc_cluster_destroy_sentinel',)dataproc_cluster_destroy_sentinel.set_upstream(my_job)dataproc_cluster_destroy=DataprocClusterDeleteOperator(dag=(dag),task_id='dataproc_cluster_destroy',trigger_rule='all_done',region='us-central1',cluster_name="my-cluster-{{ execution_date.strftime('%s') }}",priority_weight=50,start_date=(datetime.datetime(2018,10,31,0,0)),)dataproc_cluster_destroy.set_upstream(my_job)
这个python dag现在可以直接摄取到正在运行的airlow实例中,遵循任何适合您的airlow部署的过程。
有几点需要注意:
边界层
将开始日期
参数从字符串转换为python日期时间
对象。这是边界层参数预处理器特性的一个例子,它允许将配置参数指定为用户友好的字符串,并自动转换为必要的python数据结构。边界层
与集群破坏节点并行添加了一个哨兵节点,该节点作为气流本身的指示器,用于指示DAG运行的最终结果。气流从DAG的叶节点确定DAG运行状态,通常群集销毁节点将始终执行(与上游故障无关),并可能成功。这将导致关键节点中出现故障的DAG被标记为成功,如果不是哨兵节点。sentinel节点只有在其所有上游依赖项成功时才会触发,否则将标记为"上游失败",这将导致DAG运行的失败状态。
oozie迁移工具
除了允许我们使用yaml配置定义气流工作流之外,边界层r
还提供了一个模块,用于将oozie xml配置文件转换为边界层
yaml配置,然后可用于创建气流dag。
不可否认,边界层的oozie支持目前是有限的:它只能构建将hadoop作业提交给dataproc的dag(例如,它不支持独立的hadoop集群),而且它确实支持。不支持oozie协调员。如果社区对Oozie的支持有需求,我们愿意致力于改善Oozie的支持,当然,我们也愿意为实现这一目标做出社区贡献。
以下命令将把示例oozie工作流转换为边界层dag,该dag将在gcp的us-east1
区域中的64节点dataproc集群上执行,用于gcp项目my project id
:
boundary-layer parse-oozie example \
--local-workflow-base-path test/data/oozie-workflows/ \
--cluster-project-id my-project-id \
--cluster-region us-east1 \
--cluster-num-workers 64