Tornado与Celery集成技巧

12 投票
4 回答
7675 浏览
提问于 2025-04-17 06:42

因为没有人给出解决方案来回应这个帖子,而且我急需一个变通办法,所以我来分享一下我的情况和一些抽象的解决思路,供大家讨论。

我的技术栈:

  1. Tornado
  2. Celery
  3. MongoDB
  4. Redis
  5. RabbitMQ

我的问题是:如何让Tornado去发送一个celery任务(这个已经解决了),然后异步地获取结果(有没有什么想法?)。

场景1:(请求/响应的变通方法加上Webhook)

  • Tornado接收到一个(用户)请求后,会在本地内存(或者Redis)中保存一个{ jobID : (用户)请求},以便记住如何发送响应,然后启动一个celery任务并附上jobID。
  • 当celery完成任务后,它会在某个网址上进行Webhook,告诉Tornado这个jobID的任务已经完成(以及结果)。
  • Tornado再取回(用户)请求,并将响应转发给(用户)。

这样可以实现吗?这样做有道理吗?

场景2:(Tornado加上长轮询)

  • Tornado发送celery任务,并返回一些初步的json数据给客户端(jQuery)。
  • jQuery在收到初步的json后,会进行长轮询,比如每隔x微秒请求一次,Tornado根据某个数据库标志来回复。当celery任务完成时,这个数据库标志会被设置为True,然后jQuery的“循环”就结束了。

这样做有效吗?

还有其他的想法或方案吗?

4 个回答

8

这是我们解决这个问题的方法。因为我们在应用程序中需要在多个处理器中查找结果,所以我们把celery的查找功能做成了一个混合类。

这样做还让代码更容易阅读,使用了tornado.gen的模式。

from functools import partial

class CeleryResultMixin(object):
    """
    Adds a callback function which could wait for the result asynchronously
    """
    def wait_for_result(self, task, callback):
        if task.ready():
            callback(task.result)
        else:
            # TODO: Is this going to be too demanding on the result backend ?
            # Probably there should be a timeout before each add_callback
            tornado.ioloop.IOLoop.instance().add_callback(
                partial(self.wait_for_result, task, callback)
            )


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
    """Execute a task asynchronously over a celery worker.
    Wait for the result without blocking
    When the result is available send it back
    """
    @tornado.web.asynchronous
    @tornado.web.authenticated
    @tornado.gen.engine
    def post(self):
        """Test the provided Magento connection
        """
        task = expensive_task.delay(
            self.get_argument('somearg'),
        )

        result = yield tornado.gen.Task(self.wait_for_result, task)

        self.write({
            'success': True,
            'result': result.some_value
        })
        self.finish()
9

我的解决方案是让tornado去询问celery的状态:

class CeleryHandler(tornado.web.RequestHandlerr):

    @tornado.web.asynchronous
    def get(self):    

        task = yourCeleryTask.delay(**kwargs)

        def check_celery_task():
            if task.ready():
                self.write({'success':True} )
                self.set_header("Content-Type", "application/json")  
                self.finish()
            else:   
                tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

        tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

这里有一篇关于这个内容的文章

4

我偶然看到这个问题,反复访问结果后端对我来说感觉不是个好主意。所以我实现了一个类似于你Scenario 1的Mixin,使用了Unix套接字。

它会在任务完成后立即通知Tornado(准确来说,是在链中的下一个任务运行时),并且只访问一次结果后端。这里有个链接

撰写回答