如何同时计算一个大文件中的词频?

2024-05-14 07:38:30 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要计算一个3gbgzip压缩的英语句子纯文本文件的词频,解压缩时大约是30gb。在

我有一个带有collections.Countergzip.open的单线程脚本,需要几个小时才能完成。在

因为逐行读取文件比拆分和计数快得多,所以我考虑一个producer-consumer流,其中一个文件读取器生成行,几个消费者执行拆分和计数,最后,合并Counters以获得单词的出现。在

但是,我找不到ProcessPoolExecutorExecutor发送队列的示例,它们只是map列表中的单个项目。 只有asyncio.Queue的单线程示例。在

  • 它是一个巨大的文件,所以我不能在计数之前读取整个文件并得到list,因此我不能使用concurrent.futures.Executor.map。但我读到的所有例子都是以一个固定的列表作为开始。

  • 拆分和计算一个句子的时间相当于fork a process,所以我必须使每个消费过程的寿命更长。我不认为map可以合并Counter,因此我不能使用chunksize>;1。因此,我必须给消费者一个队列,让他们继续计数,直到整个文件完成。但大多数示例只向消费者发送一个项目,并使用chunksize=1000来减少fork次。

你能给我写个例子吗?在

py与py3.3向后兼容的速度更快。在


我的实际情况是更具体的文件格式:

chr1    10011   141     0       157     4       41      50
chr1    10012   146     1       158     4       42      51
chr1    10013   150     0       163     4       43      53
chr1    10014   164     3       167     4       44      54

我需要计算第3列到第8列的每个柱状图。 所以我把词频作为一个简单的例子。在

我的代码是:

^{pr2}$

csv.DictReader花费的时间最多。在

cProfile


我的问题是,虽然gzip阅读器很快,csv阅读器很快,但我需要数到几十亿行。而且csv阅读器肯定比gzip阅读器慢。在

因此,我需要将行扩展到csv阅读器的不同工作进程,并分别进行下游计数。在一个生产者和许多消费者之间使用队列是很方便的。在

由于我使用的是Python而不是C,是否有一些抽象的包装器用于多处理和队列?是否可以将ProcessPoolExecutorQueue类一起使用?在


Tags: 文件csv示例map队列counter消费者例子
3条回答

一个30gb的文本文件足够大,可以把你的问题放到大数据领域。所以为了解决这个问题,我建议使用像Hadoop和Spark这样的大数据工具。您所解释的“生产者-消费者流”基本上就是MapReduce算法的设计目的。单词计数频率是一个典型的MapReduce问题。查一下,你会发现很多例子。在

这个想法是把大文件分成更小的文件。调用许多将执行计数作业并返回计数器的工作线程。 最后合并计数器。在

from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os

NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10


def slice_huge_file():
    cnt = 0
    with open(INPUT_FILE) as f:
        while True:
            next_n_lines = list(islice(f, NUM_OF_LINES))
            cnt += 1
            if not next_n_lines:
                break
            with open('sub_huge_{}.txt'.format(cnt), 'w') as out:
                out.writelines(next_n_lines)


def count_file_words(input_file):
    with open(input_file, 'r') as f:
        return Counter([w.strip() for w in f.readlines()])


if __name__ == '__main__':
    slice_huge_file()
    pool = Pool(POOL_SIZE)
    sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
    results = pool.map(count_file_words, sub_files)
    final_counter = Counter()
    for counter in results:
        final_counter += counter
    print(final_counter)

我从来没有测试过这个代码,但应该可以工作。在

第一件事是检查行数

f =('myfile.txt')
def file_len(f):
    with open(f) as f:
        for i, l in enumerate(f):
            pass
    return i + 1
num_lines = file_len(f)

将数据拆分为n个分区

^{pr2}$

现在开始工作:

from multiprocessing import Process
import linecache
jobs = []

for part in range(len(parts)):
    p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
    jobs.append(p)
    p.start()

for p in jobs:
    p.join()

函数的一个示例

def function_here(your_file_name, line_number, split_size):

    for current_line in range(line_number, (line_number+split_size)+1):
        print( linecache.getline(your_file_name, current_line))

不过,在执行任何操作之前,您仍需要检查行数

相关问题 更多 >

    热门问题