mpi4py:动态数据处理

3 投票
1 回答
901 浏览
提问于 2025-04-18 09:48

我有一个包含股票代码的列表,比如 tickers = ['AAPL','XOM','GOOG']。在我“传统”的 Python 程序中,我会遍历这个 tickers 列表,选择一个股票代码,比如 AAPL,然后导入一个包含 AAPL 股票收益的 CSV 文件,使用这些收益作为输入,调用一个常用的函数,最后生成一个 CSV 文件作为输出。因为我有超过 4000 个股票代码,而对每个股票代码应用的函数处理起来需要时间。我可以使用一个计算机集群,里面有 mpi4py 这个包,每个任务可以使用大约 100 个处理器。我对这个 mpi例子理解得很好,并且能够在 Python 中实现:

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
    data = [i for i in range(8)]
# dividing data into chunks
    chunks = [[] for _ in range(size)]
    for i, chunk in enumerate(data):
        chunks[i % size].append(chunk)
else:
    data = None
    chunks = None
data = comm.scatter(chunks, root=0)
print str(rank) + ': ' + str(data)

[cha@cluster] ~/utils> mpirun -np 3 ./mpi.py 
2: [2, 5]
0: [0, 3, 6]
1: [1, 4, 7]

在这个例子中,我们有一个大小为 8 的数据向量,并且将相同数量的元素分配给每个处理器(总共 3 个)。我想知道如何使用上面的类似例子,将每个股票代码分配给每个处理器,并对每个股票代码应用需要运行的函数?我该如何告诉 Python,一旦某个处理器空闲,就去 tickers 列表中处理一个还没有被处理的 ticker

1 个回答

4

还有另一种理解方式。假设你有100个处理器在处理4000块数据。你可以这样想,每个处理器都有一块数据来处理。如果平均分配的话,每个处理器会处理40个数据块。比如,处理器1处理第0到第39块,处理器2处理第40到第79块,依此类推。

用这种方式思考,你就不用担心一个处理器完成任务后会发生什么。只需要一个循环就可以了:

block_size = len(tickers) / size # this will be 40 in your example
for i in range(block_size):
    ticker = tickers[rank * block_size + i]
    process(ticker)

def process(ticker):
    # load data
    # process data
    # output data

这样理解有没有问题?

[编辑]
如果你想了解更多,这其实只是对行优先顺序索引的一种变体,这是一种常见的方法,用来访问存储在单一内存维度中的多维数据。

撰写回答