创建后有没有方法编辑气流操作符?

2024-04-26 20:23:45 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个python脚本,它基于映射每个所需选项的JSON文件动态创建task(flow operator)和DAG。 该脚本还专用函数来创建所需的任何运算符。 有时我想根据映射激活一些条件选项。。。例如,在bigqueryOperator中,有时我需要一个time_分区和一个destination_表,但我不想在每个映射的任务上都进行设置。在

我试图阅读有关BaseOperator的文档,但是我看不到任何类似java的set方法。在

函数返回运算符,例如bigQuery

def bqOperator(mappedTask):
    try:
        return BigQueryOperator(
        task_id=mappedTask.get('task_id'),
        sql=mappedTask.get('sql'),  
##destination_dataset_table=project+'.'+dataset+'.'+mappedTask.get('target'),
        write_disposition=mappedTask.get('write_disposition'),
        allow_large_results=mappedTask.get('allow_large_results'),
        ##time_partitioning=mappedTask.get('time_partitioning'),
        use_legacy_sql=mappedTask.get('use_legacy_sql'),
        dag=dag,
        )
    except Exception as e:
        error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
        logger.error(error)
        raise Exception(error) 

json文件中的mappedTask没有分区

^{pr2}$

带有分区的json文件内的mappedTask

^{3}$

Tags: 文件函数脚本idtasksqlgettime
2条回答

如下所示更改bqOperator来处理这种情况,当它在json中找不到该字段时,它基本上不会传递任何消息:

def bqOperator(mappedTask):
    try:
        return BigQueryOperator(
        task_id=mappedTask.get('task_id'),
        sql=mappedTask.get('sql'),  
        destination_dataset_table="{}.{}.{}".format(project, dataset, mappedTask.get('target')) if mappedTask.get('target', None)  else None,
        write_disposition=mappedTask.get('write_disposition'),
        allow_large_results=mappedTask.get('allow_large_results'),
        time_partitioning=mappedTask.get('time_partitioning', None),
        use_legacy_sql=mappedTask.get('use_legacy_sql'),
        dag=dag,
        )
    except Exception as e:
        error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
        logger.error(error)
        raise Exception(error) 

python中没有私有的方法或字段,因此可以直接设置和获取

op.use_legacy_sql = True

鉴于我强烈反对这样做,因为这是一个真正的代码气味。相反,您可以修改factory类以将一些默认值应用于json数据。 应用json,甚至更好地应用于json本身。而不是保存并使用更新的json。这将使事情变得更可预测。在

相关问题 更多 >