Celery beat schedule not working, KeyError in Django
Environment: django 1.6.5, python 2.7, celery 3.1.11
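I want to run this project on a target server where djcelery is not installed, so I do not plan to use djcelery. I am following the documentation pages First steps with Django and Using Celery in your Application. When I run celery -A djproj worker -B -l debug, I get a KeyError, and the target task does not actually appear under [tasks]. Does anyone know how to fix this? Thanks.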
Error
[tasks]
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. djproj.celery.debug_task
. djproj.celery.defaulttask1
. djproj.celery.hello
... ...
[2014-06-19 16:08:23,007: INFO/MainProcess] Received task: djproj.celery.hello[794e54c7-62c8-4ad8-bcbb-64ff809366d1]
[2014-06-19 16:08:23,008: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x1036d17d0> (args:('djproj.celery.hello', '794e54c7-62c8-4ad8-bcbb-64ff809366d1', [], {}, {'utc': True, u'is_eager': False, 'chord': None, u'group': None, 'args': [], 'retries': 0, u'delivery_info': {u'priority': 0, u'redelivered': None, u'routing_key': u'celery', u'exchange': u'celery'}, 'expires': None, u'hostname': 'celery@nluckys-Mac.local', 'task': 'djproj.celery.hello', 'callbacks': None, u'correlation_id': u'794e54c7-62c8-4ad8-bcbb-64ff809366d1', 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, u'reply_to': u'20fe5513-efc2-3e69-9541-8e82758f94f9', 'id': '794e54c7-62c8-4ad8-bcbb-64ff809366d1', u'headers': {}}) kwargs:{})
[2014-06-19 16:08:23,010: WARNING/Worker-2] helloincelery
[2014-06-19 16:08:23,010: ERROR/MainProcess] Received unregistered task of type 'apps.app1.tuan.tuantask1'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{'utc': True, 'chord': None, 'args': [], 'retries': 0, 'expires': None, 'task': 'apps.app1.tuan.tuantask1', 'callbacks': None, 'errbacks': None, 'timelimit': (None, None), 'taskset': None, 'kwargs': {}, 'eta': None, 'id': 'f2f92c1d-ac6d-4131-a029-123fbfc4ab48'} (220b)
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/celery/worker/consumer.py", line 455, in on_task_received
strategies[name](message, body,
KeyError: 'apps.app1.tuan.tuantask1'
The schedule does not seem to work. My full code can be found here. Below are some snippets.
My project folder structure:
djproj
├── apps
│   ├── __init__.py
│   └── app1
│       ├── __init__.py
│       ├── admin.py
│       ├── models.py
│       ├── tests.py
│       ├── tuan.py
│       └── views.py
├── djproj
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── files
├── manage.py
└── run.sh
djproj/djproj/__init__.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
djproj/djproj/celery.py
from __future__ import absolute_import
import os
import datetime
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djproj.settings')
app = Celery('djproj')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

@app.task
def defaulttask1():
    currt = datetime.datetime.now()
    with open("files/defaulttask1.txt", "w") as fo:
        print >> fo, currt.isoformat()+"default_task_1"
    return currt

@app.task
def hello():
    print "helloincelery"
djproj/djproj/settings.py
"""
Django settings for djproj project.
For more information on this file, see
https://docs.djangoproject.com/en/1.6/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.6/ref/settings/
"""
from __future__ import absolute_import
from celery.schedules import crontab
BROKER_URL = 'redis://localhost:6379/0'
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    #'defaulttask1': {
    #    'task': 'djproj.celery.defaulttask1',
    #    'schedule': timedelta(seconds=3)
    #},
    'hello': {
        'task': 'djproj.celery.hello',
        'schedule': timedelta(seconds=4)
    },
    'tuantask1': {
        'task': 'apps.app1.tuan.tuantask1',
        'schedule': timedelta(seconds=6)
    },
    #'tuantask2': {
    #    'task': 'app1.tuan.tuantask2',
    #    'schedule': crontab(minute=55, hour=17)
    #}
}
TIME_ZONE = 'Asia/Shanghai'
DATETIME_FORMAT = 'Y-m-d H:i:s'
TIME_FORMAT = 'Y-m-d H:i:s'
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
import os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.6/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '+df53@zaea16*pa%)kyta=ciam#$1c1&tjx-5!59f+nlo)n#!4'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
TEMPLATE_DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = (
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'apps.app1',
)
MIDDLEWARE_CLASSES = (
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
)
ROOT_URLCONF = 'djproj.urls'
WSGI_APPLICATION = 'djproj.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.6/ref/settings/#databases
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
    }
}
# Internationalization
# https://docs.djangoproject.com/en/1.6/topics/i18n/
LANGUAGE_CODE = 'zh_CN'
#TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.6/howto/static-files/
STATIC_URL = '/static/'
djproj/apps/app1/tuan.py
from __future__ import absolute_import
#from djproj.celery import app
from celery import shared_task
import datetime
#@shared_task
#def add(x, y):
#    return x + y

#@shared_task
#def mul(x, y):
#    return x * y

#@shared_task
#def xsum(numbers):
#    return sum(numbers)

@shared_task
def tuantask1():
    currt = datetime.datetime.now()
    with open("files/tuantask1.txt", "w") as fo:
        print >> fo, currt.isoformat()+"tuantask_1"
    return currt.isoformat()

@shared_task
def tuantask2():
    print "stest===="
djproj/run.sh
celery -A djproj worker -B -l debug
1 Answer
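I figured out what was going on. I had to rename tuan.py to tasks.py, because autodiscover_tasks in celery.py only finds tasks defined in modules named tasks.py. If you do not want to rename the file, you can import the tasks manually by listing the module in CELERY_IMPORTS in settings.py, using the absolute module path (for example 'apps.app1.tuan'). See the documentation for details: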
"A common practice for reusable apps is to define all tasks in a separate tasks.py module, and Celery does have a way to autodiscover these modules [...] This way you do not have to manually add the individual modules to the CELERY_IMPORTS setting."
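To make the second option concrete, here is a minimal sketch of the CELERY_IMPORTS fix together with a small sanity check. The settings values are copied (abbreviated) from the settings.py in the question. The helper unimported_tasks is hypothetical and not part of Celery; it only models the behaviour described above, assuming that tasks keep their default names (module path plus function name) and that autodiscovery imports only `<app>.tasks` for each entry in INSTALLED_APPS.

```python
from __future__ import print_function

# Values copied (abbreviated) from the settings.py shown in the question.
INSTALLED_APPS = ('django.contrib.admin', 'apps.app1')
CELERYBEAT_SCHEDULE = {
    'hello': {'task': 'djproj.celery.hello'},
    'tuantask1': {'task': 'apps.app1.tuan.tuantask1'},
}

# The fix from the answer: explicitly list modules that are not named tasks.py.
CELERY_IMPORTS = ('apps.app1.tuan',)


def unimported_tasks(schedule, installed_apps, imports, app_module):
    """Hypothetical helper (not a Celery API).

    Return the scheduled task names whose defining module is neither
    '<app>.tasks' for an installed app, nor listed in `imports`, nor the
    module that defines the Celery app itself.
    """
    importable = set(pkg + '.tasks' for pkg in installed_apps)
    importable.update(imports)
    importable.add(app_module)
    missing = []
    for entry in schedule.values():
        # Assumption: default task naming, i.e. '<module path>.<function>'.
        module_name = entry['task'].rpartition('.')[0]
        if module_name not in importable:
            missing.append(entry['task'])
    return sorted(missing)


# Without CELERY_IMPORTS, the task in tuan.py is never imported.
before = unimported_tasks(CELERYBEAT_SCHEDULE, INSTALLED_APPS, (), 'djproj.celery')
# With CELERY_IMPORTS, every scheduled task has an importable module.
after = unimported_tasks(CELERYBEAT_SCHEDULE, INSTALLED_APPS, CELERY_IMPORTS, 'djproj.celery')
print(before)
print(after)
```

In the real project only the CELERY_IMPORTS line needs to go into settings.py. The rest is just a way to check that every 'task' entry in CELERYBEAT_SCHEDULE points at a module the worker will actually import. A task that fails this check is the kind that triggers the "Received unregistered task" error and the KeyError shown in the question.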