TL;DR:是否有一个python库允许从多个进程并行写入单个gzip文件
详细信息:
我正在尝试使用python将一个大的压缩文件(.gz)复制到另一个压缩文件(.gz)。我将对代码示例中不存在的数据执行中间处理。我希望能够使用带锁的多处理从多个进程并行地写入新的gzip,但是我在输出gz文件上得到了一个无效的格式错误
我假设这是因为锁不足以支持并行写入gzip。由于压缩数据需要“了解”之前的数据,以便在归档文件中创建正确的条目,我认为python在默认情况下无法处理这一点。我猜每个进程都保持自己对gzip输出的感知,并且这种状态在第一次写入后会发生变化
如果我在脚本中打开目标文件而不使用gzip,那么这一切都可以正常工作。我也可以编写多个gzip并合并它们,但如果可能的话,我更愿意避免这样做
以下是我的源代码:
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
print("Writer Started.")
while True:
queue_message = data_queue.get()
if (queue_message == 'DONE'):
outfile.flush()
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
print("Writer",ID,"-","Write Lock:",write_lock)
write_lock.acquire()
print("Writer",ID,"-","Write Lock:",write_lock)
for line in queue_message:
print("Line write:",line)
outfile.write(line)
write_lock.release()
print("Writer",ID,"-","Write Lock:",write_lock)
def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
writer_procs=2
chunk_size=1
queue_size=96
data_queue = Queue(queue_size)
coordinator_queue=Queue()
write_lock=Lock()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile = gzip.open(outfile_path, 'wt')
#Works when it is uncompressed
#outfile=open(outfile_path, 'w')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in writers:
process.start()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
代码注释:
我想我需要一个库,它能够以某种方式协调不同进程之间的压缩写入。显然,这意味着使用单个进程来执行写操作(如协调进程),但这可能会引入瓶颈
堆栈上有与some相关的帖子,但似乎没有一篇专门针对我试图做的事情。我还看到像“mgzip”、“pigz”和“migz”这样的实用程序可以并行压缩,但我认为它们不适用于这个用例。mgzip在我的测试(0大小的文件)中不起作用,pigz似乎在命令行上使用整个文件作为输入,migz是一个java库,因此我不确定如何将其集成到python中
如果不能做到这一点,那么就这样做,但任何答案将不胜感激
----更新和工作代码:
在MarkAdler的帮助下,我能够创建一个并行压缩数据的多处理脚本,并有一个编写器进程将其添加到目标gz文件中。由于现代NVME驱动器上的吞吐量,这降低了在成为I/O绑定之前受压缩限制的CPU的可能性
要使此代码正常工作,需要进行的最大更改如下:
gzip.compress(bytes(string, 'utf-8'),compresslevel=9)
需要压缩单个“块”或“流”:file = open(outfile, 'wb')
是打开可以成为目标gzip的未编码二进制输出文件所必需的file.write()
操作必须从单个进程执行,因为它必须串行执行值得注意的是,并不是并行写入文件,而是并行处理压缩。无论如何,压缩是这个过程中最繁重的部分
更新的代码(已测试并按原样工作):
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue
def reader(infile, data_queue, coordinator_queue, chunk_size):
print("Reader Started.")
while True:
data_chunk = list(islice(infile, chunk_size))
data_queue.put(data_chunk)
coordinator_queue.put('CHUNK_READ')
if not data_chunk:
coordinator_queue.put('READ_DONE')
#Process exit
break
def compressor(data_queue, compressed_queue, coordinator_queue):
print("Compressor Started.")
while True:
chunk = ''
queue_message = data_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('COMPRESS_DONE')
#Process exit
break
else:
for line in queue_message:
#Assemble concatenated string from list
chunk += line
#Encode the string as binary so that it can be compressed
#Setting gzip compression level to 9 (highest)
compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)
compressed_queue.put(compressed_chunk)
def writer(outfile, compressed_queue, coordinator_queue):
print("Writer Started.")
while True:
queue_message = compressed_queue.get()
if (queue_message == 'DONE'):
#Notify coordinator process of task completion
coordinator_queue.put('WRITE_DONE')
#Process exit
break
else:
outfile.write(queue_message)
def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
print("Coordinator Started.")
active_readers=reader_procs
active_compressors=compressor_procs
active_writers=writer_procs
while True:
queue_message = coordinator_queue.get()
if queue_message=='READ_DONE':
active_readers = active_readers-1
if active_readers == 0:
while not data_queue.qsize() == 0:
continue
[data_queue.put('DONE') for x in range(compressor_procs)]
if queue_message=='COMPRESS_DONE':
active_compressors = active_compressors-1
if active_compressors == 0:
while not compressed_queue.qsize() == 0:
continue
[compressed_queue.put('DONE') for x in range(writer_procs)]
if queue_message=='WRITE_DONE':
active_writers = active_writers-1
if active_writers == 0:
break
def main():
reader_procs=1
compressor_procs=2
#writer_procs really needs to stay as 1 since writing must be done serially
#This could probably be written out...
writer_procs=1
chunk_size=600
queue_size=96
data_queue = Queue(queue_size)
compressed_queue=Queue(queue_size)
coordinator_queue=Queue()
infile_path='/directory/input_records.json.gz'
infile = gzip.open(infile_path, 'rt')
outfile_path='/directory/output_records.json.gz'
outfile=open(outfile_path, 'wb')
readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
coordinator_p.start()
for process in readers:
process.start()
for process in compressors:
process.start()
for process in writers:
process.start()
for process in compressors:
process.join()
for process in readers:
process.join()
for process in writers:
process.join()
coordinator_p.join()
outfile.flush()
outfile.close()
main()
通过将每个线程的完整gzip流写入单个输出文件,实际上非常简单。是的,您需要一个线程来完成所有的写操作,在另一个压缩线程开始写之前,每个压缩线程轮流写其gzip流的all。压缩线程都可以并行进行压缩,但写入需要序列化
这样做的原因是gzip标准RFC 1952说gzip文件由一系列成员组成,其中每个成员都是gzip头、压缩数据和gzip尾部
相关问题 更多 >
编程相关推荐