Celery任务链和访问**kwargs
我遇到的情况跟这里说的有点像,不过我想做的不是用多个参数来连接任务,而是连接那些返回多个条目的字典的任务。
这就是我想要做的事情,虽然说得有点抽象:
tasks.py
@task()
def task1(item1=None, item2=None):
item3 = #do some stuff with item1 and item2 to yield item3
return_object = dict(item1=item1, item2=item2, item3=item3)
return return_object
def task2(item1=None, item2=None, item3=None):
item4 = #do something with item1, item2, item3 to yield item4
return_object = dict(item1=item1, item2=item2, item3=item3, item4=item4)
return return_object
在ipython环境下,我可以单独并且异步地调用task1,没遇到任何问题。
我也可以单独调用task2,并把task1返回的结果作为双星参数传给它:
>>res1 = task1.s(item1=something, item2=something_else).apply_async()
>>res1.status
'SUCCESS'
>>res2 = task2.s(**res1.result).apply_async()
>>res2.status
'SUCCESS
但是,我最终想要的结果是和上面一样,但通过任务链来实现。在这里,我搞不清楚怎么让task2用task1.result作为**kwargs来实例化,而不是用task1返回的(位置)参数:
chain_result = (task1.s(item1=something, item2=something_else) | task2.s()).apply_async() #THIS DOESN'T WORK!
我怀疑我可以回去重写我的任务,让它们返回位置参数,而不是字典,这样可能会解决问题。但我觉得应该有办法在task2中访问task1的返回对象,功能上等同于**双星参数。我也觉得我可能在Celery子任务的实现或者*args和**kwargs之间的区别上漏掉了一些明显的东西。
希望这些能让人明白。提前感谢任何建议。
3 个回答
因为这个功能在celery里没有自带,所以我自己写了一个装饰器函数,做了类似的事情。
# Use this wrapper with functions in chains that return a tuple. The
# next function in the chain will get called with that the contents of
# tuple as (first) positional args, rather than just as just the first
# arg. Note that both the sending and receiving function must have
# this wrapper, which goes between the @task decorator and the
# function definition. This wrapper should not otherwise interfere
# when these conditions are not met.
class UnwrapMe(object):
def __init__(self, contents):
self.contents = contents
def __call__(self):
return self.contents
def wrap_for_chain(f):
""" Too much deep magic. """
@functools.wraps(f)
def _wrapper(*args, **kwargs):
if type(args[0]) == UnwrapMe:
args = list(args[0]()) + list(args[1:])
result = f(*args, **kwargs)
if type(result) == tuple and current_task.request.callbacks:
return UnwrapMe(result)
else:
return result
return _wrapper
我的这个装饰器的工作方式像是starchain
的概念,不过你也可以很简单地修改它,让它处理关键字参数(kwargs)。
这是我对这个问题的看法,我使用了一个抽象任务类:
from __future__ import absolute_import
from celery import Task
from myapp.tasks.celery import app
class ChainedTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
if len(args) == 1 and isinstance(args[0], dict):
kwargs.update(args[0])
args = ()
return super(ChainedTask, self).__call__(*args, **kwargs)
@app.task(base=ChainedTask)
def task1(x, y):
return {'x': x * 2, 'y': y * 2, 'z': x * y}
@app.task(base=ChainedTask)
def task2(x, y, z):
return {'x': x * 3, 'y': y * 3, 'z': z * 2}
现在你可以像这样定义和执行你的任务链:
from celery import chain
pipe = chain(task1.s(x=1, y=2) | task2.s())
pipe.apply_async()
chain
和其他画布基本元素属于一些功能性工具的家族,类似于 map
和 reduce
。
比如说,map(target, items)
会对列表中的每一个项目调用 target(item)
。而在 Python 中,有一个不太常用的版本叫 itertools.starmap
,它会对每个项目调用 target(*item)
。
虽然我们可以在工具箱里添加 starchain
甚至 kwstarchain
,但这些会非常专业,可能用得不多。
有趣的是,Python 通过列表和生成器表达式让这些变得不必要,因此 map
被替换成了 [target(item) for item in items]
,而 starmap
被替换成了 [target(*item) for item in items]
。
所以,与其为每个基本元素实现多个替代方案,我认为我们应该专注于找到一种更灵活的支持方式,比如使用类似于 celery 的生成器表达式(如果可能的话,如果不行,也要找一些同样强大的东西)。