并行化集合交集操作?

2 投票
1 回答
951 浏览
提问于 2025-04-17 04:24

我有一个这样的文件:

A 1
A 1
A 2
A 3
B 2
B 3
C 2
C 3

我把它转换成了下面这种数据结构:

s = [set([1, 2, 3]), set([2, 3]), set([2,3])]

为了找出所有2个组合的交集长度,我使用了以下方法:

from itertools import combinations
for i in combinations(s, 2):
    inter = i[0] & i[1]
    print len(inter)

这里的 s 有30万个不同的集合,每个集合的长度大约是1000。现在有两个瓶颈:

  • 读取文件
  • 计算交集的长度

第一个瓶颈可能是无法避免的,但第二个是可以改进的。我有一台64核的机器,所以我想知道如何让这个程序并行运行。有没有适合多核机器的map reduce库可以使用呢?

1 个回答

0

如果你还没了解过,可以看看 multiprocessing 模块。虽然使用起来很方便,但其实不一定要用 itertools.combinations() 来获取所有独特的两两组合。如果你能接受使用全局变量,可以用 multiprocessing.Pool.map() 来把任务分配给多个进程。比如:

from multiprocessing import Pool

def tally(n):
    return [len(s[n] & t) for t in s[n+1:]]

p=Pool()
for resultset in p.map(tally, xrange(len(s)), chunksize=1):
    for result in resultset:
        print result

tally() 函数会在列表 l 中的第 n 个位置与后面每一个位置进行集合交集的计算,这个过程是在一个进程中完成的。而 p.map() 则可以把这个任务并行处理,针对 l 中的每个位置使用尽可能多的进程,这个数量是通过 cpu_count() 返回的。

你可以在 https://gist.github.com/c576fd7f48be5f66deaa 找到一个完整的示例。对于大数据集来说,在四核机器上运行这个方法的性能提升是相当明显的,相比于只用内置的 map() 函数在单个进程中运行。

撰写回答