如何使用airflows ssh_操作符执行nohup命令?

2024-04-20 12:23:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我是airflow的新手,正在尝试使用airflow的ssh_操作符在ec2实例上运行作业,如下所示:

t2 = SSHOperator(
    ssh_conn_id='ec2_ssh_connection',
    task_id='execute_script',
    command="nohup python test.py &",
    retries=3,
    dag=dag)

这项工作需要几个小时,我希望执行python脚本并结束。但是,当执行命令且dag完成时,脚本将在ec2实例上终止。我还注意到上面的代码没有创建nohup.out文件

我正在研究如何使用SSHOperator运行nohup。这似乎是一个与python相关的问题,因为在执行nohup时,我在EC2脚本上遇到以下错误:

[Errno 32] Broken pipe

谢谢


Tags: 实例脚本idtask作业connectionec2conn
1条回答
网友
1楼 · 发布于 2024-04-20 12:23:10

Airflow的SSHHook使用Paramiko模块进行SSH连接。关于普拉米科和{}有{a1}。其中一个答案建议在nohup命令之后添加sleep。我无法确切解释原因,但它确实有效。还需要在SSHOperator中设置get_pty=True

下面是一个完整的示例,演示了解决方案:

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator


default_args = {
    'start_date': datetime(2001, 2, 3, 4, 0),
}

with DAG(
    'a_dag', schedule_interval=None, default_args=default_args, catchup=False,
) as dag:
    op = SSHOperator(
        task_id='ssh',
        ssh_conn_id='ssh_default',
        command=(
            'nohup python -c "import time;time.sleep(30);print(1)" & sleep 10'
        ),
        get_pty=True,  # This is needed!
    )

nohup.out文件被写入用户的$HOME

相关问题 更多 >