Celery 并行分布式任务与多进程
我有一个需要大量CPU运算的Celery任务。我想利用很多EC2实例的所有处理能力(也就是多个核心),让这个任务更快完成(这是一个使用多进程的Celery并行分布式任务 - 我觉得是这样)。
我正在努力理解一些术语,比如线程、多进程、分布式计算和分布式并行处理。
示例任务:
@app.task
for item in list_of_millions_of_ids:
id = item # do some long complicated equation here very CPU heavy!!!!!!!
database.objects(newid=id).save()
使用上面的代码(如果可能的话给个例子),我想知道如何利用Celery将这个任务分配到多个机器上,让这个任务可以被拆分,从而充分利用云中所有可用机器的CPU计算能力?
4 个回答
增加更多的celery工作进程确实可以加快任务的执行速度。不过,你可能还有另一个瓶颈,那就是数据库。要确保数据库能够处理同时进行的插入和更新操作。
关于你的问题:你是通过在你的EC2实例上分配另一个进程来添加celery工作进程,命名为celeryd
。根据你需要的工作进程数量,你可能还想增加更多的实例。
为什么不使用 group
的 celery 任务呢?
http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups
简单来说,你应该把 ids
分成几块(或者范围),然后把这些块交给一组 group
任务去处理。
如果你需要做一些更复杂的事情,比如汇总某些 celery 任务的结果,我之前成功使用过 chord
任务来实现类似的目的:
http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords
把 settings.CELERYD_CONCURRENCY
设置成一个合理的数字,这个数字是你能承受的,然后这些 celery 工作者就会一直执行你的任务,直到完成,无论是以组的方式还是以和弦的方式。
注意:由于 kombu
的一个bug,之前在处理大量任务时重用工作者会有问题,我不知道现在是否修复了。也许修复了,但如果没有,就把 CELERYD_MAX_TASKS_PER_CHILD 的值调低。
以下是我运行的一个简化和修改后的代码示例:
@app.task
def do_matches():
match_data = ...
result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())
summarize
会获取所有 single_batch_processor
任务的结果。每个任务都在任意的 Celery 工作者上运行,kombu
负责协调这一切。
现在我明白了:single_batch_processor
和 summarize
也必须是 celery 任务,而不是普通函数——否则当然就无法并行处理了(我甚至不确定如果不是 celery 任务,和弦构造器是否会接受它)。
在分布式系统中,有一件事你一定要记住:
过早优化是万恶之源。—— D. Knuth
我知道这听起来很明显,但在分发数据之前,最好再检查一下你是否在使用最好的算法(如果有的话)。
说到这里,优化分发其实是在三件事情之间找到平衡:
- 从持久存储中写入/读取数据,
- 将数据从介质A移动到介质B,
- 处理数据,
计算机的设计是这样的:你越靠近处理单元(第3点),写入和读取数据(第1点和第2点)就会越快越高效。在一个经典的集群中,顺序是这样的:网络硬盘、地方硬盘、内存、处理单元内部……
现在的处理器越来越复杂,可以看作是多个独立的硬件处理单元,通常称为核心,这些核心通过线程来处理数据(第3点)。想象一下,如果你的核心非常快,当你用一个线程发送数据时,你只用了50%的计算机性能;如果核心有两个线程,那你就能用到100%。每个核心有两个线程叫做超线程,你的操作系统会把每个超线程核心看作两个CPU。
在处理器中管理线程通常叫做多线程。 操作系统管理CPU通常叫做多进程。 在集群中管理并发任务通常叫做并行编程。 在集群中管理依赖任务通常叫做分布式编程。
那么你的瓶颈在哪里呢?
- 在第(1)点:尽量从更高层次的存储进行持久化和流式处理(也就是离你的处理单元更近的存储,比如如果网络硬盘慢,先保存到地方硬盘)。
- 在第(2)点:这是最常见的瓶颈,尽量避免不必要的通信数据包,或者实时压缩数据包(比如如果硬盘慢,只保存一个“批量计算”的消息,把中间结果保存在内存中)。
- 在第(3)点:你完成了!你正在充分利用所有可用的处理能力。
那Celery呢?
Celery是一个用于分布式编程的消息框架,它会使用一个代理模块进行通信(第2点)和一个后端模块进行持久化(第1点),这意味着你可以通过更改配置来避免大部分瓶颈(如果可能的话),主要是在你的网络上。
首先,分析你的代码,以便在单台计算机上获得最佳性能。
然后在你的集群中使用Celery,使用默认配置并设置 CELERY_RESULT_PERSISTENT=True
:
from celery import Celery
app = Celery('tasks',
broker='amqp://guest@localhost//',
backend='redis://localhost')
@app.task
def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
#code that does stuff
return result
在执行过程中,打开你喜欢的监控工具,我使用rabbitMQ的默认监控工具、Celery的flower和CPU的top,结果会保存在你的后端中。一个网络瓶颈的例子是任务队列增长得太多,导致执行延迟,你可以考虑更换模块或调整Celery配置,如果不是这样,那你的瓶颈可能在别的地方。
你的目标是:
- 把工作分配到很多机器上(分布式计算/分布式并行处理)
- 在一台机器上把工作分配到所有的CPU上(多进程/多线程)
Celery可以很轻松地做到这两点。首先要明白的是,每个celery工作进程默认会根据系统中可用的CPU核心数量来运行任务:
并发性是指用来同时处理任务的预先启动的工作进程数量,当这些进程都在忙着工作时,新任务就得等其中一个任务完成后才能处理。
默认的并发数就是机器上的CPU数量(包括核心),你可以通过-c选项指定一个自定义的数字。没有推荐的值,因为最佳数量取决于多种因素,但如果你的任务大多数是I/O密集型的,可以尝试增加这个数字。实验表明,增加到超过CPU数量的两倍通常效果不佳,反而可能会降低性能。
这意味着每个单独的任务不需要担心使用多进程或多线程来利用多个CPU或核心。相反,celery会同时运行足够的任务来使用每个可用的CPU。
了解这一点后,下一步是创建一个任务来处理你list_of_millions_of_ids
中的一部分。这里有几个选择——一个是让每个任务处理一个ID,这样你就运行N个任务,其中N == len(list_of_millions_of_ids)
。这样可以确保工作在所有任务之间均匀分配,因为不会出现某个工作进程提前完成而无事可做的情况;如果它需要工作,可以从队列中取一个ID。你可以使用celery的group
来实现这一点(正如John Doe提到的)。
tasks.py:
@app.task
def process_ids(item):
id = item #long complicated equation here
database.objects(newid=id).save()
然后执行这些任务:
from celery import group
from tasks import process_id
jobs = group(process_ids(item) for item in list_of_millions_of_ids)
result = jobs.apply_async()
另一个选择是把列表分成小块,然后把这些小块分配给你的工作进程。这种方法有可能浪费一些计算资源,因为可能会有一些工作进程在等待,而其他的仍在工作。不过,celery文档指出,这种担忧往往是多余的:
有些人可能担心将任务分块会降低并行性,但在繁忙的集群中,这种情况很少发生,实际上,由于避免了消息传递的开销,性能可能会显著提高。
所以,你可能会发现将列表分块并把这些块分配给每个任务的效果更好,因为减少了消息传递的开销。这样做也可能稍微减轻数据库的负担,通过计算每个ID,存储在一个列表中,然后在完成后一次性把整个列表添加到数据库中,而不是一个一个地添加。分块的方法大致如下:
tasks.py:
@app.task
def process_ids(items):
for item in items:
id = item #long complicated equation here
database.objects(newid=id).save() # Still adding one id at a time, but you don't have to.
然后启动这些任务:
from tasks import process_ids
jobs = process_ids.chunks(list_of_millions_of_ids, 30) # break the list into 30 chunks. Experiment with what number works best here.
jobs.apply_async()
你可以尝试不同的分块大小,看看哪个效果最好。你想找到一个平衡点,既能减少消息传递的开销,又能保持块的大小足够小,以免出现某个工作进程完成得比其他工作进程快得多,然后就无事可做的情况。