如何在IDE中导入自定义操作符和插件而不出错

2024-09-20 22:19:14 发布

您现在位置:Python中文网/ 问答频道 /正文

我不是一个专业的python开发人员,这就是为什么我要概述我的步骤

安装部件

我为apache airflow创建了一个目录~/Desktop/airflow,并制作了

export AIRFLOW_HOME=~/Desktop/airflow

然后我使用

python3 -m venv ~/Desktop/airflow

结果是

enter image description here

然后我做了

source bin/activate

pip3 install apache-airflow==1.10.9

airflow initdb

结果是

enter image description here

在我的aiffort.cfg文件中,我检查了DAG和插件目录。我在$AIRFLOW\u HOME/Desktop/AIRFLOW中创建了DAG和插件目录

我启动了airflow Web服务器调度程序,确保一切正常

自定义插件部件

我找到了很多方法来创建气流插件。我尝试了所有可能的方法。让我们开始吧

第一个是在项目中创建一个插件文件夹(First\u plugin),然后创建一个python文件(First\u operator.py

import logging

from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin

log = logging.getLogger(__name__)


class FirstOperator(BaseOperator):

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super(FirstOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        log.info("Hello World!")


class FirstOperatorPlugin(AirflowPlugin):
    name = "first_plugin"
    operators = [FirstOperator]

看起来像

enter image description here

然后我只需将我的插件文件夹(first_plugin)移动到$AIRFLOW_HOME/DESKTOP/AIRFLOW/plugins,然后重新启动AIRFLOW Web服务器和调度程序

现在是使用自定义操作符创建自定义dag的时候了。如何正确导入插件是一个挑战。有很多方法可以导入自定义运算符。我会展示我的努力

  1. 来自airflow.operators导入FirstOperator-已弃用
  2. 来自airflow.operators.first\u插件导入FirstOperator
  3. 来自airflow.operators.first_operator导入FirstOperator
  4. 从first_plugin.first_操作符导入FirstOperator

在Pycharm IDE中导入时,这些方法都没有帮助我。比如说,

from airflow.operators.first_plugin import FirstOperator

enter image description here

但我确信,如果我忽略导入行并将自定义dag放入dags文件夹,它将正常工作。(我试过了)。此外,我决定检查气流日志(在调试模式下)

重新启动airflow Web服务器时看到的日志

enter image description here

我花了两天时间,仍然没有任何解决办法。也许你们让我试试别的方法。我试过了

https://www.astronomer.io/guides/airflow-importing-custom-hooks-operators/https://www.astronomer.io/guides/airflow-importing-custom-hooks-operators/

https://pybit.es/introduction-airflow.html

所有这些都是可行的方法,但没有一个能解决我的IDE导入问题


Tags: 方法fromimport目录文件夹插件webhome
3条回答

我们不需要创建文件夹plugin和添加custom operators。使用Airflow 2.0+,我们只需要在项目中创建一个文件夹并将其导入dag文件中

airflow_project
+ custom_operator
   + first_operator.py
+ dags
   + dag_1.py

在dag_1.py中,可以将自定义操作符/挂钩作为以下内容导入:

from custom_operator.first_operator import FirstOperator

我绕过了插件步骤,用纯python的方式完成了它。这似乎也是最新版本(1.10.12)的文档所说的

这是我的文件夹结构:

AIRFLOW_HOME
├── dags
│   ├── __init__.py
│   └── my_dag.py
└── utils
    ├── __init__.py
    └── custom_operator.py

确保PYTHONPATH包含AIRFLOW_HOME路径。然后你可以像这样导入它:

from custom_operator.hello_operator import HelloOperator

阅读更多信息:https://airflow.apache.org/docs/stable/howto/custom-operator.html#creating-a-custom-operator

这是对我有效的文件夹结构,请确保自定义运算符位于operators文件夹中,与sensorshooks相同。init.py文件应为空

plugins
├── __init__.py
├── operators
│   ├── __init__.py
│   ├── glue_crawler_operator.py
│   └── gsheet_to_redshift_operator.py
└── sensors
    ├── __init__.py
    └── glue_crawler_sensor.py

还要检查airflow.cfg文件中的plugins_folder是否指向您的插件文件夹

例如,要将gsheet_导入到_redshift_运算符而不出错,我使用了以下语句:

from operators.gsheet_to_redshift_operator import GsheetToRSOperator

相关问题 更多 >

    热门问题