允许气流DAG通过Livy:session和/或批处理运行Spark作业。

airflow-livy-operators的Python项目详细描述


气流Livy操作员

Build StatusCode coverage

让气流DAG通过Livy运行Spark作业:

  • 会议
  • 批处理。此模式支持通过Spark/YARN REST API进行附加验证。在

请参阅this blog post以获取更多信息和从气流运行Spark作业的方法的详细比较。在

感兴趣的目录和文件

  • airflow_home/plugins:flow Livy操作员代码。在
  • airflow_home/dags:气流的DAGs示例。在
  • batches:Spark作业代码,将在Livy批处理中使用。在
  • sessions:Livy会话的Spark代码。可以添加模板 以将参数传递到其中。在
  • helper.sh:助手shell脚本。可用于运行样本DAG, 准备开发环境等。 运行它来找出还有哪些命令可用。在

我怎么

…运行示例?

先决条件:

现在

  1. 可选-如果要模拟 机器。打开助手.sh。在init_airflow()函数中,您将看到气流 Livy,Spark和YARN的连接。根据需要重新定义。在
  2. 定义将此repo中的示例批处理文件传递到集群的方式:
    1. 如果使用docker compose群集:重新定义BATCH_DIR变量 视情况而定。在
    2. 如果您使用自己的集群:修改copy_batches()函数,使其 将文件传递到集群可访问的位置(可以是aws s3 cp等)
  3. 运行./helper.sh up以启动整个基础结构。 气流用户界面将在 localhost:8888。在
  4. Ctrl+C停止气流。然后./helper.sh down来处理 剩余的气流过程(如果一切顺利,就不需要了。 如果由于某些非信息性错误而无法再次启动气流,请运行此程序)。在

。。。在我的项目中使用它?

pip install airflow-livy-operators

导入方法如下:

^{pr2}$

请参阅airflow_home/dags下的示例dag以了解如何使用运算符。在

。。。建立开发环境?

好吧,你想做点贡献,并且需要能够在你的机器上运行这些东西, 以及IDE(调试、语法高亮显示)带来的通常的美好。在

  • 运行./helper.sh dev安装所有的开发依赖项。在
  • ./helper.sh updev使用本地操作员代码运行气流(与 从PyPi中提取它们)。对开发有用。在
  • (特定于Pycharm)将Pycharm指向新创建的虚拟环境:转到 "Preferences" -> "Project: airflow-livy-operators" -> "Project interpreter", select "Existing environment"并从venv文件夹中选择python3可执行文件 (venv/bin/python3
  • ./helper.sh cov-使用覆盖率报告运行测试 (将保存到htmlcov/)。在
  • ./helper.sh lint-突出显示代码样式错误。在
  • ./helper.sh format以重新格式化所有代码。 (此项目依赖Black+ isort
  • ./helper.sh pypi-为PyPi生成包。在

。。。调试?

  • (特定于Pycharm)使用airflow test逐步调试 以及在本地运行PySpark批处理作业(以及调试) 通过.idea/runConfigurations下的运行配置支持。 你不应该做任何事情来使用它们-只要打开文件夹 作为一个项目。在
  • 如何在本地Spark上运行批处理的示例:
python ./batches/join_2_files.py \"file:////Users/vpanov/data/vpanov/bigdata-docker-compose/data/grades.csv"\"file:///Users/vpanov/data/vpanov/bigdata-docker-compose/data/ssn-address.tsv"\
-file1_sep=, -file1_header=true\
-file1_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Test1 INT, Test2 INT, Test3 INT, Test4 INT, Final INT, Grade STRING"\
-file1_join_column=SSN -file2_header=false\
-file2_schema="\`Last name\` STRING, \`First name\` STRING, SSN STRING, Address1 STRING, Address2 STRING"\
-file2_join_column=SSN -output_header=true\
-output_columns="file1.\`Last name\` AS LastName, file1.\`First name\` AS FirstName, file1.SSN, file2.Address1, file2.Address2"# Optionally append to save result to file#-output_path="file:///Users/vpanov/livy_batch_example" 

托多

  • 在助手.sh-替换为现代工具(例如pipenv+Docker image)
  • 禁用一些flake8标志以获得更干净的代码

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java获取JEditorPane中字符的绝对位置   java Datetime:将时间段拆分为天、小时和分钟   java是使此HashMap更高效的一种方法   java项目reactor:collectList()之后的block()对Flux不起作用。创建()   java在Mac OSX上安装OpenCV   java递归地确定一组数字是否包含两个总和相等的子集   Quad2D曲线上的几何图形Java绘图箭头   java将SSL证书导入Glassfish 4。十、   java Android未找到处理Intent MediaScanner的活动   EclipseJava。安全cert.CertificateParsingException:java。木卫一。IOException:主题密钥,无法创建EC公钥   java我能在O(M log N)时间内完成吗?   java跟踪eclipse中的资源更改也在中。元数据和。项目   java如何完全禁用Android键盘   java返回到上一个活动