Airflow DAG scheduler loop causing high costs on Azure Data Factory

Posted 2024-03-28 21:11:02


Our Airflow DAG is generating very high costs on ADF.

The Airflow scheduler continuously loops to check whether the DAGs have changed. In the log we can see the list of ADF pipelines: every time the DAG file is parsed for changes (more than 3 times per minute), the list of ADF pipelines is queried.

These are the log entries that repeat all day long:

[2021-05-10 00:00:06,032] {{logging_mixin.py:112}} INFO - [2021-05-10 00:00:06,031] {{settings.py:253}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=29572
[2021-05-10 00:00:06,035] {{scheduler_job.py:153}} INFO - Started process (PID=29572) to work on /usr/local/airflow/dags/ingestSource.py
[2021-05-10 00:00:06,038] {{scheduler_job.py:1560}} INFO - Processing file /usr/local/airflow/dags/ingestSource.py for tasks to queue
[2021-05-10 00:00:06,038] {{logging_mixin.py:112}} INFO - [2021-05-10 00:00:06,038] {{dagbag.py:403}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ingestSource.py
[2021-05-10 00:00:07,395] {{logging_mixin.py:112}} INFO - Existing data factory pipelines: ['Load Facts', 'Refresh Report', 'Post Staging Logic', 'Load Dimensions', 'etc..']
[2021-05-10 00:00:08,000] {{scheduler_job.py:161}} INFO - Processing /usr/local/airflow/dags/ingestSource.py took 1.965 seconds

This part of the DAG appears to trigger the query to ADF:

azure_data_factory_hook = AzureDataFactoryHook(
    subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
    resource_group_name=os.environ["AZURE_RESOURCE_GROUP_NAME"],
    factory_name=os.environ["AZURE_DATA_FACTORY_NAME"],
    credentials=MSIAuthentication() 
)

I would really like to know why this query to ADF is triggered, and how to prevent it from happening in a loop all day long.

Thanks for your help.

The full DAG:

import json
import os
from datetime import datetime, timedelta
from typing import Dict, List

from airflow import DAG
from airflow.hooks.azure_data_factory_plugin import AzureDataFactoryHook
from airflow.operators.azure_data_factory_plugin import AdfPipelineRunOperator
from msrestazure.azure_active_directory import MSIAuthentication

DEFAULT_ARGS = {
    'owner': 'Me',
    'depends_on_past': False,
    'execution_timeout': timedelta(hours=6)
}

dag = DAG(dag_id="IngestSource",
          start_date=datetime(2020, 5, 29),
          default_args=DEFAULT_ARGS,
          max_active_runs=5,
          schedule_interval='0 1 * * *')

azure_data_factory_hook = AzureDataFactoryHook(
    subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
    resource_group_name=os.environ["AZURE_RESOURCE_GROUP_NAME"],
    factory_name=os.environ["AZURE_DATA_FACTORY_NAME"],
    credentials=MSIAuthentication() 
)

common_task_params = {
    "dag": dag,
    "azureDataFactoryHook": azure_data_factory_hook,
    "execution_timeout": timedelta(hours=2),
}

ingest_source = AdfPipelineRunOperator(
    **common_task_params,
    task_id="ingestSource",
    pipeline_name="From Source to Target",
    pipeline_parameters=json.dumps({})  # parameters omitted in the post; bare json.dumps() raises TypeError
)

1 Answer

There is probably something in the hook that retrieves the existing ADF pipelines in order to write them to the log. Because the hook is instantiated at the module level of the DAG file, it counts as top-level code (any function call, logic, etc. outside an operator's execute() method), which is a best practice to avoid in DAG files. Top-level code runs on every file-parsing interval of the scheduler, which is why you see such frequent queries. You should move the hook into a custom operator so that it is only created inside the execute() method's scope.
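A minimal sketch of that pattern, under the assumption that the plugin's AzureDataFactoryHook makes its ADF call in the constructor (which matches the "Existing data factory pipelines" line in your logs). The Airflow base class and the ADF hook are stubbed out here so the example runs standalone; in a real DAG you would subclass `airflow.models.BaseOperator` and build the real hook with `MSIAuthentication()` inside `execute()`:

```python
import os

class BaseOperator:
    """Stand-in for airflow.models.BaseOperator so this sketch runs
    outside Airflow; subclass the real class in your DAG."""
    def __init__(self, task_id=None, **kwargs):
        self.task_id = task_id

class FakeAdfHook:
    """Stand-in for the plugin's AzureDataFactoryHook. Assumption: the
    real hook queries the factory's pipeline list when constructed,
    which is the call showing up in the scheduler logs."""
    constructions = 0
    def __init__(self, **conn_args):
        FakeAdfHook.constructions += 1  # one ADF API round-trip

class AdfPipelineRunLazyOperator(BaseOperator):
    """Hypothetical variant of AdfPipelineRunOperator: the hook is
    created inside execute(), so it only runs when the task actually
    runs, not on every scheduler parse of the DAG file."""
    def __init__(self, pipeline_name, pipeline_parameters=None, **kwargs):
        super().__init__(**kwargs)
        self.pipeline_name = pipeline_name
        self.pipeline_parameters = pipeline_parameters or {}

    def execute(self, context=None):
        # Hook construction deferred to task runtime.
        hook = FakeAdfHook(
            subscription_id=os.environ.get("AZURE_SUBSCRIPTION_ID"),
            resource_group_name=os.environ.get("AZURE_RESOURCE_GROUP_NAME"),
            factory_name=os.environ.get("AZURE_DATA_FACTORY_NAME"),
        )
        return hook

# Simulate the scheduler re-parsing the DAG file 3 times: merely
# constructing the operator no longer touches ADF.
for _ in range(3):
    AdfPipelineRunLazyOperator(pipeline_name="From Source to Target",
                               task_id="ingestSource")
print(FakeAdfHook.constructions)  # 0 -- no ADF calls at parse time
```

As a stopgap, you can also reduce how often the scheduler re-parses DAG files by raising `min_file_process_interval` in the `[scheduler]` section of `airflow.cfg`, but moving the hook out of top-level code is the actual fix.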
