使用Python、Flask和Celery进行并发异步处理

7 投票
3 回答
9044 浏览
提问于 2025-04-17 14:05

我正在开发一个小型但计算量很大的Python应用。这个计算密集型的工作可以分成几个部分,同时执行。我想找一个合适的技术栈来实现这个目标。

目前我打算使用Flask框架,搭配Apache2和WSGI,再加上Celery来处理任务队列。

接下来,如果有3个或更多的工作进程可用,a_long_process()another_long_process()yet_another_long_process()会同时执行吗?在这些进程执行时,Flask应用会被阻塞吗?

来自Flask应用的代码:

@myapp.route('/foo')
def bar():
    task_1 = a_long_process.delay(x, y)
    task_1_result = task_1.get(timeout=1)
    task_2 = another_long_process.delay(x, y)
    task_2_result = task_2.get(timeout=1)
    task_3 = yet_another_long_process.delay(x, y)
    task_3_result = task_3.get(timeout=1)
    return task_1 + task_2 + task_3

任务处理的代码:

from celery import Celery
celery = Celery('tasks', broker="amqp://guest@localhost//", backend="amqp://")
@celery.task
def a_long_process(x, y):
    return something
@celery.task
def another_long_process(x, y):
    return something_else
@celery.task
def yet_another_long_process(x, y):
    return a_third_thing

3 个回答

0

使用celery画布中的Group功能:

Group是一个可以同时执行多个任务的功能,它接受一个任务列表。

下面是文档中提供的示例:

from celery import group
from proj.tasks import add

g = group(add.s(2, 2), add.s(4, 4))
res = g()
res.get()

这个示例的输出是 [4, 8]

1

根据文档,result.get()这个方法会等到结果准备好后再返回,所以通常情况下它是会阻塞的,也就是说在结果没出来之前,程序会停在那里等。不过,因为你设置了timeout=1,如果任务超过1秒还没完成,调用get()就会抛出一个超时错误。

默认情况下,Celery的工作进程会根据可用的CPU数量来设置并发级别。这个并发级别决定了可以用来处理任务的线程数量。所以,如果并发级别大于等于3,Celery的工作进程应该能够同时处理这么多任务(也许有更懂Celery的人可以确认这一点?)。

7

你应该修改你的代码,让多个工作者可以同时工作:

@myapp.route('/foo')
def bar():
    # start tasks
    task_1 = a_long_process.delay(x, y)
    task_2 = another_long_process.delay(x, y)
    task_3 = yet_another_long_process.delay(x, y)
    # fetch results
    try:
        task_1_result = task_1.get(timeout=1)
        task_2_result = task_2.get(timeout=1)
        task_3_result = task_3.get(timeout=1)
    except TimeoutError:
        # Handle this or don't specify a timeout.
        raise
    # combine results
    return task_1 + task_2 + task_3

这段代码会一直等到所有结果都出来(或者超时)才继续。

在这些进程执行的时候,Flask应用会被阻塞吗?

这段代码只会阻塞你WSGI容器中的一个工作者。整个网站是否会无响应,取决于你使用的WSGI容器(比如Apache + mod_wsgi、uWSGI、gunicorn等)。大多数WSGI容器会启动多个工作者,所以在你的代码等待任务结果的时候,只有一个工作者会被阻塞。

对于这种应用,我建议使用gevent,它为每个请求启动一个独立的绿色线程,非常轻量。

撰写回答