Python: writing to a single file from a multiprocessing pool via a queue
I have thousands of text files that I want to process in various ways. I'd like to save the output to a single file without synchronization problems. I've been using a multiprocessing pool to save time, but I can't figure out how to combine a Pool with a Queue.
The following code saves each input file's name along with the maximum number of consecutive "x" characters in that file. However, I want all processes to write their results to the same file, rather than to different files as in my example. Any help on this would be greatly appreciated.
import multiprocessing
import re

with open('infilenamess.txt') as f:
    filenames = f.read().splitlines()

def mp_worker(filename):
    with open(filename, 'r') as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    outfile = open(filename + '_results.txt', 'a')
    outfile.write(str(filename) + '|' + str(count) + '\n')
    outfile.close()

def mp_handler():
    p = multiprocessing.Pool(32)
    p.map(mp_worker, filenames)

if __name__ == '__main__':
    mp_handler()
3 Answers
3 votes
Here's my approach, using a multiprocessing Manager object. The nice thing about this approach is that when processing drops out of the manager block in run_multi(), the file-writer queue is closed automatically, which makes the code very easy to reason about, and you don't have to bother stopping the listener on the queue.
from functools import partial
from multiprocessing import Manager, Pool, Queue
from random import randint
import time

def run_multi():
    input = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    with Manager() as manager:
        pool = Pool()  # By default the pool sizes itself to the available cores
        message_queue = manager.Queue()  # Queue for sending messages to the file-writer listener
        pool.apply_async(file_writer, (message_queue, ))  # Start the file listener ahead of doing the work
        pool.map(partial(worker, message_queue=message_queue), input)  # Partial function allows us to use map to divide the workload

def worker(input: int, message_queue: Queue):
    message_queue.put(input * 10)
    time.sleep(randint(1, 5))  # Simulate hard work

def file_writer(message_queue: Queue):
    with open("demo.txt", "a") as report:
        while True:
            report.write(f"Value is: {message_queue.get()}\n")

if __name__ == "__main__":
    run_multi()
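For contrast, a more common variant of the same queue-plus-listener pattern stops the writer explicitly with a sentinel value instead of relying on the manager shutting down. The sketch below is only an illustration of that idea; the STOP sentinel, the demo_sentinel.txt filename, and the small input list are placeholders I chose, not part of the answer above. Note that the pool needs more than one worker process, since the listener occupies one of them.

from functools import partial
from multiprocessing import Manager, Pool

STOP = "STOP"  # hypothetical sentinel value used to shut down the listener

def worker(value: int, message_queue) -> None:
    message_queue.put(value * 10)

def file_writer(message_queue) -> None:
    # Keep writing until the sentinel arrives, then exit cleanly.
    with open("demo_sentinel.txt", "a") as report:
        for message in iter(message_queue.get, STOP):
            report.write(f"Value is: {message}\n")

def run_multi() -> None:
    data = [1, 2, 3, 4, 5]
    with Manager() as manager:
        queue = manager.Queue()
        with Pool() as pool:  # must have at least 2 workers: one is taken by the listener
            writer = pool.apply_async(file_writer, (queue,))
            pool.map(partial(worker, message_queue=queue), data)
            queue.put(STOP)   # tell the listener to finish
            writer.get()      # wait for the writer to flush and exit

if __name__ == "__main__":
    run_multi()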
13 votes
I simplified the accepted answer to help myself understand how this works. I'm posting it here in case it helps someone else.
import multiprocessing

def mp_worker(number):
    number += 1
    return number

def mp_handler():
    p = multiprocessing.Pool(32)
    numbers = list(range(1000))
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, numbers):
            f.write('%d\n' % result)

if __name__ == '__main__':
    mp_handler()
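One practical note: imap yields results in input order as they complete, and for a very large number of tiny tasks its per-item overhead can dominate. Pool.imap accepts an optional chunksize argument that batches the inputs sent to each worker. The snippet below is a small sketch of that option applied to the simplified example above; the value 50 is arbitrary.

import multiprocessing

def mp_worker(number):
    return number + 1

def mp_handler():
    numbers = list(range(1000))
    with multiprocessing.Pool(32) as p:
        with open('results.txt', 'w') as f:
            # chunksize batches inputs per worker, reducing inter-process overhead
            for result in p.imap(mp_worker, numbers, chunksize=50):
                f.write('%d\n' % result)

if __name__ == '__main__':
    mp_handler()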
45 votes
Multiprocessing pools implement a queue for you. Just use a pool method that returns the worker's return value back to the caller. imap works well:
import multiprocessing
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    count = len(max(m, key=len))
    return filename, count

def mp_handler():
    p = multiprocessing.Pool(32)
    with open('infilenamess.txt') as f:
        filenames = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, filenames):
            # result is a (filename, count) tuple from the worker
            f.write('%s: %d\n' % result)

if __name__ == '__main__':
    mp_handler()
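Since only the parent process ever writes to results.txt here, there is no file-synchronization problem at all. If the order of the output lines does not matter, imap_unordered can be swapped in so each result is written as soon as its worker finishes. A minimal sketch of that variant, assuming the same worker logic as above (this is my adaptation, not part of the answer itself):

import multiprocessing
import re

def mp_worker(filename):
    with open(filename) as f:
        text = f.read()
    m = re.findall("x+", text)
    return filename, len(max(m, key=len))

def mp_handler():
    with open('infilenamess.txt') as f:
        filenames = [line.strip() for line in f if line.strip()]
    with multiprocessing.Pool(32) as p, open('results.txt', 'w') as out:
        # imap_unordered yields results in completion order, not input order
        for filename, count in p.imap_unordered(mp_worker, filenames):
            out.write('%s: %d\n' % (filename, count))

if __name__ == '__main__':
    mp_handler()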