在Docker容器中执行Airflow任务

1 投票
1 回答
59 浏览
提问于 2025-04-14 18:30

我有一个任务想要定时执行,使用的是 airflow。我在 docker 中运行 airflow,使用的是 airflow docker 教程中提供的 docker-compose.yaml 文件。

我用 docker build -f Dockerfile -t twm_step01 命令构建了这个任务的 docker 镜像。

我的任务是一个 bash 脚本,它会先设置一些目录,以便读取数据,然后再调用 docker run

下面的脚本叫做 ex-my-script.sh,它会读取另一个叫 config.sh 的脚本,这个脚本提供了应该读取和写入的目录路径。

在 docker 容器中执行的另一个脚本是 my-script.sh,如下面所示。这个脚本会执行另一个 bash 脚本,最后再执行一个最终的 bash 脚本,这个脚本会调用容器中安装的软件程序来写入任务的输出数据。

#!/bin/bash

source scripts/config.sh

in_dir=$event_image_dir
in_ext=$zip_ext
processing_graph_xml=$graph_0
out_dir=$out_step01
out_ext=$dim_ext
lvl_parallelism=$parallel_lvl
data_dir=$data_directory

docker run -it \
        -v $(pwd)/write_storage:$data_directory \
        twm_step01 \
                bash /scripts/my-script.sh \
                        $in_dir \
                        $in_ext \
                        $processing_graph_xml \
                        $out_dir \
                        $out_ext \
                        $lvl_parallelism \
                        $data_dir

这里是 config.sh,方便大家理解。

parallel_lvl=4

local_directory=/opt/airflow/tasks
data_directory=/opt/airflow/tasks/write_storage

zip_ext=.zip
dim_ext=.dim
txt_ext=.txt
tif_ext=.tif
shp_ext=.shp

# step01
event_image_dir=Events_Images/2015
graph_0=/snap_graphs/snap_graph_0.xml
out_step01=step01

这是 docker-compose.yaml 中的 volumes 部分,我在这里添加了本地目录。我添加了最后一行 docker.sock,因为我看到了这个回答: 如何从在 docker 容器中运行的 airflow 中运行 docker 操作任务?

我觉得这和我想做的事情很一致。

volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/scripts:/opt/airflow/scripts
    - ${AIRFLOW_PROJ_DIR:-.}/src:/opt/airflow/src
    - ${AIRFLOW_PROJ_DIR:-.}/write_storage:/opt/airflow/tasks/write_storage
    - ${AIRFLOW_PROJ_DIR:-.}/snap_graphs:/opt/airflow/snap_graphs
    - /var/run/docker.sock:/var/run/docker.sock

我的 dag 看起来是这样的:

import os
from datetime import timedelta, datetime
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.providers.docker.operators.docker import DockerOperator

@dag(
    dag_id="SAR_flooding_demo_docker",
    start_date=datetime(24, 1, 15),
    schedule="@continuous",
    max_active_runs=1,
    catchup=False,
    default_args={
        "retries":0,
        "retry_delay": timedelta(minutes=1)
    },
    description="Testing containerized demo",
    tags=["Test"]
)
def demo_runner():

    @task
    def task_01():
        #t1=BashOperator(
        #    task_id="Task01",
        #    bash_command='/opt/airflow/scripts/ex-my-script.sh ')
        t1 = DockerOperator(
        task_id="Task01",
        image="twm_step01",
        api_version='auto',
        auto_remove=True,
        command='echo "this is a test message shown from within the container',
        docker_url='unix://var/run/docker.sock',
        network_mode='bridge'
        )
        return

    task_01()

demo_runner()

我尝试了 BashOperator 和 DockerOperator。DAG 的调度没有问题,也没有失败,但我怀疑有什么地方不对,因为它完成的时间不到一秒。我也在寻找一种方法来检查任务输出的数据是否符合我的预期。

我对 airflow 和 docker 都很陌生,所以我只是尝试我能想到的任何方法。

我在 docker-compose.yamlvolumes: 部分中放入了我本地机器上 bash 脚本和输入/输出数据的位置。

如果任务无法执行 bash 脚本,为什么它不失败?如果任务可以执行 bash 脚本,为什么它在不到一秒的时间内就成功了?

这是其中一次“成功”任务运行的日志:

bdb1f78ac8d2
*** Found local files:
***   * /opt/airflow/logs/dag_id=SAR_flooding_demo_docker/run_id=scheduled__2024-03-04T10:00:06.056780+00:00/task_id=task_01/attempt=1.log
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:1979} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [queued]>
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2193} INFO - Starting attempt 1 of 1
[2024-03-04, 10:00:08 UTC] {taskinstance.py:2214} INFO - Executing <Task(_PythonDecoratedOperator): task_01> on 2024-03-04 10:00:06.056780+00:00
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:60} INFO - Started process 699 to run task
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'SAR_flooding_demo_docker', 'task_01', 'scheduled__2024-03-04T10:00:06.056780+00:00', '--job-id', '296', '--raw', '--subdir', 'DAGS_FOLDER/SAR_flooding_demonstator_dag.py', '--cfg-path', '/tmp/tmpd36mdp9m']
[2024-03-04, 10:00:08 UTC] {standard_task_runner.py:88} INFO - Job 296: Subtask task_01
[2024-03-04, 10:00:09 UTC] {task_command.py:423} INFO - Running <TaskInstance: SAR_flooding_demo_docker.task_01 scheduled__2024-03-04T10:00:06.056780+00:00 [running]> on host bdb1f78ac8d2
[2024-03-04, 10:00:09 UTC] {taskinstance.py:2510} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='SAR_flooding_demo_docker' AIRFLOW_CTX_TASK_ID='task_01' AIRFLOW_CTX_EXECUTION_DATE='2024-03-04T10:00:06.056780+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-03-04T10:00:06.056780+00:00'
[2024-03-04, 10:00:09 UTC] {python.py:202} INFO - Done. Returned value was: None
[2024-03-04, 10:00:09 UTC] {taskinstance.py:1149} INFO - Marking task as SUCCESS. dag_id=SAR_flooding_demo_docker, task_id=task_01, execution_date=20240304T100006, start_date=20240304T100008, end_date=20240304T100009
[2024-03-04, 10:00:09 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 0
[2024-03-04, 10:00:09 UTC] {taskinstance.py:3309} INFO - 0 downstream tasks scheduled from follow-on schedule check```

1 个回答

1

你不能在一个任务里面再运行另一个任务,因为内部的任务不会被执行,因为Airflow的调度器和工作者并不知道它的存在。

在你的情况下,你可以直接使用任务流操作符,比如docker,还有

任务Docker装饰器

@task.docker(...)
def task_01():
    ...

在Airflow 2.9.0(还没发布)中,将可以使用任务Bash装饰器

@task.bash(...)
def task_01():
    ...

或者使用经典的操作符

BashOperator(
    task_id="task_01",
    ...
)
DockerOperator(
    task_id="task_01",
    ...
)

撰写回答