在Celery中使用块处理对象或整数序列

5 投票
1 回答
3444 浏览
提问于 2025-04-18 10:41

当你使用 task.chunks 处理一系列的序列(比如:字符串列表)时,

my_task.chunks(['a', 'b', 'c', 'd'], 2).delay() 

一切都运行得很好。不过,如果你传入其他类型的序列(比如:整数或对象),

my_task.chunks([1, 2, 3, 4], 2).delay()

就会出现这个异常

[2014-06-22 16:50:27,970: ERROR/MainProcess] Task celery.starmap[44b20b4c-ef69-4c68-bd9c-e1de42c34c49] raised unexpected: TypeError('my_task object argument after * must be a sequence, not int',)
Traceback (most recent call last):
  File "/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/venv/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in protected_call
    return self.run(*args, **kwargs)
  File "/venv/local/lib/python2.7/site-packages/celery/app/builtins.py", line 125, in xstarmap
    return [task(*item) for item in it]
TypeError: my_task object argument after * must be a sequence, not int

my_task 目前是个简单的任务

@shared_task
def my_task(word):
    print word

我该如何用 task.chunks 来处理非序列的序列呢?

1 个回答

13

把它改成:

my_task.chunks([(1,), (2,), (3,), (4,)], 2).delay()

根据错误信息的提示,你的可迭代对象中的每个参数都应该是一个序列,这样才能拆分成你想调用的函数的参数。

那么,为什么 ['a', 'b', 'c', 'd'] 能正常工作呢?

这其实是个巧合,因为字符串也是可迭代的。所以 "a" 被转换成了 ("a", ) 这个序列。

如果你尝试使用任何一个字符超过一个的值,你会得到另一个错误,因为你的任务只期望一个参数。

你应该把它改成:

my_task.chunks([('a',), ('b',), ('c',), ('d',)], 2).delay() 

如果你需要把现有的列表转换成元组的列表:

>>> nums = [1, 2, 3, 4]
>>> [(num, ) for num in nums]
[(1,), (2,), (3,), (4,)]
>>> chars = ["a", "b", "c", "d"]
>>> [(c,) for c in chars]
[('a',), ('b',), ('c',), ('d',)]

撰写回答