pytest/celery失败:不允许访问数据库

2024-05-15 05:31:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我在让pytest测试celery任务时遇到了一些问题,该任务是django项目的一部分。我能让它工作的唯一方法是在测试方法中导入任务。在

# tasks.py
from __future__ import absolute_import
import logging
import django
django.setup() # flake8: noqa
from .celery import app
from .models import Run
from celery.exceptions import TaskError


@app.task(bind=True, throws=(TaskError,))
def get_runs_task(self, testrun_id=None):
    ...
    return True

作品:

^{pr2}$

显示Failed: Database access not allowed, use the "django_db" mark...错误:

# test_tasks.py
# flake8: noqa
from django.test import override_settings
import pytest
from ami.tasks import get_runs_task


@override_settings(CELERY_ALWAYS_EAGER=True)
@pytest.mark.django_db
def test_get_runs_one():
    result = get_runs_task.delay(testrun_id='123456')
    assert result.successful()

任何帮助都将不胜感激,谢谢。在

更新了:(还有更多字段,为了简洁起见,我省略了)

# models.py
from django.db import models


class Run(models.Model):
    run_id = models.CharField(
        max_length=15,
        verbose_name='Run ID')

# test_models.py
from ami.models import Run
from django.core.exceptions import ValidationError
import pytest

@pytest.fixture
def create_good_run():
    run = Run.objects.create(
        run_id='12345'
    return run

@pytest.mark.django_db
def test_inherit_values():
    run = create_good_run()
    assert run

我在tasks.py文件中有import django和{},因为我得到的是{},我把它们移到了下面的celery.py中:

# celery.py
from __future__ import absolute_import
import os
import django
django.setup() # flake8: noqa
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'amiproj.settings')

from django.conf import settings  # flake8: noqa

app = Celery('ami')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

if __name__ == '__main__':
    app.start()

Tags: djangorunfrompytestimportappsettings
1条回答
网友
1楼 · 发布于 2024-05-15 05:31:01

尝试使用pytest-djangosettings夹具:

import pytest
from ami.tasks import get_runs_task


@pytest.mark.django_db
def test_get_runs_one(settings):
    settings.CELERY_ALWAYS_EAGER = True
    result = get_runs_task.delay(testrun_id='123456')
    assert result.successful()

我打赌Django的settings test helper与pytest的工作方式不符。在

有关settings夹具的信息,请参见^{} documentation。在

相关问题 更多 >

    热门问题