允许气流DAG通过Livy:session和/或批处理运行Spark作业。
airflow-livy-operators的Python项目详细描述
气流Livy操作员
让气流DAG通过Livy运行Spark作业:
- 会议
- 批处理。此模式支持通过Spark/YARN REST API进行附加验证。在
请参阅this blog post以获取更多信息和从气流运行Spark作业的方法的详细比较。在
感兴趣的目录和文件
airflow_home/plugins
:flow Livy操作员代码。在airflow_home/dags
:气流的DAGs示例。在batches
:Spark作业代码,将在Livy批处理中使用。在sessions
:Livy会话的Spark代码。可以添加模板 以将参数传递到其中。在helper.sh
:助手shell脚本。可用于运行样本DAG, 准备开发环境等。 运行它来找出还有哪些命令可用。在
我怎么
…运行示例?
先决条件:
- Python 3。确保它已安装并位于$PATH中
- 与丽芙星星之火。我强烈建议你在你的机器上“模仿”一个 my Spark cluster on Docker Compose。在
现在
- 可选-如果要模拟
机器。打开助手.sh。在
init_airflow()
函数中,您将看到气流 Livy,Spark和YARN的连接。根据需要重新定义。在 - 定义将此repo中的示例批处理文件传递到集群的方式:
- 如果使用docker compose群集:重新定义BATCH_DIR变量 视情况而定。在
- 如果您使用自己的集群:修改
copy_batches()
函数,使其 将文件传递到集群可访问的位置(可以是aws s3 cp
等)
- 运行
./helper.sh up
以启动整个基础结构。 气流用户界面将在 localhost:8888。在 - Ctrl+C停止气流。然后
./helper.sh down
来处理 剩余的气流过程(如果一切顺利,就不需要了。 如果由于某些非信息性错误而无法再次启动气流,请运行此程序)。在
。。。在我的项目中使用它?
pip install airflow-livy-operators
导入方法如下:
^{pr2}$请参阅airflow_home/dags
下的示例dag以了解如何使用运算符。在
。。。建立开发环境?
好吧,你想做点贡献,并且需要能够在你的机器上运行这些东西, 以及IDE(调试、语法高亮显示)带来的通常的美好。在
- 运行
./helper.sh dev
安装所有的开发依赖项。在 ./helper.sh updev
使用本地操作员代码运行气流(与 从PyPi中提取它们)。对开发有用。在- (特定于Pycharm)将Pycharm指向新创建的虚拟环境:转到
"Preferences" -> "Project: airflow-livy-operators" -> "Project interpreter", select "Existing environment"
并从venv文件夹中选择python3可执行文件 (venv/bin/python3) ./helper.sh cov
-使用覆盖率报告运行测试 (将保存到htmlcov/)。在./helper.sh lint
-突出显示代码样式错误。在./helper.sh format
以重新格式化所有代码。 (此项目依赖Black+ isort)./helper.sh pypi
-为PyPi生成包。在
。。。调试?
- (特定于Pycharm)使用
airflow test
逐步调试 以及在本地运行PySpark批处理作业(以及调试) 通过.idea/runConfigurations
下的运行配置支持。 你不应该做任何事情来使用它们-只要打开文件夹 作为一个项目。在 - 如何在本地Spark上运行批处理的示例:
python ./batches/join_2_files.py \"file:////Users/vpanov/data/vpanov/bigdata-docker-compose/data/grades.csv"\"file:///Users/vpanov/data/vpanov/bigdata-docker-compose/data/ssn-address.tsv"\ -file1_sep=, -file1_header=true\ -file1_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING"\ -file1_join_column=SSN -file2_header=false\ -file2_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Address1 STRING, Address2 STRING"\ -file2_join_column=SSN -output_header=true\ -output_columns="file1.\`Last name\` AS LastName, file1.\`First name\` AS FirstName, file1.SSN, file2.Address1, file2.Address2"# Optionally append to save result to file#-output_path="file:///Users/vpanov/livy_batch_example"
托多
- 在助手.sh-替换为现代工具(例如pipenv+Docker image)
- 禁用一些flake8标志以获得更干净的代码
- 项目
标签: