使用Python多进程管道
我正在尝试写一个类,用来通过多个进程计算校验和,这样可以利用多个核心的优势。我有一个很简单的类,运行起来效果很好,特别是在处理简单的情况时。但是每当我创建两个或更多这个类的实例时,工作进程就不会退出。看起来它似乎没有收到父进程关闭管道的消息。
下面是所有的代码。我首先分别计算md5和sha1的校验和,这个部分是可以正常工作的。然后我尝试并行计算,但当要关闭管道时,程序就卡住了。
这到底是怎么回事呢?为什么管道没有按我预期的那样工作?我想我可以通过在队列中发送一个“停止”消息来解决这个问题,让子进程以这种方式退出,但我真的很想知道为什么现在的方式不奏效。
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
self.parent_conn.close() # This is the child. Close unused end.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums in parallel causes a lockup!
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!
main()
附注:这个问题已经解决 如果有人感兴趣,这里有一个可以正常工作的代码版本:
import multiprocessing
import hashlib
class ChecksumPipe(multiprocessing.Process):
all_open_parent_conns = []
def __init__(self, csname):
multiprocessing.Process.__init__(self, name = csname)
self.summer = eval("hashlib.%s()" % csname)
self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
self.result_queue = multiprocessing.Queue(1)
self.daemon = True
self.start()
self.child_conn.close() # This is the parent. Close the unused end.
def run(self):
for conn in ChecksumPipe.all_open_parent_conns:
conn.close() # This is the child. Close unused ends.
while True:
try:
print "Waiting for more data...", self
block = self.child_conn.recv_bytes()
print "Got some data...", self
except EOFError:
print "Finished work", self
break
self.summer.update(block)
self.result_queue.put(self.summer.hexdigest())
self.result_queue.close()
self.child_conn.close()
def update(self, block):
self.parent_conn.send_bytes(block)
def hexdigest(self):
self.parent_conn.close()
return self.result_queue.get()
def main():
# Calculating the first checksum works
md5 = ChecksumPipe("md5")
md5.update("hello")
print "md5 is", md5.hexdigest()
# Calculating the second checksum works
sha1 = ChecksumPipe("sha1")
sha1.update("hello")
print "sha1 is", sha1.hexdigest()
# Calculating both checksums also works fine now
md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
md5.update("hello")
sha1.update("hello")
print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()
main()
1 个回答
7
是的,这确实是个让人意外的情况。
不过,如果你查看一下两个并行子进程的lsof
输出,你会发现第二个子进程打开的文件描述符比第一个多。
发生的情况是,当两个并行的子进程启动时,第二个子进程会继承父进程的管道。因此,当父进程调用self.parent_conn.close()
时,第二个子进程仍然保持着那个管道的文件描述符是打开的,这样在内核中这个管道的文件描述符就不会被关闭(引用计数大于0),结果就是第一个并行子进程中的self.child_conn.recv_bytes()
永远不会读取到EOF
,也就不会抛出EOFError
错误。
你可能需要发送一个明确的关闭消息,而不仅仅是关闭文件描述符,因为似乎对哪些文件描述符会在不同进程之间共享没有太多控制(没有关闭时复制文件描述符的标志)。