我不是一个专业的python开发人员,这就是为什么我要概述我的步骤
我为apache airflow创建了一个目录~/Desktop/airflow,并制作了
export AIRFLOW_HOME=~/Desktop/airflow
然后我使用
python3 -m venv ~/Desktop/airflow
结果是
然后我做了
source bin/activate
pip3 install apache-airflow==1.10.9
airflow initdb
结果是
在我的aiffort.cfg文件中,我检查了DAG和插件目录。我在$AIRFLOW\u HOME/Desktop/AIRFLOW中创建了DAG和插件目录
我启动了airflow Web服务器和调度程序,确保一切正常
我找到了很多方法来创建气流插件。我尝试了所有可能的方法。让我们开始吧
第一个是在项目中创建一个插件文件夹(First\u plugin),然后创建一个python文件(First\u operator.py)
import logging
from airflow.operators import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
log = logging.getLogger(__name__)
class FirstOperator(BaseOperator):
@apply_defaults
def __init__(self, *args, **kwargs):
super(FirstOperator, self).__init__(*args, **kwargs)
def execute(self, context):
log.info("Hello World!")
class FirstOperatorPlugin(AirflowPlugin):
name = "first_plugin"
operators = [FirstOperator]
看起来像
然后我只需将我的插件文件夹(first_plugin)移动到$AIRFLOW_HOME/DESKTOP/AIRFLOW/plugins,然后重新启动AIRFLOW Web服务器和调度程序
现在是使用自定义操作符创建自定义dag的时候了。如何正确导入插件是一个挑战。有很多方法可以导入自定义运算符。我会展示我的努力
在Pycharm IDE中导入时,这些方法都没有帮助我。比如说,
from airflow.operators.first_plugin import FirstOperator
但我确信,如果我忽略导入行并将自定义dag放入dags文件夹,它将正常工作。(我试过了)。此外,我决定检查气流日志(在调试模式下)
重新启动airflow Web服务器时看到的日志
我花了两天时间,仍然没有任何解决办法。也许你们让我试试别的方法。我试过了
https://pybit.es/introduction-airflow.html
所有这些都是可行的方法,但没有一个能解决我的IDE导入问题
我们不需要创建文件夹
plugin
和添加custom operators
。使用Airflow 2.0+,我们只需要在项目中创建一个文件夹并将其导入dag文件中在dag_1.py中,可以将自定义操作符/挂钩作为以下内容导入:
我绕过了插件步骤,用纯python的方式完成了它。这似乎也是最新版本(1.10.12)的文档所说的
这是我的文件夹结构:
确保
PYTHONPATH
包含AIRFLOW_HOME
路径。然后你可以像这样导入它:阅读更多信息:https://airflow.apache.org/docs/stable/howto/custom-operator.html#creating-a-custom-operator
这是对我有效的文件夹结构,请确保自定义运算符位于
operators
文件夹中,与sensors
和hooks
相同。init.py文件应为空还要检查
airflow.cfg
文件中的plugins_folder
是否指向您的插件文件夹例如,要将gsheet_导入到_redshift_运算符而不出错,我使用了以下语句:
from operators.gsheet_to_redshift_operator import GsheetToRSOperator
相关问题 更多 >
编程相关推荐