如何对这个单词计数函数进行并行化?

4 投票
3 回答
1221 浏览
提问于 2025-04-17 05:40

我有一些串行代码,用来计算单词的搭配,也就是统计一起出现的单词对。下面这个程序可以正常工作,不过句子的列表是为了演示而准备的。

import sys
from collections import defaultdict

GLOBAL_CONCORDANCE = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: [])))

def BuildConcordance(sentences):
    global GLOBAL_CONCORDANCE
    for sentenceIndex, sentence in enumerate(sentences):
        words = [word for word in sentence.split()]

        for index, word in enumerate(words):
            for i, collocate in enumerate(words[index:len(words)]):
                GLOBAL_CONCORDANCE[word][collocate][i].append(sentenceIndex)

def main():
    sentences = ["Sentence 1", "Sentence 2", "Sentence 3", "Sentence 4"]
    BuildConcordance(sentences)
    print GLOBAL_CONCORDANCE

if __name__ == "__main__":
    main()

在我看来,第一个循环可以并行处理,因为计算的数字是相互独立的。但是,正在修改的数据结构是一个全局的。

我尝试使用Python的Pool模块,但遇到了一些序列化的问题,这让我怀疑我是不是在用正确的设计模式。有没有人能推荐一个好的方法来并行处理这段代码?

3 个回答

0

我对这个解决方案不是特别满意(因为我为了避免序列化错误而转换了defaultdict的方式看起来有点糟糕!!)不过这是一个有效的解决办法:

import sys
from collections import defaultdict
from multiprocessing import Pool, Queue
import re

def cdefaultdict():
    return []

def bdefaultdict():
    return defaultdict(cdefaultdict)

def adefaultdict():
    return defaultdict(bdefaultdict)

GLOBAL_CONCORDANCE = defaultdict(adefaultdict)

def ConcordanceWorker(work):
    sentenceIndex, sentence = work[0], work[1]
    local_concordance = defaultdict(adefaultdict)
    words = [word for word in sentence.split()]

    for index, word in enumerate(words):
        for i, collocate in enumerate(words[index:len(words)]):
            local_concordance[word][collocate][i].append(sentenceIndex)

    return local_concordance


def BuildConcordance(sentences):
    global GLOBAL_CONCORDANCE
    work = []

    for sentenceIndex, sentence in enumerate(sentences):
        work.append([sentenceIndex, sentence])

    pool = Pool(8)

    results = pool.map(ConcordanceWorker, work)

    for result in results:
        for word in result:
            for collocate in result[word]:
                for i in result[word][collocate]:
                    GLOBAL_CONCORDANCE[word][collocate][i] += result[word][collocate][i]

    print len(GLOBAL_CONCORDANCE)


def main():
    sentences = ["Sentence 1", "Sentence 2", "Sentence 3", "Sentence 4"]
    BuildConcordance(sentences)

if __name__ == "__main__":
    main()
1

我真的怀疑把这个计算做成并行处理会给你带来什么好处(除了让你了解并行处理的概念)。

你现在用的数据结构在内存使用上并不是很高效,因为Python的字典占用的内存比较大,而你创建了很多这样的字典。此外,你还在每个句子中添加了非常多的项目到“词汇表”里。

我觉得你运行这个程序几分钟后,可能就会把电脑的内存用光,尤其是处理像一本书的一个章节这样的内容。这可能会导致你的程序卡住或者崩溃。你有没有尝试过在一大堆输入上运行这个程序,看看会发生什么?如果真是这样,那么计算是并行进行还是串行进行就没什么关系了。

我认为在你找到更好的输出存储方式之前,并行处理并不会带来真正的好处。只有在你实际遇到并诊断出问题后,才考虑使用并行处理(或其他优化方法)。

1

一般来说,使用函数式编程风格时,多进程处理会比较简单。在这种情况下,我建议你让每个工作函数返回一个结果的列表,里面是一些元组。使用嵌套的 defaultdict 会增加复杂性,但其实并没有带来什么好处。可以试试这样做:

import sys
from collections import defaultdict
from multiprocessing import Pool, Queue
import re

GLOBAL_CONCORDANCE = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

def concordance_worker(index_sentence):
    sent_index, sentence = index_sentence
    words = sentence.split()

    return [(word, colo_word, colo_index, sent_index)
            for i, word in enumerate(words)
            for colo_index, colo_word in enumerate(words[i:])]

def build_concordance(sentences):
    global GLOBAL_CONCORDANCE
    pool = Pool(8)

    results = pool.map(concordance_worker, enumerate(sentences))

    for result in results:
        for word, colo_word, colo_index, sent_index in result:
            GLOBAL_CONCORDANCE[word][colo_word][colo_index].append(sent_index)

    print len(GLOBAL_CONCORDANCE)


def main():
    sentences = ["Sentence 1", "Sentence 2", "Sentence 3", "Sentence 4"]
    build_concordance(sentences)

if __name__ == "__main__":
    main()

如果这样做还不能满足你的需求,告诉我哦。

撰写回答