Celery调度不工作,Django中出现KeyError

2 投票
1 回答
3936 浏览
提问于 2025-04-18 10:20

环境:django 1.6.5,python 2.7,celery 3.1.11

我想在目标服务器上运行这个项目,但没有安装djcelery。所以,我不打算使用djcelery。我正在按照文档Django的第一步在你的应用中使用Celery来操作。当我运行celery -A djproj -B -l debug时,出现了KeyError的错误。而且在[tasks]中实际上没有目标任务。有没有人知道怎么解决这个问题?谢谢。

错误

[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . djproj.celery.debug_task
  . djproj.celery.defaulttask1
  . djproj.celery.hello

... ...

[2014-06-19 16:08:23,007: INFO/MainProcess] Received task: djproj.celery.hello[794e54c7-62c8-4ad8-bcbb-64ff809366d1]
[2014-06-19 16:08:23,008: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x1036d17d0> (args:('djproj.celery.hello', '794e54c7-62c8-4ad8-bcbb-64ff809366d1', [], {}, {'utc': True, u'is_eager': False, 'chord': None, u'group': None, 'args': [], 'retries': 0, u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u'celery'}, 'expires': None, u'hostname': 'celery@nluckys-Mac.local', 'task': 'djproj.celery.hello', 'callbacks': None, u'correlation_id': u'794e54c7-62c8-4ad8-bcbb-64ff809366d1', 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, u'reply_to': u'20fe5513-efc2-3e69-9541-8e82758f94f9', 'id': '794e54c7-62c8-4ad8-bcbb-64ff809366d1', u'headers': {}}) kwargs:{})
[2014-06-19 16:08:23,010: WARNING/Worker-2] helloincelery
[2014-06-19 16:08:23,010: ERROR/MainProcess] Received unregistered task of type 'apps.app1.tuan.tuantask1'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': 'apps.app1.tuan.tuantask1', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'f2f92c1d-ac6d-4131-a029-123fbfc4ab48'} (220b)
Traceback (most recent call last):
  File "/Library/Python/2.7/site-packages/celery/worker/consumer.py", line 455, in on_task_received
    strategies[name](message, body,
KeyError: 'apps.app1.tuan.tuantask1'

调度似乎没有工作。我的完整代码可以在这里找到。下面是一些代码的片段。

我的项目文件夹结构:

djproj
├── apps
│   ├── __init__.py
│   └──  app1
│       ├── __init__.py
│       ├── admin.py
│       ├── models.py
│       ├── tests.py
│       ├── tuan.py
│       └── views.py
├── djproj
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── files
├── manage.py
└── run.sh

djproj/djproj/__init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

djproj/djproj/celery.py

from __future__ import absolute_import

import os
import datetime
from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djproj.settings')

app = Celery('djproj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

@app.task
def defaulttask1():
    currt = datetime.datetime.now()
    with open("files/defaulttask1.txt", "w") as fo:
        print >> fo, currt.isoformat()+"default_task_1"
        return currt 

@app.task
def hello():
    print "helloincelery" 

djproj/djproj/settings.py

"""
Django settings for djproj project.

For more information on this file, see
https://docs.djangoproject.com/en/1.6/topics/settings/

For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.6/ref/settings/
"""
from __future__ import absolute_import
from celery.schedules import crontab

BROKER_URL = 'redis://localhost:6379/0'

from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    #'defaulttask1': {
    #    'task': 'djproj.celery.defaulttask1',
    #    'schedule': timedelta(seconds=3)
    #}, 
    'hello': {
        'task': 'djproj.celery.hello',
        'schedule': timedelta(seconds=4)
    }, 
    'tuantask1': {
        'task': 'apps.app1.tuan.tuantask1',
        'schedule': timedelta(seconds=6)
    },  
    #'tuantask2': {
    #    'task': 'app1.tuan.tuantask2',
    #    'schedule': crontab(minute=55, hour=17)
    #}
}

TIME_ZONE = 'Asia/Shanghai'

DATETIME_FORMAT = 'Y-m-d H:i:s'

TIME_FORMAT = 'Y-m-d H:i:s'

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))


# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.6/howto/deployment/checklist/

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '+df53@zaea16*pa%)kyta=ciam#$1c1&tjx-5!59f+nlo)n#!4'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True

TEMPLATE_DEBUG = True

ALLOWED_HOSTS = []


# Application definition

INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'apps.app1',
)

MIDDLEWARE_CLASSES = (
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
)

ROOT_URLCONF = 'djproj.urls'

WSGI_APPLICATION = 'djproj.wsgi.application'


# Database
# https://docs.djangoproject.com/en/1.6/ref/settings/#databases

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
    }
}

# Internationalization
# https://docs.djangoproject.com/en/1.6/topics/i18n/

LANGUAGE_CODE = 'zh_CN'

#TIME_ZONE = 'UTC'

USE_I18N = True

USE_L10N = True

USE_TZ = True


# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.6/howto/static-files/

STATIC_URL = '/static/'

djproj/apps/app1/tuan.py

from __future__ import absolute_import

#from djproj.celery import app
from celery import shared_task
import datetime

#@shared_task
#def add(x, y):
#    return x + y

#@shared_task
#def mul(x, y):
#    return x * y

#@shared_task
#def xsum(numbers):
#    return sum(numbers)

@shared_task
def tuantask1():
    currt = datetime.datetime.now()
    with open("files/tuantask1.txt", "w") as fo:
        print >> fo, currt.isoformat()+"tuantask_1"
    return currt.isoformat()

@shared_task
def tuantask2():
    print "stest===="

djproj/run.sh

celery -A djproj worker -B -l debug

1 个回答

0

我知道发生了什么。我要把 tuan.py 改成 tasks.py,因为在 celery.py 里,autodiscover_tasks 只能找到名为 tasks.py 的文件中的任务。如果不改文件名的话,可以手动导入任务,方法是在 settings.py 里使用 CELERY_IMPORTS,并且要用绝对路径(比如 'apps.app1.tuan')。具体可以参考 文档

a common practice for reusable apps is to define all tasks in a separate 
tasks.py module, and Celery does have a way to autodiscover these modules

|

This way you do not have to manually add the individual modules to the 
CELERY_IMPORTS setting.

撰写回答