从Tornado调用Celery任务
怎么从tornado里调用一个celery任务,并通过回调获取结果呢?
这篇文章说,只需要通过RabbitMQ发送一条消息,任务就会被执行。这听起来没问题,但有没有人能给个python的例子(最好是用tornado,并带上回调)?我个人是用mongodb作为消息中介,但我也可以换成Redis或者RabbitMQ。
补充一下,我想要一个带回调的例子。比如,这段tornado代码
TestTask.delay(callback = self._on_celery_response)
...
def _on_celery_response(self, result):
print "hello from _on_celery_repsonse" , result
是不能工作的。我的TestTask是:
class TestTask(Task):
name = "tornadoServer.Test"
def run(self, callback=None, **kwargs):
result = {'result': "hello from celery task invoked by tornado"}
if callback is not None:
subtask(callback).delay(result)
return result
还有错误追踪信息:
File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/tornado/stack_context.py", line 183, in wrapped
callback(*args, **kwargs)
File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/connection.py", line 183, in _parse_response
callback(response)
File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/asyncmongo/cursor.py", line 399, in _handle_response
orig_callback(result['data'], error=None)
File "/home/hymloth/Desktop/DJANGO/NOO1/tornadoServer/basic_auth_handlers.py", line 66, in _on_response
celery_tasks.TestTask.delay(self._on_celery_response)
File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/task/base.py", line 338, in delay
return self.apply_async(args, kwargs)
File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/task/base.py", line 460, in apply_async
**options)
File "/usr/local/lib/python2.6/dist-packages/celery-2.2.7-py2.6.egg/celery/app/amqp.py", line 230, in delay_task
send(body, exchange=exchange, **extract_msg_options(kwargs))
File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/compat.py", line 101, in send
return self.publish(*args, **kwargs)
File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py", line 124, in publish
compression, headers)
File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/messaging.py", line 147, in _prepare
body) = encode(body, serializer=serializer)
File "/usr/local/lib/python2.6/dist-packages/kombu-1.1.6-py2.6.egg/kombu/serialization.py", line 119, in encode
payload = encoder(data)
File "/usr/lib/python2.6/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects
这个任务在没有回调的情况下是可以正常工作的……有什么建议吗?
2 个回答
-1
Celery 有一个回调函数,你可以去这个链接了解更多信息:http://ask.github.com/celery/userguide/tasksets.html
from celery.task import task
from celery.task.sets import subtask
@task
def add(x, y, callback=None):
result = x + y
if callback is not None:
subtask(callback).delay(result)
return result
0
回调对象也应该是一个celery任务,否则你的代码就无法正常工作。
如果你的回调函数不需要是celery任务,你可以在任务内部使用信号。