芹菜任务和定制d

2024-04-26 01:03:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在做一个使用django和芹菜(django芹菜)的项目。我们的团队决定在(app-name)/manager.py中包装所有数据访问代码(而不是像django那样包装到管理器中),并让代码进入(app name)/task.py,只处理用芹菜组装和执行的任务(因此我们在这一层没有django ORM依赖关系)。

在我的manager.py中,我有这样的东西:

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()

在我的task.py中,我喜欢将它们包装成任务(然后可能使用这些任务来执行更复杂的任务),因此我编写了这个装饰器:

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result            

        return wrapper_f

然后(仍在task.py中):

#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass

没有@task一切都很好。 但是,在应用了@taskdecorator(按照芹菜文档的说明放在顶部)之后,事情就开始崩溃了。显然,每次调用mfunc_to_task.__call__时,相同的task.get_tag函数都作为f传递。所以我每次都得到相同的标签,现在我唯一能做的就是得到一个标签。

我是新来的装修师。有谁能帮助我理解这里出了什么问题,或者指出其他方法来完成任务?我真的不喜欢为我的每个数据访问函数编写相同的任务包装代码。


Tags: todjangonamepytaskgetreturnobjects
2条回答

为什么不创建扩展celery.Task的基类而不是使用decorator?

这样,所有任务都可以扩展自定义任务类,在该类中,您可以使用方法__call__after_return实现您的个人行为 . 还可以为所有任务定义通用方法和对象。

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        pass

不太清楚为什么传递论点行不通?

如果使用此示例:

@task()
def add(x, y):
    return x + y

让我们向mycoltask添加一些日志:

from celery import task
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        logger.info("Ending run")
        pass

并创建一个扩展类(扩展mycoltask,但现在使用参数):

class AddTask(MyCoolTask):

    def run(self,x,y):
        if x and y:
            result=add(x,y)
            logger.info('result = %d' % result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)

并确保将kwargs作为json数据传递:

{"x":8,"y":9}

我得到的结果是:

[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17

相关问题 更多 >

    热门问题