我在Windows中具有以下目录/文件结构:
Learn Airflow
|
project
|_dags
|_file_mover.py
|_first_dag.py
|_dockerfiles
|_Dockerfile
|_docker-compose.yml
在file_mover.py
中,我有一个简单的脚本,它将一些文件从LocationA
移动到LocationB
。在first_dag.py
中,我有一个脚本正在触发file_mover.py
。因此,当我在终端中执行docker-compose up --build
并检查webserver localhost:8080
时,我确实在气流中看到first_dag
。因此,当我打开DAG时,我希望文件从LocationA
移动到LocationB
e.q.file_mover.py
被触发。。然而,这并没有发生,我也不知道为什么
这是文件\u mover.py
import os
import shutil
location_a = r'c:\data\GG\Desktop\LocationA'
location_b = r'c:\data\GG\Desktop\LocationB'
files = os.listdir(location_a)
for f in files:
file_path = os.path.join(location_a, f)
shutil.move(file_path, location_b)
这是第一次_dag.py
try:
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import os
import sys
print('All dag modules are ok.....')
except Exception as e:
print('Error {}'.format(e))
def first_function_execute():
os.system('python c:\data\GG\Desktop\Python Microsoft Visual Studio\Learn Airflow\project\dags\file_mover.py')
with DAG (
dag_id = 'first_dag',
schedule_interval='@daily',
default_args={
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5),
'start_date': datetime(2021, 1, 1),
},
catchup=False) as f:
first_function_execute = PythonOperator(
task_id='first_function_execute',
python_callable=first_function_execute)
我最终想要的是通过Airflow localhost调度和监视file_mover.py
应用程序,但是上面的尝试似乎不起作用
正如您已经正确处理的那样,应该使用
PythonOperator
来运行脚本。我会将它作为一个函数进行导出,然后您只需从模块中导入它,只要它可以在您的PYTHONPATH中访问即可然后,可以使用默认参数和计划间隔计划DAG:
然后,在启动Airflow之前,您可以将脚本的路径添加到PYTHONPATH中,如下所示:
有关Python操作以及如何向函数传递参数的详细信息:https://airflow.incubator.apache.org/code.html#airflow.operators.PythonOperator
相关问题 更多 >
编程相关推荐