Python concurrency: hang when using apply_async()

I'm learning concurrent programming in Python. As an experiment, I wrote a program that uses a process pool and invokes the worker processes via apply_async(). To share information between the processes (the work and the results), I use queues from multiprocessing.Manager().

However, this code sometimes hangs, typically once all of the tasks in the work queue have been processed, and I don't understand why. I have to run the program several times to observe the behavior.

As an aside, I can get the program to work correctly: I found a design pattern some people call the "poison pill" method, and it seems to do the trick. (In my worker() method, I enter an infinite loop and break out of it only when a specific value shows up in my work queue. I put as many of those special values on the work queue as there are running processes.) A sketch of that pattern follows.
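For reference, here is a minimal sketch of that poison-pill worker, assuming None as the sentinel value (the sentinel choice is mine; the function and queue names mirror the full program below):

import os

def worker(work_queue, results_queue):
    while True:
        item = work_queue.get()   # a blocking get() is safe with sentinels
        if item is None:          # sentinel seen: no more work is coming
            break
        results_queue.put('[%d] has processed %s.' % (os.getpid(), item))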

But I would still like to understand why this code hangs. The output I get is below (process IDs in square brackets):

Found 8 CPUs.  
Operation queue has 20 items.  
Will start 2 processes.  
Joining pool...  
[5885] entering worker() with work_queue size of 20  
[5885] processed work item 0  
[5885] worker() still running because work_queue has size 19  
[5885] processed work item 1  
[5885] worker() still running because work_queue has size 18  
[5885] processed work item 2  
[5885] worker() still running because work_queue has size 17  
[5885] processed work item 3  
[5885] worker() still running because work_queue has size 16  
[5885] processed work item 4  
[5885] worker() still running because work_queue has size 15  
[5885] processed work item 5  
[5886] entering worker() with work_queue size of 14  
[5885] worker() still running because work_queue has size 14  
[5886] processed work item 6  
[5886] worker() still running because work_queue has size 13  
[5885] processed work item 7  
[5886] processed work item 8  
[5885] worker() still running because work_queue has size 11  
[5886] worker() still running because work_queue has size 11  
[5885] processed work item 9  
[5886] processed work item 10  
[5885] worker() still running because work_queue has size 9  
[5886] worker() still running because work_queue has size 9  
[5885] processed work item 11  
[5886] processed work item 12  
[5885] worker() still running because work_queue has size 7  
[5886] worker() still running because work_queue has size 7  
[5885] processed work item 13  
[5886] processed work item 14  
[5885] worker() still running because work_queue has size 5  
[5886] worker() still running because work_queue has size 5  
[5885] processed work item 15  
[5886] processed work item 16  
[5885] worker() still running because work_queue has size 3  
[5886] worker() still running because work_queue has size 3  
[5885] processed work item 17  
[5886] processed work item 18  
[5885] worker() still running because work_queue has size 1  
[5886] worker() still running because work_queue has size 1  
[5885] processed work item 19  
[5885] worker() still running because work_queue has size 0  
[5885] worker() is finished; returning results of size 20  

(The program hangs at the last line. The other process -- 5886 -- never seems to finish.)

import multiprocessing
from multiprocessing import Pool
import os

# Python 2.7.6 on Linux

# Worker (consumer) process
def worker(work_queue, results_queue):
    print "[%d] entering worker() with work_queue size of %d" % (os.getpid(), work_queue.qsize())
    while not work_queue.empty():
        item = work_queue.get()   
        print "[%d] processed work item %s" % (os.getpid(), item)  
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())

if __name__ == '__main__':

    MAX_PROCESS = 2     # Max number of processes to create
    MAX_ITEMS = 20      # Max work items to process

    m = multiprocessing.Manager()
    results_queue = m.Queue()
    work_queue = m.Queue()

    # Assign work
    for x in xrange(MAX_ITEMS):
        work_queue.put(x)     

    print "Found %d CPUs." % multiprocessing.cpu_count()
    print "Operation queue has %d items." % work_queue.qsize()
    print "Will start %d processes." % MAX_PROCESS

    # Pool method
    pool = Pool(processes=MAX_PROCESS)
    for n in range(0, MAX_PROCESS):
        pool.apply_async(worker, args=(work_queue, results_queue))
    pool.close()
    print "Joining pool..."
    pool.join()
    print "Joining pool finished..."
    print "--- After pool completion ---"

    print "Work queue has %d items." % work_queue.qsize()
    print "Results queue has %d items." % results_queue.qsize()
    print "Results are:"

    while not results_queue.empty():
        item = results_queue.get()
        print str(item)
        results_queue.task_done()
    print "--End---"

Thanks for your help.

1 Answer


You're hitting a race condition: process 5886 sees that work_queue has one item left in it:

[5886] worker() still running because work_queue has size 1

so it loops back around to the blocking get call:

while not work_queue.empty(): # It sees the queue is not empty here...
    item = work_queue.get()   # ...but the last item is gone by the time it gets here!

But between its call to work_queue.empty() and its call to work_queue.get(), the other worker process (5885) consumed the last item in the queue:

[5885] processed work item 19  
[5885] worker() still running because work_queue has size 0  
[5885] worker() is finished; returning results of size 20  

So now 5886 blocks forever on that get. In general, when multiple consumers share a queue, you should not use the empty() method to decide whether to make a blocking get() call, because it is vulnerable to exactly this race condition. The "poison pill"/sentinel approach you mentioned is the right way to handle this situation. Alternatively, make a non-blocking get call and catch the Empty exception when it occurs:

import Queue  # needed for Queue.Empty; put this at the top of the module

while True:
    try:
        item = work_queue.get_nowait()  # non-blocking; raises Queue.Empty
        print "[%d] processed work item %s" % (os.getpid(), item)
        s = '[%d] has processed %s.' % (os.getpid(), item)
        results_queue.put(s)
        work_queue.task_done()
        print "[%d] worker() still running because work_queue has size %d" % (os.getpid(), work_queue.qsize())
    except Queue.Empty:
        print "[%d] worker() is finished; returning results of size %d" % (os.getpid(), results_queue.qsize())
        break

Note that you can only use this approach if you know for certain that the queue will not grow once the workers have started consuming from it. Otherwise, a worker may decide it should exit while there are still items about to be added to the queue.
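If the queue can keep growing, the sentinel approach is the safer choice. Here is a minimal sketch of the producer side of that pattern, assuming a None sentinel and reusing MAX_ITEMS, MAX_PROCESS, and work_queue from the question:

for x in xrange(MAX_ITEMS):
    work_queue.put(x)
for _ in xrange(MAX_PROCESS):   # one sentinel per worker, enqueued last
    work_queue.put(None)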
