在Docker容器中执行Airflow任务
我有一个任务想要定时执行,使用的是 airflow。我在 docker 中运行 airflow,使用的是 airflow docker 教程中提供的 docker-compose.yaml 文件。
我用 docker build -f Dockerfile -t twm_step01
命令构建了这个任务的 docker 镜像。
我的任务是一个 bash 脚本,它会先设置一些目录,以便读取数据,然后再调用 docker run
。
下面的脚本叫做 ex-my-script.sh
,它会读取另一个叫 config.sh
的脚本,这个脚本提供了应该读取和写入的目录路径。
在 docker 容器中执行的另一个脚本是 my-script.sh
,如下面所示。这个脚本会执行另一个 bash 脚本,最后再执行一个最终的 bash 脚本,这个脚本会调用容器中安装的软件程序来写入任务的输出数据。
#!/bin/bash
source scripts/config.sh
in_dir=$event_image_dir
in_ext=$zip_ext
processing_graph_xml=$graph_0
out_dir=$out_step01
out_ext=$dim_ext
lvl_parallelism=$parallel_lvl
data_dir=$data_directory
docker run -it \
-v $(pwd)/write_storage:$data_directory \
twm_step01 \
bash /scripts/my-script.sh \
$in_dir \
$in_ext \
$processing_graph_xml \
$out_dir \
$out_ext \
$lvl_parallelism \
$data_dir
这里是 config.sh
,方便大家理解。
parallel_lvl=4
local_directory=/opt/airflow/tasks
data_directory=/opt/airflow/tasks/write_storage
zip_ext=.zip
dim_ext=.dim
txt_ext=.txt
tif_ext=.tif
shp_ext=.shp
# step01
event_image_dir=Events_Images/2015
graph_0=/snap_graphs/snap_graph_0.xml
out_step01=step01
这是 docker-compose.yaml
中的 volumes 部分,我在这里添加了本地目录。我添加了最后一行 docker.sock
,因为我看到了这个回答: 如何从在 docker 容器中运行的 airflow 中运行 docker 操作任务?
我觉得这和我想做的事情很一致。
volumes:
- ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- ${AIRFLOW_PROJ_DIR:-.}/scripts:/opt/airflow/scripts
- ${AIRFLOW_PROJ_DIR:-.}/src:/opt/airflow/src
- ${AIRFLOW_PROJ_DIR:-.}/write_storage:/opt/airflow/tasks/write_storage
- ${AIRFLOW_PROJ_DIR:-.}/snap_graphs:/opt/airflow/snap_graphs
- /var/run/docker.sock:/var/run/docker.sock
我的 dag 看起来是这样的:
import os
from datetime import timedelta, datetime
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator
@dag(
dag_id="SAR_flooding_demo_docker",
start_date=datetime(24, 1, 15),
schedule="@continuous",
max_active_runs=1,
catchup=False,
default_args={
"retries":0,
"retry_delay": timedelta(minutes=1)
},
description="Testing containerized demo",
tags=["Test"]
)
def demo_runner():
@task
def task_01():
#t1=BashOperator(
# task_id="Task01",
# bash_command='/opt/airflow/scripts/ex-my-script.sh ')
t1 = DockerOperator(
task_id="Task01",
image="twm_step01",
api_version='auto',
auto_remove=True,
command='echo "this is a test message shown from within the container',
docker_url='unix://var/run/docker.sock',
network_mode='bridge'
)
return
task_01()
demo_runner()
我尝试了 BashOperator 和 DockerOperator。DAG 的调度没有问题,也没有失败,但我怀疑有什么地方不对,因为它完成的时间不到一秒。我也在寻找一种方法来检查任务输出的数据是否符合我的预期。
我对 airflow 和 docker 都很陌生,所以我只是尝试我能想到的任何方法。
我在 docker-compose.yaml
的 volumes:
部分中放入了我本地机器上 bash 脚本和输入/输出数据的位置。
如果任务无法执行 bash 脚本,为什么它不失败?如果任务可以执行 bash 脚本,为什么它在不到一秒的时间内就成功了?
这是其中一次“成功”任务运行的日志:
bdb1f78ac8d2
*** Found local files:
*** * /opt/airflow/logs/dag_id=SAR_flooding_demo_docker/run_id=scheduled__2024-03-04T10:00:06.056780+00:00/task_id=task_01/attempt=1.log
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2214} INFO - Executing <Task(_PythonDecoratedOperator): task_01> on 2024-03-04 10:00:06.056780+00:00
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:60} INFO - Started process 699 to run task
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'SAR_flooding_demo_docker', 'task_01', 'scheduled__2024-03-04T10:00:06.056780+00:00', '--job-id', '296', '--raw', '--subdir', 'DAGS_FOLDER/SAR_flooding_demonstator_dag.py', '--cfg-path', '/tmp/tmpd36mdp9m']
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:88} INFO - Job 296: Subtask task_01
[2024-03-04, 10:00:09 UTC] {task_command.py:423} INFO - Running <TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [running]> on host bdb1f78ac8d2
[2024-03-04, 10:00:09 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='SAR_flooding_demo_docker' AIRFLOW_CTX_TASK_ID='task_01' AIRFLOW_CTX_EXECUTION_DATE='2024-03-04T10:00:06.056780+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-03-04T10:00:06.056780+00:00'
[2024-03-04, 10:00:09 UTC] {python.py:202} INFO - Done. Returned value was: None
[2024-03-04, 10:00:09 UTC] {taskinstance.py:1149} INFO - Marking task as SUCCESS. dag_id=SAR_flooding_demo_docker, task_id=task_01, execution_date=20240304T100006, start_date=20240304T100008, end_date=20240304T100009
[2024-03-04, 10:00:09 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-03-04, 10:00:09 UTC] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check```
1 个回答
你不能在一个任务里面再运行另一个任务,因为内部的任务不会被执行,因为Airflow的调度器和工作者并不知道它的存在。
在你的情况下,你可以直接使用任务流操作符,比如docker,还有
@task.docker(...)
def task_01():
...
在Airflow 2.9.0(还没发布)中,将可以使用任务Bash装饰器
@task.bash(...)
def task_01():
...
或者使用经典的操作符
BashOperator(
task_id="task_01",
...
)
DockerOperator(
task_id="task_01",
...
)