如何一次处理1亿多行文字

2024-05-23 17:53:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一段代码,可以逐行读取和处理文本文件,问题是我的文本文件有15-20亿行,而且要花很长时间。是否可以同时处理超过1亿条生产线

from cryptotools.BTC.HD import check, WORDS



with open("input.txt", "r") as a_file:
    for line in a_file:
        stripped_line = line.strip()
        for word in WORDS:
            mnemonic = stripped_line.format(x=word)
            if check(mnemonic):
               print(mnemonic)
               with open("print.txt", "a") as i:
                   i.write(mnemonic)
                   i.write("\n")

输入文件具有以下采样行:

gloom document {x} stomach uncover peasant sock minor decide special roast rural
happy seven {x} gown rally tennis yard patrol confirm actress pledge luggage
tattoo time {x} other horn motor symbol dice update outer fiction sign
govern wire {x} pill valid matter tomato scheme girl garbage action pulp

Tags: intxtforcheckaswithlineopen
1条回答
网友
1楼 · 发布于 2024-05-23 17:53:57

要一次处理1亿行,必须有1亿个线程。另一种提高代码速度的方法是在不同的线程之间分配工作(少于1亿个线程)。
由于文件的写入和读取操作不是异步的,所以最好在程序开始时读取所有文件,并在程序结束时写出处理过的数据。在下面的代码中,我假设您不关心写入文件的顺序。但是,如果顺序很重要,您可以设置一个字典,该字典以特定线程所阐述的当前行的位置值为键,并在末尾进行相应的排序。

import concurrent.futures as cf

N_THREADS = 20
result = []

def doWork(data):
    for line in data:
        #do what you have to do
        result.append(mnemonic)

m_input = open("input.txt", "r")
lines = [line for line in m_input]
#the data for the threads will be here
#as a list of rows for each thread
m_data= { i: [] for i in range(0, N_THREADS)} 
for l, n in zip(lines, range(0, len(lines))):
    m_data[n%N_THREADS].append(l)
'''
If you have to trim the number of threads uncomment these lines
m_data= { k:v for k, v in m_data.items() if len(v) != 0}
N_THREADS = N_THREADS if len(m_data) > N_THREADS else len(m_data)
if(N_THREADS == 0): 
    exit()
'''
with cf.ThreadPoolExecutor(max_workers=N_THREADS) as tp:
    for d in m_data.keys():
        tp.submit(doWork,m_data[d])
    
#work done
output = open("print.txt", "w")
for item in result:
    output.write(f"{item}\n")
output.close()

更改您认为最有效的线程数


编辑(使用内存优化):

上面的代码虽然非常快,但占用了大量内存,因为它将整个文件加载到内存中,然后再对其进行处理。

然后您有两个选项:

  • 将您的文件拆分为多个较小的文件,根据我的测试(见下文),使用一个约1000万行的测试文件,该程序实际上运行速度非常快,但使用了高达1.3 GB的ram
  • 使用下面的代码,每次加载一行,并将该行分配给在该行上工作的线程,然后将数据推送到只负责写入文件的线程。这样,内存使用率显著降低,但执行时间增加

下面的代码从文件中读取一行(一行有1000万行,约为~500 MB),然后将该数据发送给管理固定数量线程的类。目前,每当一个线程完成时,我都会生成一个新线程,事实上,这样会更有效,始终使用相同的线程,并为每个线程使用一个队列。然后我生成一个writer线程,它唯一的工作就是写入将包含结果的out.txt文件。在我的测试中,我只读取文本文件,并在另一个文件中写入相同的行
我发现如下(使用1000万行文件):

  • 原始代码:它花费了14.20630669593811秒,使用了1.301 GB(平均使用率)的ram和10%的cpu使用率
  • 更新代码:用了1230.4356942176819秒,使用了4.3 MB(平均使用率)的ram和10%的cpu使用率,内部参数如下代码所示

两个程序使用相同数量的线程获得计时结果
从这些结果可以明显看出,内存优化的代码在使用更少ram的情况下运行速度明显较慢。您可以调整内部参数,如线程数或最大队列大小,以提高性能,同时要记住这会影响内存使用。经过大量测试后,我建议将文件拆分为多个子文件,这些子文件可以放在内存中,并运行原始版本的代码(见上文),因为在我看来,时间和速度之间的权衡是不合理的
在这里,我将针对内存消耗优化的代码放入中,但要记住,没有以任何重要方式进行优化。就线程管理而言,建议始终使用相同的线程,并使用多个队列将数据传递给这些线程
在这里,我留下了我用来优化内存消耗的代码(是的,比上面XD的代码要复杂得多,可能比它需要的还要复杂):


from threading import Thread
import time
import os
import queue

MAX_Q_SIZE = 100000
m_queue = queue.Queue(maxsize=MAX_Q_SIZE)
end_thread = object()

def doWork(data):
    #do your work here, before
    #checking if the queue is full,
    #otherwise when you finish the 
    #queue might be full again
    while m_queue.full():
        time.sleep(0.1)
        pass
    
    m_queue.put(data)

def writer():
    #check if file exists or creates it
    try:
        out = open("out.txt", "r")
        out.close()
    except FileNotFoundError:
        out = open("out.txt", "w")
        out.close()
    out = open("out.txt", "w")
    _end = False
    while True:
        if m_queue.qsize == 0:
            if _end:
                break
            continue
        try:
            item = m_queue.get()
            if item is end_thread:
                out.close()
                _end = True
                break
            global written_lines
            written_lines += 1
            out.write(item)
        except:
            break


class Spawner:
    def __init__(self, max_threads):
        self.max_threads = max_threads
        self.current_threads = [None]*max_threads
        self.active_threads = 0
        self.writer = Thread(target=writer)
        self.writer.start()

    def sendWork(self, data):
        m_thread = Thread(target=doWork, args=(data, ))
        replace_at = -1
        if self.active_threads >= self.max_threads:
            #wait for at least 1 thread to finish
            while True:
                for index in range(self.max_threads):
                    if self.current_threads[index].is_alive() :
                        pass
                    else:
                        self.current_threads[index] = None
                        self.active_threads -= 1
                        replace_at = index
                        break
                if replace_at != -1:
                    break
                #else: no threads have finished, keep waiting
        if replace_at == -1:
            #only if len(current_threads) < max_threads
            for i in range(len(self.current_threads)):
                if self.current_threads[i] == None:
                    replace_at = i
                    break
        self.current_threads[replace_at] = m_thread
        self.active_threads += 1
        m_thread.start()

    def waitEnd(self):
        for t in self.current_threads:
            if t.is_alive():
                t.join()
            self.active_threads -= 1
        while True:
            if m_queue.qsize == MAX_Q_SIZE:
                time.sleep(0.1)
                continue
            m_queue.put(end_thread)
            break
        if self.writer.is_alive():
            self.writer.join()


start_time = time.time()

spawner = Spawner(50)
with open("input.txt", "r") as infile:
    for line in infile:
        spawner.sendWork(line)

spawner.waitEnd()
print(" - %s seconds  -" % (time.time() - start_time))

您可以暂时删除打印,我留下这些只是为了参考,以了解我如何计算程序运行所需的时间,下面您可以从task manager中找到两个程序执行的屏幕截图

  • 内存优化版本: Memory optimized version

  • 原始版本(截图时我忘了展开终端进程,不管怎样,备忘录y终端子进程的使用相对于程序使用的子进程是可以忽略的,并且1.3 GB的ram是准确的): Original version

相关问题 更多 >