我正在使用pytest对我的气流DAG执行完整性测试,这是我当前的文件夹结构:
|-- dags
| |-- 01_lasic_retraining_overview.py
| |-- 02_lasic_retraining_sagemaker_autopilot.py
| |-- 03_lasic_retraining_h20_automl.py
| |-- __init__.py
| `-- common
| |-- __init__.py
| `-- helper.py
|-- docker-compose.yaml
|-- newrelic.ini
|-- plugins
|-- requirements.txt
|-- sample.env
|-- setup.sh
|-- test.sh
`-- tests
|-- common
| `-- test_helper.py
`-- dags
|-- test_02_lasic_retraining_sagemaker_autopilot.py
|-- test_03_lasic_retraining_h20_automl.py
`-- test_dag_integrity.py
在除01_lasic_retraining_overview.py
(非测试)之外的所有DAG中,我将辅助函数从dags/common/helper.py
导入到它们,这就是测试失败的原因:
import airflow
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
> from common.helper import _create_connection, _etl_lasic
E ModuleNotFoundError: No module named 'common'
dags/03_lasic_retraining_h20_automl.py:6: ModuleNotFoundError
=================================== short test summary info ===================================
FAILED tests/dags/test_dag_integrity.py::test_dag_integrity[/Users/yravindranath/algo_lasic2_ct_pipeline/tests/dags/../../dags/02_lasic_retraining_sagemaker_autopilot.py]
FAILED tests/dags/test_dag_integrity.py::test_dag_integrity[/Users/yravindranath/algo_lasic2_ct_pipeline/tests/dags/../../dags/03_lasic_retraining_h20_automl.py]
现在,这段代码在我的docker容器中运行时没有问题。我尝试过但没有成功的事情:
__init__py
添加到tests
文件夹李>python -m pytest tests/
dags
中的__init__.py
文件PYTHONPATH=. pytest
/tests/dags/test_dag_integrity.py
import re
import glob
import importlib.util
import os
import pytest
from airflow.models import DAG
# go to the root dir and browse for any files that match the pattern
# this will find all the dag files
DAG_PATH = os.path.join(
os.path.dirname(__file__),
"..",
"..",
"dags/**/0*.py",
)
# holds a list of all the dag files
DAG_FILES = glob.glob(
DAG_PATH,
recursive=True,
)
# filter the files to exclude the 01 dag run as that is just a plan of the
# pipeline
DAG_FILES = [file for file in DAG_FILES if not re.search("/01", file)]
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
# Load file
module_name, _ = os.path.splitext(dag_file)
module_path = os.path.join(DAG_PATH, dag_file)
mod_spec = importlib.util.spec_from_file_location(
module_name,
module_path,
)
module = importlib.util.module_from_spec(
mod_spec, # type: ignore
)
mod_spec.loader.exec_module(module) # type: ignore
# all objects of class DAG found in file
dag_objects = [
var
for var in vars(module).values()
if isinstance(
var,
DAG,
)
]
# check if DAG objects were found in the file
assert dag_objects
# check if there are no cycles in the dags
for dag in dag_objects:
dag.test_cycle() # type: ignore
你需要检查你的
PYTHONPATH
是什么。您的PYTHONPATH
中可能没有dags
。很可能您的PYTHONPATH
指向了文件结构的根目录,因此导入它的“公共”文件夹的正确方法是与您的通用测试代码类似
Python(甚至Python3)没有很好的机制来导入与当前加载的文件相关的内容。即使存在“相对”导入(前面有“.”),它们也会让人困惑,并且工作方式与您认为的不同。避免使用它们。只需确保您的安全
同时避免将PYTHONPATH设置为“”。它使您的导入工作因当前目录而异。最好的方法是设置一次并导出
上面的命令将
PYTHONPATH
设置为您当前所在的目录,并将其设置为绝对路径我还在Docker容器中运行应用程序,@Jarek Potiuk提供的答案在实际运行DAG时不起作用,因此我使用超级黑客方法,只包括在Docker中工作的导入部件和在本地工作的部件
在这里抛出一个疯狂的想法,尝试将
__init__.py
同时添加到*/dag
或*/common
和*/tests
相关问题 更多 >
编程相关推荐