Why do multiprocessing.Pool and multiprocessing.Process behave so differently on Linux?



I ran the test code below to compare the performance of Pool and Process on Linux, using Python 2.7. The source code of multiprocessing.Pool seems to show that it is built on top of multiprocessing.Process; however, multiprocessing.Pool costs far more time and memory than multiprocessing.Process, and I don't understand why.

Here is what I did:

  1. Create a big dict and then create the subprocesses.

  2. Pass the dict to each subprocess for read-only access.

  3. Each subprocess does some calculation and returns a small result.

Here is my test code:

from multiprocessing import Pool, Process, Queue
import time, psutil, os, gc

gct = time.time
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET)))

def getMemConsumption():
    procId = os.getpid()
    proc = psutil.Process(procId)
    mem = proc.memory_info().rss
    return "process ID %d.\nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3)

def f_pool(l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        # gc.collect()
        print getMemConsumption()
        return 1, result, jobID
    except:
        return 0, {}, jobID

def f_proc(q, l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        print getMemConsumption()
        q.put([1, result, jobID])
    except:
        q.put([0, {}, jobID])

def initialSubProc(targetFunc, procArgs, jobID):
    outQueue = Queue()
    args = [outQueue]
    args.extend(procArgs)
    args.append(jobID)
    p = Process(target = targetFunc, args = tuple(args))
    p.start()
    return p, outQueue


def track_add_Proc(procList, outQueueList, maxProcN, jobCount,
                   maxJobs, targetFunc, procArgs, joinFlag, all_results):
    if len(procList) < maxProcN:
        p, q = initialSubProc(targetFunc, procArgs, jobCount)
        outQueueList.append(q)
        procList.append(p)
        jobCount += 1
        joinFlag.append(0)
    else:
        for i in xrange(len(procList)):
            if not procList[i].is_alive() and joinFlag[i] == 0:
                procList[i].join()
                all_results.append(outQueueList[i].get())
                joinFlag[i] = 1 # in case of duplicating result of joined subprocess
                if jobCount < maxJobs:
                    p, q = initialSubProc(targetFunc, procArgs, jobCount)
                    procList[i] = p
                    outQueueList[i] = q
                    jobCount += 1
                    joinFlag[i] = 0
    return jobCount

if __name__ == '__main__':
    st = gct()
    d = {i:i**2 for i in xrange(10000000)}
    print "MainProcess create data dict\n%s" % getMemConsumption()
    print 'Time to create dict: %s\n\n' % costTime(gct()-st)

    nproc = 2
    jobs = 8
    subProcReturnDictLen = 1000
    procArgs = [d, subProcReturnDictLen]

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    pool = Pool(processes = nproc)
    for i in xrange(jobs):
        procArgs.append(i)
        sp = pool.apply_async(f_pool, tuple(procArgs))
        procArgs.pop(2)
        res = sp.get()
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handle
            pass
    pool.close()
    pool.join()
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption(), '\n'

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    procList = []
    outQueueList = []
    all_results = []
    jobCount = 0
    joinFlag = []
    while (jobCount < jobs):
        jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount, 
                                  jobs, f_proc, procArgs, joinFlag, all_results)
    for i in xrange(nproc):
        if joinFlag[i] == 0:
            procList[i].join()
            all_results.append(outQueueList[i].get())
            joinFlag[i] = 1
    for i in xrange(jobs):
        res = all_results[i]
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handle
            pass
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption()

Here are the results:

[program output: per-subprocess memory usage and total time for the Pool run and the Process run]

I don't know why the subprocesses started by multiprocessing.Pool need about 1.6 GB at the beginning, while the subprocesses started by multiprocessing.Process need only 0.84 GB, roughly the same as the memory cost of the main process. It looks to me as if multiprocessing.Process gets the benefit of Linux's copy-on-write, since all the jobs finish in under 1 second, and I don't understand why multiprocessing.Pool does not. Judging from the source code, multiprocessing.Pool seems to be just a wrapper around multiprocessing.Process.
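
As a rough sanity check of how much data would have to be copied if the dict were serialized for each job, one can measure the pickled size of the dict. This is a minimal sketch, separate from the test code above, and it assumes (without verifying here) that Pool pickles the arguments of every apply_async call:

# Rough check: how big is the dict once pickled?
# Assumption: Pool serializes the arguments of each apply_async call to
# ship them to a worker, so this is roughly what every job would carry.
import cPickle as pickle

d = {i: i**2 for i in xrange(10000000)}
blob = pickle.dumps(d, pickle.HIGHEST_PROTOCOL)
print "pickled dict size: %.1f MB" % (len(blob) / 1024.0 ** 2)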


1 Answer

Question: I don't know why subprocesses from multiprocessing.Pool need about 1.6GB in the beginning,
... Pool seems like a wrapper of multiprocessing.Process

This is because Pool reserves memory to hold the results of all jobs.
Second, Pool uses two SimpleQueue()s and three Threads.
Third, Pool copies all of the passed argv data before handing it over to a process.

Your Process example uses only one Queue() for everything and passes argv as-is.

Pool is far from being only a wrapper.
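
Given the third point, one common workaround (a sketch, not part of the original post) is to hand the dict to the workers once through the Pool initializer instead of passing it with every apply_async call. On Linux the workers are forked, so they inherit the dict via copy-on-write rather than receiving a pickled copy per job. The names init_worker and f_pool_shared below are made up for illustration:

from multiprocessing import Pool

shared_d = None  # set once per worker by the initializer

def init_worker(d):
    # Runs once in each worker right after it is forked; on Linux the dict
    # is inherited via copy-on-write rather than pickled for every job.
    global shared_d
    shared_d = d

def f_pool_shared(n, jobID):
    # Same work as f_pool above, but reads the dict from the worker global,
    # so apply_async only has to ship two small arguments.
    result = {}
    for i in xrange(n):
        result[i] = shared_d[i]
    return 1, result, jobID

if __name__ == '__main__':
    d = {i: i**2 for i in xrange(10000000)}
    pool = Pool(processes=2, initializer=init_worker, initargs=(d,))
    async_results = [pool.apply_async(f_pool_shared, (1000, i)) for i in xrange(8)]
    pool.close()
    pool.join()
    for r in async_results:
        print r.get()[2]   # jobID of each finished job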
