使用Python、Flask和Celery进行并发异步处理
我正在开发一个小型但计算量很大的Python应用。这个计算密集型的工作可以分成几个部分,同时执行。我想找一个合适的技术栈来实现这个目标。
目前我打算使用Flask框架,搭配Apache2和WSGI,再加上Celery来处理任务队列。
接下来,如果有3个或更多的工作进程可用,a_long_process()
、another_long_process()
和yet_another_long_process()
会同时执行吗?在这些进程执行时,Flask应用会被阻塞吗?
来自Flask应用的代码:
@myapp.route('/foo')
def bar():
task_1 = a_long_process.delay(x, y)
task_1_result = task_1.get(timeout=1)
task_2 = another_long_process.delay(x, y)
task_2_result = task_2.get(timeout=1)
task_3 = yet_another_long_process.delay(x, y)
task_3_result = task_3.get(timeout=1)
return task_1 + task_2 + task_3
任务处理的代码:
from celery import Celery
celery = Celery('tasks', broker="amqp://guest@localhost//", backend="amqp://")
@celery.task
def a_long_process(x, y):
return something
@celery.task
def another_long_process(x, y):
return something_else
@celery.task
def yet_another_long_process(x, y):
return a_third_thing
3 个回答
使用celery画布中的Group功能:
Group是一个可以同时执行多个任务的功能,它接受一个任务列表。
下面是文档中提供的示例:
from celery import group
from proj.tasks import add
g = group(add.s(2, 2), add.s(4, 4))
res = g()
res.get()
这个示例的输出是 [4, 8]
。
根据文档,result.get()
这个方法会等到结果准备好后再返回,所以通常情况下它是会阻塞的,也就是说在结果没出来之前,程序会停在那里等。不过,因为你设置了timeout=1
,如果任务超过1秒还没完成,调用get()
就会抛出一个超时错误。
默认情况下,Celery的工作进程会根据可用的CPU数量来设置并发级别。这个并发级别决定了可以用来处理任务的线程数量。所以,如果并发级别大于等于3,Celery的工作进程应该能够同时处理这么多任务(也许有更懂Celery的人可以确认这一点?)。
你应该修改你的代码,让多个工作者可以同时工作:
@myapp.route('/foo')
def bar():
# start tasks
task_1 = a_long_process.delay(x, y)
task_2 = another_long_process.delay(x, y)
task_3 = yet_another_long_process.delay(x, y)
# fetch results
try:
task_1_result = task_1.get(timeout=1)
task_2_result = task_2.get(timeout=1)
task_3_result = task_3.get(timeout=1)
except TimeoutError:
# Handle this or don't specify a timeout.
raise
# combine results
return task_1 + task_2 + task_3
这段代码会一直等到所有结果都出来(或者超时)才继续。
在这些进程执行的时候,Flask应用会被阻塞吗?
这段代码只会阻塞你WSGI容器中的一个工作者。整个网站是否会无响应,取决于你使用的WSGI容器(比如Apache + mod_wsgi、uWSGI、gunicorn等)。大多数WSGI容器会启动多个工作者,所以在你的代码等待任务结果的时候,只有一个工作者会被阻塞。
对于这种应用,我建议使用gevent,它为每个请求启动一个独立的绿色线程,非常轻量。