如何列出由多进程池启动的进程?

4 投票
3 回答
12664 浏览
提问于 2025-04-18 08:47

在尝试把 multiprocessingprocess 实例存储到名为 'poolList' 的列表变量时,我遇到了一个异常:

SimpleQueue 对象只能通过继承在进程之间共享

我想把 PROCESS 实例存储在变量中的原因是,方便以后终止所有或部分进程(比如说某个进程卡住了)。如果不能把 PROCESS 存储在变量中,我想知道如何获取或列出所有由 multiprocessing 的 POOL 启动的进程。这和 .current_process() 方法的功能很像。不同的是,.current_process 只获取一个进程,而我需要获取所有启动的进程或当前正在运行的所有进程。

我有两个问题:

  1. 是否有可能存储一个 Process 的实例(比如通过 mp.current_process())?

  2. 目前我只能在进程运行的函数内部获取一个进程(在 myFunct() 内部使用 .current_process() 方法)。

我希望能够列出当前由 multiprocessing 运行的所有进程。该怎么做呢?


import multiprocessing as mp

poolList=mp.Manager().list()

def myFunct(arg):
    print 'myFunct(): current process:', mp.current_process()

    try: poolList.append(mp.current_process())
    except Exception, e: print e

    for i in range(110):
        for n in range(500000):
            pass
        poolDict[arg]=i
    print 'myFunct(): completed', arg, poolDict

from multiprocessing import Pool
pool = Pool(processes=2)
myArgsList=['arg1','arg2','arg3']

pool=Pool(processes=2)
pool.map_async(myFunct, myArgsList)
pool.close()
pool.join()

3 个回答

6

是的,你可以获取所有正在运行的进程,并根据进程的名称执行一些操作。例如:

multiprocessing.Process(target=foo, name="refresh-reports")

然后你可以这样做:

for p in multiprocessing.active_children():
   if p.name == "foo":
      p.terminate()
6

如果你想查看由一个 Pool() 实例启动的进程,可以用 pool._pool-list。这个列表里包含了所有进程的实例。

不过,这个列表并不是官方文档里提到的内容,所以其实不应该使用它。 但是……我觉得它不太可能就这样改变。我的意思是,他们会不会突然不再维护这个进程列表了?而且不再叫它 _pool? 还有,让我觉得烦的是,至少应该有一个获取进程的方法,或者类似的东西。处理因为名字改变而导致的问题也不应该太难。

不过,使用这个列表要自己承担风险:

from multiprocessing import pool

# Have to run in main
if __name__ == '__main__':
    # Create 3 worker processes
    _my_pool = pool.Pool(3)

    # Loop, terminate, and remove from the process list
    # Use a copy [:] of the list to remove items correctly
    for _curr_process in _my_pool._pool[:]:
        print("Terminating process "+ str(_curr_process.pid))
        _curr_process.terminate()
        _my_pool._pool.remove(_curr_process)

    # If you call _repopulate, the pool will again contain 3 worker processes.
    _my_pool._repopulate_pool()
    for _curr_process in _my_pool._pool[:]:
        print("After repopulation "+ str(_curr_process.pid))

这个例子创建了一个进程池,并手动终止了所有进程。

重要的是,如果你想让 Pool() 继续正常工作,记得自己把你终止的进程从池里删除。

_my_pool._repopulate 会把工作进程的数量重新增加到 3,这个信息虽然不是回答问题所必需的,但能让你了解一些幕后情况。

2
  1. 你正在创建一个管理的 List 对象,但却让关联的 Manager 对象失效了。

  2. Process 对象是可以共享的,因为它们不能被序列化,也就是说,它们不是简单的对象。

  3. 奇怪的是,multiprocessing 模块没有类似于 threading.enumerate() 的功能,也就是说,你 不能 列出所有正在运行的进程。作为解决方法,我只是把进程存储在一个列表里。我从来不使用 terminate() 来结束一个进程,而是在父进程中使用 sys.exit(0)。这样做有点粗糙,因为工作进程可能会留下不一致的状态,但对于小程序来说还可以。

  4. 要结束一个卡住的工作进程,我建议:1)工作进程定期在队列中接收“心跳”任务,2)如果父进程发现工作进程 A 在一定时间内没有响应心跳,就调用 p.terminate()。可以考虑在另一个 StackOverflow 问题中重新表述这个问题,因为这很有趣。

老实说,使用 map 的东西比使用 Manager 要简单得多。

这是我用过的一个 Manager 示例。一个工作进程往一个共享列表中添加东西。另一个工作进程偶尔会醒来,处理列表中的所有内容,然后再回去睡觉。代码中还有详细的日志,这对调试非常重要。

source

# producer adds to fixed-sized list; scanner uses them

import logging, multiprocessing, sys, time


def producer(objlist):
    '''
    add an item to list every sec; ensure fixed size list
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            return
        msg = 'ding: {:04d}'.format(int(time.time()) % 10000)
        logger.info('put: %s', msg)
        del objlist[0]
        objlist.append( msg )


def scanner(objlist):
    '''
    every now and then, run calculation on objlist
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(5)
        except KeyboardInterrupt:
            return
        logger.info('items: %s', list(objlist))


def main():
    logger = multiprocessing.log_to_stderr(
            level=logging.INFO
    )
    logger.info('setup')

    # create fixed-length list, shared between producer & consumer
    manager = multiprocessing.Manager()
    my_objlist = manager.list( # pylint: disable=E1101
        [None] * 10
    )

    multiprocessing.Process(
        target=producer,
        args=(my_objlist,),
        name='producer',
    ).start()

    multiprocessing.Process(
        target=scanner,
        args=(my_objlist,),
        name='scanner',
        ).start()

    logger.info('running forever')
    try:
        manager.join() # wait until both workers die
    except KeyboardInterrupt:
        pass
    logger.info('done')


if __name__=='__main__':
    main()

撰写回答