mpi4py:动态数据处理
我有一个包含股票代码的列表,比如 tickers = ['AAPL','XOM','GOOG']
。在我“传统”的 Python 程序中,我会遍历这个 tickers
列表,选择一个股票代码,比如 AAPL
,然后导入一个包含 AAPL
股票收益的 CSV 文件,使用这些收益作为输入,调用一个常用的函数,最后生成一个 CSV 文件作为输出。因为我有超过 4000 个股票代码,而对每个股票代码应用的函数处理起来需要时间。我可以使用一个计算机集群,里面有 mpi4py
这个包,每个任务可以使用大约 100 个处理器。我对这个 mpi
的 例子理解得很好,并且能够在 Python 中实现:
from mpi4py import MPI
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
data = [i for i in range(8)]
# dividing data into chunks
chunks = [[] for _ in range(size)]
for i, chunk in enumerate(data):
chunks[i % size].append(chunk)
else:
data = None
chunks = None
data = comm.scatter(chunks, root=0)
print str(rank) + ': ' + str(data)
[cha@cluster] ~/utils> mpirun -np 3 ./mpi.py
2: [2, 5]
0: [0, 3, 6]
1: [1, 4, 7]
在这个例子中,我们有一个大小为 8 的数据向量,并且将相同数量的元素分配给每个处理器(总共 3 个)。我想知道如何使用上面的类似例子,将每个股票代码分配给每个处理器,并对每个股票代码应用需要运行的函数?我该如何告诉 Python,一旦某个处理器空闲,就去 tickers
列表中处理一个还没有被处理的 ticker
?
1 个回答
4
还有另一种理解方式。假设你有100个处理器在处理4000块数据。你可以这样想,每个处理器都有一块数据来处理。如果平均分配的话,每个处理器会处理40个数据块。比如,处理器1处理第0到第39块,处理器2处理第40到第79块,依此类推。
用这种方式思考,你就不用担心一个处理器完成任务后会发生什么。只需要一个循环就可以了:
block_size = len(tickers) / size # this will be 40 in your example
for i in range(block_size):
ticker = tickers[rank * block_size + i]
process(ticker)
def process(ticker):
# load data
# process data
# output data
这样理解有没有问题?
[编辑]
如果你想了解更多,这其实只是对行优先顺序索引的一种变体,这是一种常见的方法,用来访问存储在单一内存维度中的多维数据。