龙卷芹菜整合攻略

2024-05-15 07:42:57 发布

您现在位置:Python中文网/ 问答频道 /正文

由于没有人提供解决方案this post加上我迫切需要一个解决方案,这里是我的情况和一些抽象的解决方案/想法供讨论。

我的堆栈:

  1. 龙卷风
  2. 芹菜
  3. MongoDB
  4. Redis公司
  5. 拉比马克

我的问题是:找到一种方法,让Tornado发送芹菜任务(已解决),然后异步收集结果(有什么想法?)。

场景1:(请求/响应黑客加webhook)

  • Tornado接收(用户)请求,然后在本地内存(或Redis)中保存{jobID:(用户)请求}以记住在哪里传播响应,并使用jobID启动芹菜任务
  • 当芹菜完成任务时,它会在某个url执行webhook,并告诉tornado这个jobID已经完成(加上结果)
  • Tornado检索(用户)请求并将响应转发给(用户)

这会发生吗?有什么逻辑吗?

场景2:(龙卷风加长轮询)

  • Tornado分派芹菜任务并将一些主json数据返回给客户端(jQuery)
  • jQuery在接收到主json时会进行一些长时间的轮询,例如,每x微秒进行一次,而tornado会根据某个数据库标志进行回复。当芹菜任务完成时,此数据库标志设置为True,然后jQuery“loop”完成。

这有效吗?

还有其他想法/模式吗?


Tags: 用户redis数据库json标志场景webhook解决方案
3条回答

我的解决方案包括从龙卷风到芹菜的投票:

class CeleryHandler(tornado.web.RequestHandlerr):

    @tornado.web.asynchronous
    def get(self):    

        task = yourCeleryTask.delay(**kwargs)

        def check_celery_task():
            if task.ready():
                self.write({'success':True} )
                self.set_header("Content-Type", "application/json")  
                self.finish()
            else:   
                tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

        tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)

这是关于它的post

这是我们解决问题的办法。由于我们在应用程序中的几个处理程序中查找结果,所以我们将芹菜查找设置为mixin类。

这也使得代码在使用tornado.gen模式时更具可读性。

from functools import partial

class CeleryResultMixin(object):
    """
    Adds a callback function which could wait for the result asynchronously
    """
    def wait_for_result(self, task, callback):
        if task.ready():
            callback(task.result)
        else:
            # TODO: Is this going to be too demanding on the result backend ?
            # Probably there should be a timeout before each add_callback
            tornado.ioloop.IOLoop.instance().add_callback(
                partial(self.wait_for_result, task, callback)
            )


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
    """Execute a task asynchronously over a celery worker.
    Wait for the result without blocking
    When the result is available send it back
    """
    @tornado.web.asynchronous
    @tornado.web.authenticated
    @tornado.gen.engine
    def post(self):
        """Test the provided Magento connection
        """
        task = expensive_task.delay(
            self.get_argument('somearg'),
        )

        result = yield tornado.gen.Task(self.wait_for_result, task)

        self.write({
            'success': True,
            'result': result.some_value
        })
        self.finish()

我无意中发现了这个问题,反复点击结果后端对我来说并不理想。所以我使用Unix套接字实现了一个类似于场景1的Mixin。

它会在任务完成后立即通知Tornado(准确地说,在链中的下一个任务运行时通知Tornado),并且只对后端结果命中一次。这是link

相关问题 更多 >