<p>I am running integrity tests on my Airflow DAGs with pytest. This is my current folder structure:</p>
<pre><code>|-- dags
| |-- 01_lasic_retraining_overview.py
| |-- 02_lasic_retraining_sagemaker_autopilot.py
| |-- 03_lasic_retraining_h20_automl.py
| |-- __init__.py
| `-- common
| |-- __init__.py
| `-- helper.py
|-- docker-compose.yaml
|-- newrelic.ini
|-- plugins
|-- requirements.txt
|-- sample.env
|-- setup.sh
|-- test.sh
`-- tests
|-- common
| `-- test_helper.py
`-- dags
|-- test_02_lasic_retraining_sagemaker_autopilot.py
|-- test_03_lasic_retraining_h20_automl.py
`-- test_dag_integrity.py
</code></pre>
<p>Every DAG except <code>01_lasic_retraining_overview.py</code> (which is not tested) imports helper functions from <code>dags/common/helper.py</code>, and that import is exactly where the tests fail:</p>
<pre><code>import airflow
from airflow import DAG
from airflow.exceptions import AirflowFailException
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
> from common.helper import _create_connection, _etl_lasic
E ModuleNotFoundError: No module named 'common'
dags/03_lasic_retraining_h20_automl.py:6: ModuleNotFoundError
=================================== short test summary info ===================================
FAILED tests/dags/test_dag_integrity.py::test_dag_integrity[/Users/yravindranath/algo_lasic2_ct_pipeline/tests/dags/../../dags/02_lasic_retraining_sagemaker_autopilot.py]
FAILED tests/dags/test_dag_integrity.py::test_dag_integrity[/Users/yravindranath/algo_lasic2_ct_pipeline/tests/dags/../../dags/03_lasic_retraining_h20_automl.py]
</code></pre>
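<p>As I understand it, the error comes down to <code>sys.path</code>: inside the container, Airflow puts the DAGs folder itself on <code>sys.path</code>, so <code>common</code> resolves as a top-level package, while pytest launched from the project root never adds it. A minimal reproduction with a throwaway package (all names hypothetical, not my actual helper):</p>

```python
import os
import sys
import tempfile

# Throwaway stand-in for dags/common/helper.py (names hypothetical):
root = tempfile.mkdtemp()
pkg = os.path.join(root, "common")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "helper.py"), "w") as f:
    f.write("VALUE = 42\n")

# Mirrors Airflow adding the DAGs folder itself to sys.path inside the
# container; without this line the import raises ModuleNotFoundError.
sys.path.insert(0, root)

import common.helper
print(common.helper.VALUE)  # 42
```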
<p>Now, this code runs without any problem inside my Docker container. Things I have tried that did not work:</p>
<ol>
<li>Adding <code>__init__.py</code> to the <code>tests</code> folder</li>
<li>Running <code>python -m pytest tests/</code></li>
<li>Removing the <code>__init__.py</code> files from the <code>dags</code> directory</li>
<li>Running the tests as <code>PYTHONPATH=. pytest</code></li>
</ol>
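<p>For completeness, the kind of <code>conftest.py</code> shim I understand is the usual workaround, placed at the repository root so pytest imports it before collecting any tests. This is only a sketch mirroring what the container does (putting <code>dags/</code> on <code>sys.path</code>); I have not verified it against my setup:</p>

```python
# conftest.py at the repository root. pytest imports this file before
# collecting tests, so the path tweak happens before any DAG module is
# loaded. It mirrors the container, where the dags/ folder itself is on
# sys.path and `common` is importable as a top-level package.
import os
import sys

DAGS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "dags")
sys.path.insert(0, DAGS_DIR)
```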
<p>The code for the integrity test lives in <code>/tests/dags/test_dag_integrity.py</code>:</p>
<pre><code>import re
import glob
import importlib.util
import os

import pytest
from airflow.models import DAG

# Go to the root dir and glob for any files matching the pattern;
# this finds all the DAG files.
DAG_PATH = os.path.join(
    os.path.dirname(__file__),
    "..",
    "..",
    "dags/**/0*.py",
)
# Holds a list of all the DAG files.
DAG_FILES = glob.glob(DAG_PATH, recursive=True)
# Exclude the 01 DAG, as that is just a plan of the pipeline.
DAG_FILES = [file for file in DAG_FILES if not re.search("/01", file)]


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    # Load the file as a module (dag_file is already the full path).
    module_name, _ = os.path.splitext(dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, dag_file)
    module = importlib.util.module_from_spec(mod_spec)  # type: ignore
    mod_spec.loader.exec_module(module)  # type: ignore
    # Collect all objects of class DAG found in the file.
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    # Check that DAG objects were found in the file.
    assert dag_objects
    # Check that there are no cycles in the DAGs.
    for dag in dag_objects:
        dag.test_cycle()  # type: ignore
</code></pre>
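<p>For reference, the <code>importlib</code> loading pattern the test relies on, isolated from Airflow (file name hypothetical):</p>

```python
import importlib.util
import os
import tempfile

# Write a tiny module to disk, then load it by path -- the same
# spec_from_file_location / module_from_spec / exec_module sequence
# the integrity test uses on each DAG file.
path = os.path.join(tempfile.mkdtemp(), "example_dag.py")
with open(path, "w") as f:
    f.write("ANSWER = 7\n")

spec = importlib.util.spec_from_file_location("example_dag", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
print(module.ANSWER)  # 7
```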