如何对这个单词计数函数进行并行化?
我有一些串行代码,用来计算单词的搭配,也就是统计一起出现的单词对。下面这个程序可以正常工作,不过句子的列表是为了演示而准备的。
import sys
from collections import defaultdict
GLOBAL_CONCORDANCE = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: [])))
def BuildConcordance(sentences):
global GLOBAL_CONCORDANCE
for sentenceIndex, sentence in enumerate(sentences):
words = [word for word in sentence.split()]
for index, word in enumerate(words):
for i, collocate in enumerate(words[index:len(words)]):
GLOBAL_CONCORDANCE[word][collocate][i].append(sentenceIndex)
def main():
sentences = ["Sentence 1", "Sentence 2", "Sentence 3", "Sentence 4"]
BuildConcordance(sentences)
print GLOBAL_CONCORDANCE
if __name__ == "__main__":
main()
在我看来,第一个循环可以并行处理,因为计算的数字是相互独立的。但是,正在修改的数据结构是一个全局的。
我尝试使用Python的Pool
模块,但遇到了一些序列化的问题,这让我怀疑我是不是在用正确的设计模式。有没有人能推荐一个好的方法来并行处理这段代码?
3 个回答
0
我对这个解决方案不是特别满意(因为我为了避免序列化错误而转换了defaultdict的方式看起来有点糟糕!!)不过这是一个有效的解决办法:
import sys
from collections import defaultdict
from multiprocessing import Pool, Queue
import re
def cdefaultdict():
return []
def bdefaultdict():
return defaultdict(cdefaultdict)
def adefaultdict():
return defaultdict(bdefaultdict)
GLOBAL_CONCORDANCE = defaultdict(adefaultdict)
def ConcordanceWorker(work):
sentenceIndex, sentence = work[0], work[1]
local_concordance = defaultdict(adefaultdict)
words = [word for word in sentence.split()]
for index, word in enumerate(words):
for i, collocate in enumerate(words[index:len(words)]):
local_concordance[word][collocate][i].append(sentenceIndex)
return local_concordance
def BuildConcordance(sentences):
global GLOBAL_CONCORDANCE
work = []
for sentenceIndex, sentence in enumerate(sentences):
work.append([sentenceIndex, sentence])
pool = Pool(8)
results = pool.map(ConcordanceWorker, work)
for result in results:
for word in result:
for collocate in result[word]:
for i in result[word][collocate]:
GLOBAL_CONCORDANCE[word][collocate][i] += result[word][collocate][i]
print len(GLOBAL_CONCORDANCE)
def main():
sentences = ["Sentence 1", "Sentence 2", "Sentence 3", "Sentence 4"]
BuildConcordance(sentences)
if __name__ == "__main__":
main()
1
我真的怀疑把这个计算做成并行处理会给你带来什么好处(除了让你了解并行处理的概念)。
你现在用的数据结构在内存使用上并不是很高效,因为Python的字典占用的内存比较大,而你创建了很多这样的字典。此外,你还在每个句子中添加了非常多的项目到“词汇表”里。
我觉得你运行这个程序几分钟后,可能就会把电脑的内存用光,尤其是处理像一本书的一个章节这样的内容。这可能会导致你的程序卡住或者崩溃。你有没有尝试过在一大堆输入上运行这个程序,看看会发生什么?如果真是这样,那么计算是并行进行还是串行进行就没什么关系了。
我认为在你找到更好的输出存储方式之前,并行处理并不会带来真正的好处。只有在你实际遇到并诊断出问题后,才考虑使用并行处理(或其他优化方法)。
1
一般来说,使用函数式编程风格时,多进程处理会比较简单。在这种情况下,我建议你让每个工作函数返回一个结果的列表,里面是一些元组。使用嵌套的 defaultdict
会增加复杂性,但其实并没有带来什么好处。可以试试这样做:
import sys
from collections import defaultdict
from multiprocessing import Pool, Queue
import re
GLOBAL_CONCORDANCE = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
def concordance_worker(index_sentence):
sent_index, sentence = index_sentence
words = sentence.split()
return [(word, colo_word, colo_index, sent_index)
for i, word in enumerate(words)
for colo_index, colo_word in enumerate(words[i:])]
def build_concordance(sentences):
global GLOBAL_CONCORDANCE
pool = Pool(8)
results = pool.map(concordance_worker, enumerate(sentences))
for result in results:
for word, colo_word, colo_index, sent_index in result:
GLOBAL_CONCORDANCE[word][colo_word][colo_index].append(sent_index)
print len(GLOBAL_CONCORDANCE)
def main():
sentences = ["Sentence 1", "Sentence 2", "Sentence 3", "Sentence 4"]
build_concordance(sentences)
if __name__ == "__main__":
main()
如果这样做还不能满足你的需求,告诉我哦。