使用多进程写入文件

16 投票
3 回答
24213 浏览
提问于 2025-04-16 20:32

我在用Python的时候遇到了一个问题。

我需要进行一些并行计算,然后把结果按顺序写入一个文件。所以我创建了一个函数,这个函数接收一个 multiprocessing.Queue 和一个文件句柄,进行计算并把结果打印到文件里:

import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
while True:
    try:
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
    except:
        break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

但是在脚本运行后,文件却是空的。我尝试把worker()函数改成:

def work(queue, filename):
while True:
    try:
        fh = open(filename, "a")
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
        fh.close()
    except:
        break

然后把文件名作为参数传进去。这样就按照我想的那样工作了。当我尝试不使用多进程,顺序执行的时候,它也能正常工作。

为什么第一种版本不行呢?我看不出问题出在哪里。

还有:我能保证两个进程不会同时写文件吗?


编辑:

谢谢。我明白了。这是可以正常工作的版本:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
            queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join ()
    for p in calcProc:
        p.join()
    writProc.join ()

3 个回答

0

在写入工作者的代码里有个错误,如果设置了block为false,工作者就永远得不到任何数据。应该改成这样:

par, res = queue.get(block = True)

你可以通过在

 print "QSize",queueOut.qsize()

后面加一行来检查这个问题,

queueOut.put((par,res))

当block=False时,队列的长度会不断增加,直到填满;而如果是block=True,队列的长度总是“1”。

9

如果有人在寻找简单的方法来实现这个功能,这个方法可能会对你有帮助。 我觉得用这种方式没有什么缺点。如果有的话,请告诉我。

import multiprocessing 
import re

def mp_worker(item):
    # Do something
    return item, count

def mp_handler():
    cpus = multiprocessing.cpu_count()
    p = multiprocessing.Pool(cpus)
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step.
    with open('ExampleFile.txt') as f:
        listX = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, listX):
            # (item, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()

来源:Python:在使用多进程池时通过队列写入单个文件

20

你真的应该使用两个队列和三种不同的处理方式。

  1. 把东西放进队列 #1。

  2. 从队列 #1 取出东西进行计算,然后把结果放进队列 #2。你可以有很多这样的处理,因为它们是从一个队列取东西,再安全地放入另一个队列。

  3. 从队列 #2 取出东西并写入文件。你必须只有一个这样的处理,不能多。它“拥有”这个文件,确保文件的访问是原子的,并且绝对保证文件写入时是干净和一致的。

撰写回答