Celery定时任务方法不起作用

3 投票
1 回答
1908 浏览
提问于 2025-04-17 13:37

我正在尝试在一个方法任务上运行celerybeat,但一直无法正常工作。这里有一个示例设置:

from celery.contrib.methods import task_method
from celery import Celery, current_app

celery=celery('tasks', broker='amqp://guest@localhost//')
celery.config_from_object("celeryconfig")
class X(object):
    @celery.task(filter=task_method, name="X.ppp")
    def ppp(self):
        print "ppp"

我的celeryconfig.py文件是

from datetime import timedelta
CELERYBEAT_SCHEDULE = {
      'test' : {
               'task' : 'X.ppp', 
               'schedule' : timedelta(seconds=5)
               }, 
 }

当我运行 celery beat 时,出现了这样的错误:

 task X.ppp raised exception, TypeError('ppp() takes exactly 1 argument, (0 given)  

当我把这个方法改成普通函数,并加上 `@celery.task` 装饰器时,它就能正常工作,所以其他的设置看起来没问题。我在文档中看到了关于方法任务的一些注意事项 ,但我还是搞不清楚问题出在哪里。有没有人知道怎么解决这个问题?

1 个回答

4

问题是,Celerybeat 在调用方法之前不会创建 X 的实例。如果方法没有绑定到某个对象上,任务方法的过滤器默认会调用这个未绑定的方法。

我想问的是,你想在这里实现什么呢?X 没有状态,那为什么不直接使用模块级别的函数呢?

撰写回答