Python concurrency: hang when using apply_async()
I'm learning about concurrency in Python. As an experiment, I wrote a program that uses a process pool and invokes the worker processes via apply_async(). To share information between the processes (the work and the results), I use queues from multiprocessing.Manager().
However, this code hangs intermittently, especially once all of the items in the work queue have been processed, and I can't figure out why. I have to run the program several times to observe the hang.
As an aside, I can get this program to work correctly: I found a design pattern that some people call the "poison pill" approach, and it seems to do the trick. (In my worker() method, I enter an infinite loop and only break out of it when a specific value shows up in the work queue. I put as many of those special values on the work queue as there are running processes.) A sketch of that pattern follows.
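For reference, a minimal sketch of the poison-pill pattern I described; the sentinel value None and the exact loop structure here are illustrative choices on my part, not the only way to do it:

import os

SENTINEL = None  # assumed sentinel; any value that can't be mistaken for real work

def worker(work_queue, results_queue):
    while True:
        item = work_queue.get()          # a blocking get is safe with a sentinel
        if item is SENTINEL:             # poison pill received: time to exit
            break
        results_queue.put('[%d] has processed %s.' % (os.getpid(), item))

# In the main process, after enqueueing the real work items:
# for n in range(MAX_PROCESS):
#     work_queue.put(SENTINEL)           # one pill per worker process

With one pill per process, every worker is eventually handed a pill, so none of them is left blocked on get().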
But I'd still like to understand why this code hangs. The output I get is shown below (process IDs in brackets):
Found 8 CPUs.
Operation queue has 20 items.
Will start 2 processes.
Joining pool...
[5885] entering worker() with work_queue size of 20
[5885] processed work item 0
[5885] worker() still running because work_queue has size 19
[5885] processed work item 1
[5885] worker() still running because work_queue has size 18
[5885] processed work item 2
[5885] worker() still running because work_queue has size 17
[5885] processed work item 3
[5885] worker() still running because work_queue has size 16
[5885] processed work item 4
[5885] worker() still running because work_queue has size 15
[5885] processed work item 5
[5886] entering worker() with work_queue size of 14
[5885] worker() still running because work_queue has size 14
[5886] processed work item 6
[5886] worker() still running because work_queue has size 13
[5885] processed work item 7
[5886] processed work item 8
[5885] worker() still running because work_queue has size 11
[5886] worker() still running because work_queue has size 11
[5885] processed work item 9
[5886] processed work item 10
[5885] worker() still running because work_queue has size 9
[5886] worker() still running because work_queue has size 9
[5885] processed work item 11
[5886] processed work item 12
[5885] worker() still running because work_queue has size 7
[5886] worker() still running because work_queue has size 7
[5885] processed work item 13
[5886] processed work item 14
[5885] worker() still running because work_queue has size 5
[5886] worker() still running because work_queue has size 5
[5885] processed work item 15
[5886] processed work item 16
[5885] worker() still running because work_queue has size 3
[5886] worker() still running because work_queue has size 3
[5885] processed work item 17
[5886] processed work item 18
[5885] worker() still running because work_queue has size 1
[5886] worker() still running because work_queue has size 1
[5885] processed work item 19
[5885] worker() still running because work_queue has size 0
[5885] worker() is finished; returning results of size 20
(The program hangs at the last line shown. The other process, 5886, never seems to finish.) Here is the code:
import multiprocessing
from multiprocessing import Pool
import os

# Python 2.7.6 on Linux

# Worker (consumer) process
def worker(work_queue, results_queue):
    print "[%d] entering worker() with work_queue size of %d" % (os.getpid(), work_queue.qsize())
    while not work_queue.empty():
        item = work_queue.get()
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())

if __name__ == '__main__':
    MAX_PROCESS = 2   # Max number of processes to create
    MAX_ITEMS = 20    # Max work items to process
    m = multiprocessing.Manager()
    results_queue = m.Queue()
    work_queue = m.Queue()

    # Assign work
    for x in xrange(MAX_ITEMS):
        work_queue.put(x)

    print "Found %d CPUs." % multiprocessing.cpu_count()
    print "Operation queue has %d items." % work_queue.qsize()
    print "Will start %d processes." % MAX_PROCESS

    # Pool method
    pool = Pool(processes=MAX_PROCESS)
    for n in range(0, MAX_PROCESS):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()

    print "Joining pool..."
    pool.join()
    print "Joining pool finished..."
    print "--- After pool completion ---"

    print "Work queue has %d items." % work_queue.qsize()
    print "Results queue has %d items." % results_queue.qsize()

    print "Results are:"
    while not results_queue.empty():
        item = results_queue.get()
        print str(item)
        results_queue.task_done()
    print "--End---"
Thanks for your help.
1 Answer
You're hitting a race condition: process 5886 sees that there is one item left in the work queue:
[5886] worker() still running because work_queue has size 1
So it goes back around to the blocking get call:
while not work_queue.empty(): # It sees that the queue is not empty here
    item = work_queue.get()   # But there's no item left by the time it gets here!
But in between its call to work_queue.empty() and its call to work_queue.get(), the other worker process (5885) consumed the last item in the queue:
[5885] processed work item 19
[5885] worker() still running because work_queue has size 0
[5885] worker() is finished; returning results of size 20
So now 5886 blocks forever on that get. In general, when multiple consumers share one queue, you shouldn't use the empty() method to decide whether to make a blocking get() call, because it is vulnerable to exactly this race condition. The "poison pill"/sentinel approach you mentioned is the right way to handle the situation; alternatively, use a non-blocking get call and catch the Empty exception when it occurs:
while True:
    try:
        item = work_queue.get_nowait()  # non-blocking get
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    except Queue.Empty:  # requires 'import Queue' at the top of the module
        print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())
        break
Note that you can only use this approach if you know the queue will not grow once the workers start consuming from it. Otherwise a worker might decide it should exit while there are still more items on the way to the queue.
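To make that contrast concrete, here is a sketch of the sentinel approach in a setting where the queue keeps growing after the workers have started; the producer loop, the timing, and the None sentinel are illustrative assumptions, not the only way to wire this up:

import os
import time
import multiprocessing

SENTINEL = None  # illustrative sentinel value

def worker(work_queue, results_queue):
    while True:
        item = work_queue.get()   # blocking get: waits for more work or a pill
        if item is SENTINEL:      # only the pill ends the loop, never an empty queue
            break
        results_queue.put('[%d] has processed %s.' % (os.getpid(), item))

if __name__ == '__main__':
    MAX_PROCESS = 2
    m = multiprocessing.Manager()
    work_queue = m.Queue()
    results_queue = m.Queue()
    pool = multiprocessing.Pool(processes=MAX_PROCESS)
    for n in range(MAX_PROCESS):
        pool.apply_async(worker, args=(work_queue, results_queue))
    # The queue grows while the workers are already consuming from it...
    for x in xrange(20):
        work_queue.put(x)
        time.sleep(0.01)          # simulate work trickling in over time
    # ...so only the pills, enqueued last, tell the workers to stop.
    for n in range(MAX_PROCESS):
        work_queue.put(SENTINEL)
    pool.close()
    pool.join()

Because the workers block in get() instead of polling empty(), a temporarily empty queue never makes them exit early; they only stop once a pill arrives.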