如何在Airflow中调度Python脚本?

2024-03-29 10:49:34 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Windows中具有以下目录/文件结构:

Learn Airflow
        |
        project
           |_dags
               |_file_mover.py
               |_first_dag.py
            |_dockerfiles
                |_Dockerfile
            |_docker-compose.yml

file_mover.py中,我有一个简单的脚本,它将一些文件从LocationA移动到LocationB。在first_dag.py中,我有一个脚本正在触发file_mover.py。因此,当我在终端中执行docker-compose up --build并检查webserver localhost:8080时,我确实在气流中看到first_dag。因此,当我打开DAG时,我希望文件从LocationA移动到LocationBe.q.file_mover.py被触发。。然而,这并没有发生,我也不知道为什么

这是文件\u mover.py

import os
import shutil  

location_a = r'c:\data\GG\Desktop\LocationA'
location_b = r'c:\data\GG\Desktop\LocationB'

files = os.listdir(location_a)

for f in files:
    file_path = os.path.join(location_a, f)
    shutil.move(file_path, location_b)

这是第一次_dag.py

    try: 
     from datetime import timedelta
     from airflow import DAG
     from airflow.operators.python_operator import PythonOperator
     from datetime import datetime
     import os
     import sys
     print('All dag modules are ok.....')

except Exception as e:
    print('Error {}'.format(e))

def first_function_execute():
     os.system('python c:\data\GG\Desktop\Python Microsoft Visual Studio\Learn Airflow\project\dags\file_mover.py')
  
with DAG (
     dag_id = 'first_dag',
     schedule_interval='@daily',
     default_args={
          'owner': 'airflow',
          'retries': 1,
          'retry_delay':  timedelta(minutes=5), 
          'start_date': datetime(2021, 1, 1),
     },
     catchup=False) as f:

     first_function_execute = PythonOperator(
          task_id='first_function_execute',
          python_callable=first_function_execute)
      

我最终想要的是通过Airflow localhost调度和监视file_mover.py应用程序,但是上面的尝试似乎不起作用


Tags: 文件frompyimportexecutedatetimeosfunction
1条回答
网友
1楼 · 发布于 2024-03-29 10:49:34

正如您已经正确处理的那样,应该使用PythonOperator来运行脚本。我会将它作为一个函数进行导出,然后您只需从模块中导入它,只要它可以在您的PYTHONPATH中访问即可

import os
import shutil  

def move(location_a, location_b):
    files = os.listdir(location_a)
    for f in files:
        file_path = os.path.join(location_a, f)
        shutil.move(file_path, location_b)

然后,可以使用默认参数和计划间隔计划DAG:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from my_script import my_python_function

default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime.today(),
        'email': ['airflow@airflow.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
}

dag = DAG('tutorial', default_args=default_args,schedule_interval="* * * * *")

PythonOperator(dag=dag,
               task_id='my_move_task',
               provide_context=False,
               python_callable=move,
               op_args=['arguments_passed_to_callable'],
               op_kwargs={'keyword_argument':'which will be passed to function'})

然后,在启动Airflow之前,您可以将脚本的路径添加到PYTHONPATH中,如下所示:

export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH

有关Python操作以及如何向函数传递参数的详细信息:https://airflow.incubator.apache.org/code.html#airflow.operators.PythonOperator

相关问题 更多 >