Celery任务与自定义装饰器
我正在做一个项目,使用的是django和celery(django-celery)。我们团队决定把所有的数据访问代码放在(app-name)/manager.py
里(不是像django
那样用管理器),而让(app-name)/task.py
里的代码只负责组装和执行celery的任务(这样我们在这一层就不需要依赖django的ORM了)。
在我的manager.py
里,有这样的代码:
def get_tag(tag_name):
ctype = ContentType.objects.get_for_model(Photo)
try:
tag = Tag.objects.get(name=tag_name)
except ObjectDoesNotExist:
return Tag.objects.none()
return tag
def get_tagged_photos(tag):
ctype = ContentType.objects.get_for_model(Photo)
return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)
def get_tagged_photos_count(tag):
return get_tagged_photos(tag).count()
在我的task.py
里,我想把这些代码封装成任务(然后可能用这些任务来做更复杂的事情),所以我写了这个装饰器:
import manager #the module within same app containing data access functions
class mfunc_to_task(object):
def __init__(mfunc_type='get'):
self.mfunc_type = mfunc_type
def __call__(self, f):
def wrapper_f(*args, **kwargs):
callback = kwargs.pop('callback', None)
mfunc = getattr(manager, f.__name__)
result = mfunc(*args, **kwargs)
if callback:
if self.mfunc_type == 'get':
subtask(callback).delay(result)
elif self.mfunc_type == 'get_or_create':
subtask(callback).delay(result[0])
else:
subtask(callback).delay()
return result
return wrapper_f
然后(仍然在task.py
里):
#@task
@mfunc_to_task()
def get_tag():
pass
#@task
@mfunc_to_task()
def get_tagged_photos():
pass
#@task
@mfunc_to_task()
def get_tagged_photos_count():
pass
没有使用@task
的时候,一切都运行得很好。但是,当我按照celery的文档把@task
装饰器加到最上面后,事情就开始出问题了。显然,每次调用mfunc_to_task.__call__
时,都会把同一个task.get_tag
函数传递给f
。所以我每次得到的wrapper_f
都是一样的,现在我唯一能做的就是获取一个单一的标签。
我对装饰器还不太了解。有没有人能帮我理解这里出了什么问题,或者指出其他实现这个任务的方法?我真的不想为每个数据访问函数都写一遍相同的任务包装代码。
2 个回答
10
为什么不直接创建一个基类来继承 celery.Task
,而不是使用装饰器呢?
这样的话,你的所有任务都可以继承你自定义的任务类。在这个类里,你可以通过 __call__
和 after_return
方法来实现你想要的功能。
你还可以为所有任务定义一些共同的方法和对象。
class MyCoolTask(celery.Task):
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point of the task whatever is the state
pass
20
不太明白为什么传递参数不行?
如果你用这个例子:
@task()
def add(x, y):
return x + y
我们来给 MyCoolTask 加点日志记录:
from celery import task
from celery.registry import tasks
import logging
import celery
logger = logging.getLogger(__name__)
class MyCoolTask(celery.Task):
def __call__(self, *args, **kwargs):
"""In celery task this function call the run method, here you can
set some environment variable before the run of the task"""
logger.info("Starting to run")
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point of the task whatever is the state
logger.info("Ending run")
pass
然后创建一个扩展类(继承 MyCoolTask,但这次带参数):
class AddTask(MyCoolTask):
def run(self,x,y):
if x and y:
result=add(x,y)
logger.info('result = %d' % result)
return result
else:
logger.error('No x or y in arguments')
tasks.register(AddTask)
确保你把参数以 JSON 数据的形式传递:
{"x":8,"y":9}
我得到的结果是:
[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17