我需要计算一个3gbgzip压缩的英语句子纯文本文件的词频,解压缩时大约是30gb。在
我有一个带有collections.Counter
和gzip.open
的单线程脚本,需要几个小时才能完成。在
因为逐行读取文件比拆分和计数快得多,所以我考虑一个producer-consumer流,其中一个文件读取器生成行,几个消费者执行拆分和计数,最后,合并Counter
s以获得单词的出现。在
但是,我找不到ProcessPoolExecutor
向Executor
发送队列的示例,它们只是map
列表中的单个项目。
只有asyncio.Queue
的单线程示例。在
它是一个巨大的文件,所以我不能在计数之前读取整个文件并得到list
,因此我不能使用concurrent.futures.Executor.map
。但我读到的所有例子都是以一个固定的列表作为开始。
拆分和计算一个句子的时间相当于fork a process,所以我必须使每个消费过程的寿命更长。我不认为map
可以合并Counter
,因此我不能使用chunksize
>;1。因此,我必须给消费者一个队列,让他们继续计数,直到整个文件完成。但大多数示例只向消费者发送一个项目,并使用chunksize=1000
来减少fork
次。
你能给我写个例子吗?在
py与py3.3向后兼容的速度更快。在
我的实际情况是更具体的文件格式:
chr1 10011 141 0 157 4 41 50
chr1 10012 146 1 158 4 42 51
chr1 10013 150 0 163 4 43 53
chr1 10014 164 3 167 4 44 54
我需要计算第3列到第8列的每个柱状图。 所以我把词频作为一个简单的例子。在
我的代码是:
^{pr2}$csv.DictReader
花费的时间最多。在
我的问题是,虽然gzip阅读器很快,csv阅读器很快,但我需要数到几十亿行。而且csv阅读器肯定比gzip阅读器慢。在
因此,我需要将行扩展到csv阅读器的不同工作进程,并分别进行下游计数。在一个生产者和许多消费者之间使用队列是很方便的。在
由于我使用的是Python而不是C,是否有一些抽象的包装器用于多处理和队列?是否可以将ProcessPoolExecutor
与Queue
类一起使用?在
一个30gb的文本文件足够大,可以把你的问题放到大数据领域。所以为了解决这个问题,我建议使用像Hadoop和Spark这样的大数据工具。您所解释的“生产者-消费者流”基本上就是
MapReduce
算法的设计目的。单词计数频率是一个典型的MapReduce问题。查一下,你会发现很多例子。在这个想法是把大文件分成更小的文件。调用许多将执行计数作业并返回计数器的工作线程。 最后合并计数器。在
我从来没有测试过这个代码,但应该可以工作。在
第一件事是检查行数
将数据拆分为n个分区
^{pr2}$现在开始工作:
函数的一个示例
不过,在执行任何操作之前,您仍需要检查行数
相关问题 更多 >
编程相关推荐