Python:为什么加入让我久等?

2024-03-28 10:38:16 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在10000个模型上做聚类。在此之前,我必须计算每两个模型的皮尔逊相关系数。这是一个很大的计算量,所以我使用多重处理来生成进程,将计算作业分配到16CPU。我的代码如下:

import numpy as np
from multiprocessing import Process, Queue

def cc_calculator(begin, end, q):
    index=lambda i,j,n: i*n+j-i*(i+1)/2-i-1
    for i in range(begin, end):
        for j in range(i, nmodel):
            all_cc[i][j]=get_cc(i,j)
            q.put((index(i,j,nmodel),all_cc[i][j]))

def func(i):
    res=(16-i)/16
    res=res**0.5
    res=int(nmodel*(1-res))
    return res

nmodel=int(raw_input("Entering the number of models:"))
all_cc=np.zeros((nmodel,nmodel))
ncc=int(nmodel*(nmodel-1)/2)
condensed_cc=[0]*ncc
q=Queue()
mprocess=[]

for ii in range(16):
    begin=func(i)
    end=func(i+1)
    p=Process(target=cc_calculator,args=(begin,end,q))
    mprocess+=[p]
    p.start()

for x in mprocess:
    x.join()

while not q.empty():
    (ind, value)=q.get()
    ind=int(ind)
    condensed_cc[ind]=value
np.save("condensed_cc",condensed_cc)

其中get_cc(i,j)计算与模型i和j相关的相关系数。所有的_cc是一个上三角矩阵,所有的_cc[i][j]存储cc值。condensed\u cc是all\u cc的另一个版本。我将把它处理成一个压缩的区域来进行聚类。“func”函数有助于为每个cpu分配几乎相同的计算量。你知道吗

我用nmodel=20成功运行了程序。但是,当我尝试用nmodel=10000运行程序时,似乎永远不会结束。我等待了大约两天,在另一个终端窗口中使用top命令,没有命令为“python”的进程仍在运行。但程序仍在运行,没有输出文件。我使用Ctrl+C强制它停止,它指向行:x.join()。nmodel=40运行速度很快,但失败了,出现了相同的问题。你知道吗

也许这个问题与q有关,因为如果我注释行:q.put(…),它就会运行成功。或者像这样:

q.put(...)
q.get()

它也是好吧,但是这两种方法都不会给出正确的结论。它们不会改变所有的\u cc或压缩的\u cc。你知道吗

另一个仅包含一个子流程的示例:

from multiprocessing import Process, Queue

def g(q):
    num=10**2
    for i in range(num):
        print '='*10
        print i
        q.put((i,i+2))
        print "qsize: ", q.qsize()

q=Queue()
p=Process(target=g,args=(q,))
p.start()
p.join()

while not q.empty():
    q.get()

num=100可以,num=10000失败。即使num=100**2,他们也打印了所有i和q。我不明白为什么。另外,Ctrl+C会使跟踪返回到p.join()。你知道吗

我想说更多关于队列大小的问题。Documentation about Queue and its put method将Queue作为Queue([maxsize])引入,并说明put方法:…阻止if在空闲插槽可用之前是必需的。这些都使人认为子进程被阻塞是因为队列的空间用完了。但是,正如我之前在第二个示例中提到的,屏幕上打印的结果证明qsize在增加,这意味着队列没有满。我加上一行:

print q.full()

在print size语句之后,num=10000总是false,而程序仍然停留在某个地方。强调一点:另一个终端中的top命令不显示命令python的进程。我真的很困惑。你知道吗

我使用的是python2.7.9。你知道吗


Tags: inforgetqueueput进程resprocess
1条回答
网友
1楼 · 发布于 2024-03-28 10:38:16

我相信您遇到的问题在多处理编程指南中有描述:https://docs.python.org/2/library/multiprocessing.html#multiprocessing-programming

特别是本节:

Joining processes that use queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically.

An example which will deadlock is the following:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

A fix here would be to swap the last two lines (or simply remove the p.join() line).

您可能还想查看“避免共享状态”部分。你知道吗

看起来您正在使用.join来避免q.empty()在添加某些内容之前返回True的竞争条件。在使用多处理(或多线程)时,根本不应该依赖.empty()。相反,您应该通过信号从工作进程发送到主进程来处理这个问题,当它完成向队列添加项时。这通常是通过在队列中放置一个sentinal值来完成的,但也有其他选项。你知道吗

相关问题 更多 >