Python multiprocessing: parallel writes to a single gzip

TL;DR: Is there a Python library that allows multiple processes to write to a single gzip file in parallel?

Details:

I am trying to copy a large compressed file (.gz) into another compressed file (.gz) with Python. I will perform intermediate processing on the data that is not present in the code sample. I would like to write to the new gzip from multiple processes in parallel, using multiprocessing with a lock, but I get an invalid-format error on the output .gz file.

I assume this is because a lock is not enough to support parallel writes to a gzip. Since compressed data needs to "know" the data that came before it in order to create correct entries in the archive, I don't think Python can handle this by default. My guess is that each process keeps its own notion of the state of the gzip output, and that this state diverges after the first write.

If I open the target file in the script without gzip, everything works fine. I could also write multiple gzips and merge them afterwards, but I would prefer to avoid that if possible.

Here is my source code:

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break
        
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
    print("Writer Started.")
    while True:
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            outfile.flush() 
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            print("Writer",ID,"-","Write Lock:",write_lock)
            write_lock.acquire()
            print("Writer",ID,"-","Write Lock:",write_lock)
            for line in queue_message:
                print("Line write:",line)
                outfile.write(line)
            write_lock.release()
            print("Writer",ID,"-","Write Lock:",write_lock)

def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    writer_procs=2
    chunk_size=1
    queue_size=96
    data_queue = Queue(queue_size)
    coordinator_queue=Queue()
    write_lock=Lock()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile = gzip.open(outfile_path, 'wt')
    #Works when it is uncompressed
    #outfile=open(outfile_path, 'w')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]   
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))

    coordinator_p.start()
    for process in readers:
        process.start()
    for process in writers:
        process.start()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()

Notes on the code:

  • chunk_size determines how many lines are pulled from the input file at a time
    • I have been using a much smaller test file while trying to get this working
  • My input file is over 200 GB when decompressed
  • My output file will be over 200 GB when decompressed
  • This version of the code has been trimmed down and may contain some errors, but it is directly based on what I am running.
    • All of the functional areas of the script have been preserved

I think I need a library that can somehow coordinate the compressed writes between the different processes. That obviously implies using a single process to perform the writes (like a coordinator process), but it would likely introduce a bottleneck.

There are some related posts on Stack Overflow, but none of them seem to address exactly what I am trying to do. I have also seen utilities such as "mgzip", "pigz", and "migz" that can compress in parallel, but I don't think they apply to this use case. mgzip did not work in my testing (it produced zero-size files), pigz appears to take a whole file as input on the command line, and migz is a Java library, so I am not sure how to integrate it into Python.
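(Aside, added for illustration only: pigz will also compress whatever is piped to its standard input and write the result to standard output, so a minimal, untested sketch of streaming lines out of Python through pigz, assuming pigz is installed, could look like this. Whether it fits the multiprocessing layout above is a separate question.)

#Untested sketch (illustrative only): pigz with no file arguments compresses
#stdin and writes the result to stdout, so Python can stream lines into it
import subprocess

outfile_path = '/directory/output_records.json.gz'
with open(outfile_path, 'wb') as out:
    #-p sets the number of compression threads, -c keeps the output on stdout
    pigz = subprocess.Popen(['pigz', '-p', '8', '-c'], stdin=subprocess.PIPE, stdout=out)
    for line in ['{"placeholder": 1}\n', '{"placeholder": 2}\n']:   #placeholder data
        pigz.stdin.write(line.encode('utf-8'))
    pigz.stdin.close()
    pigz.wait()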

If it can't be done, so be it, but any answer would be appreciated.

---- Update and working code:

With Mark Adler's help, I was able to create a multiprocessing script that compresses the data in parallel and uses a single writer process to append it to the target .gz file. Given the throughput of modern NVMe drives, this lowers the likelihood of being CPU-limited by compression before becoming I/O-bound.

The biggest changes needed to make this code work are:

  • gzip.compress(bytes(string, 'utf-8'), compresslevel=9) is needed to compress each individual "chunk" or "stream"
  • file = open(outfile, 'wb') is needed to open an unencoded binary output file that can become the target gzip
  • The file.write() operation must be performed by a single process, because it has to happen serially

It is worth noting that the file is not written in parallel; rather, the compression is done in parallel. Compression is the heaviest part of the process anyway.

Updated code (tested and working as-is):

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def compressor(data_queue, compressed_queue, coordinator_queue):
    print("Compressor Started.")
    while True:
        chunk = ''
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('COMPRESS_DONE')
            #Process exit
            break
        else:
            for line in queue_message:
                #Assemble concatenated string from list
                chunk += line
            #Encode the string as binary so that it can be compressed
            #Setting gzip compression level to 9 (highest)
            compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)            
            compressed_queue.put(compressed_chunk)

def writer(outfile, compressed_queue, coordinator_queue):
    print("Writer Started.")
    while True:
        queue_message = compressed_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            outfile.write(queue_message)

def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_compressors=compressor_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(compressor_procs)]
        if queue_message=='COMPRESS_DONE':
            active_compressors = active_compressors-1
            if active_compressors == 0:
                while not compressed_queue.qsize() == 0:
                    continue
                [compressed_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    compressor_procs=2
    #writer_procs really needs to stay as 1 since writing must be done serially
    #This could probably be written out...
    writer_procs=1
    chunk_size=600
    queue_size=96
    data_queue = Queue(queue_size)
    compressed_queue=Queue(queue_size)
    coordinator_queue=Queue()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile=open(outfile_path, 'wb')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
    writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in compressors:
        process.start()     
    for process in writers:
        process.start()
    for process in compressors:
        process.join()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()
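(Not part of the original post: a quick way to sanity-check the result is to stream-read the output with gzip, which decompresses all of the concatenated members in sequence.)

#Optional sanity check (not in the original post): if the concatenated
#members are valid, gzip.open will decompress the whole file in order
import gzip

with gzip.open('/directory/output_records.json.gz', 'rt') as check_file:
    line_count = sum(1 for _ in check_file)
print("Decompressed lines:", line_count)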

1 Answer

It is actually quite straightforward to do this by writing complete gzip streams from each thread to a single output file. Yes, you need one thread that does all of the writing, where each compression thread takes turns writing all of its gzip stream before another compression thread gets to write any. The compression threads can all do their compression in parallel, but the writing needs to be serialized.

The reason this works is that the gzip standard, RFC 1952, says that a gzip file consists of a sequence of members, where each member is a gzip header, compressed data, and a gzip trailer.
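(Illustration added here, not part of the original answer: each call to gzip.compress() emits one complete member, and members can simply be concatenated into a single valid .gz file.)

#Minimal sketch: two independently compressed members concatenated into one file
import gzip

chunks = ['first chunk of lines\n', 'second chunk of lines\n']
with open('members_demo.gz', 'wb') as f:    #hypothetical demo file name
    for chunk in chunks:
        #Each compress() call produces a full member: header + data + trailer
        f.write(gzip.compress(chunk.encode('utf-8'), compresslevel=9))

#Reading the file back decompresses both members in order
with gzip.open('members_demo.gz', 'rt') as f:
    print(f.read())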
