如何列出由多处理池启动的进程?

2024-05-13 19:26:41 发布

您现在位置:Python中文网/ 问答频道 /正文

试图将multiprocessing进程实例存储在multiprocessing列表变量“poolList”中时,出现以下异常:

SimpleQueue objects should only be shared between processes through inheritance

我之所以希望将流程实例存储在变量中,是为了能够在以后终止所有或部分实例(例如,如果流程冻结)。如果在变量中存储进程不是一个选项,我想知道如何获取或列出由mutliprocessing池启动的所有进程。这与.current_process()方法的功能非常相似。除了.current_process只获取一个进程,而我需要启动所有进程或当前运行的所有进程。

两个问题:

  1. 是否甚至可以存储进程的实例(由于mp.current_process()

  2. 目前,我只能从进程运行的函数内部(使用.current_process()方法从myFunct()内部)获取单个进程。

相反,我想列出当前由multiprocessing运行的所有进程。如何实现?


import multiprocessing as mp

poolList=mp.Manager().list()

def myFunct(arg):
    print 'myFunct(): current process:', mp.current_process()

    try: poolList.append(mp.current_process())
    except Exception, e: print e

    for i in range(110):
        for n in range(500000):
            pass
        poolDict[arg]=i
    print 'myFunct(): completed', arg, poolDict

from multiprocessing import Pool
pool = Pool(processes=2)
myArgsList=['arg1','arg2','arg3']

pool=Pool(processes=2)
pool.map_async(myFunct, myArgsList)
pool.close()
pool.join()

Tags: 实例方法进程argmp流程currentmultiprocessing
3条回答
  1. 您正在创建一个托管的List对象,然后让关联的Manager对象过期。

  2. Process对象是可共享的,因为它们不可pickle;也就是说,它们并不简单。

  3. 奇怪的是multiprocessing模块没有threading.enumerate()的等价物——也就是说,您不能列出所有未完成的进程。作为一种解决方法,我只是将进程存储在一个列表中。我从不terminate()进程,而是在父进程中sys.exit(0)。这很难,因为工人们会让事情处于不一致的状态,但对于较小的程序来说是可以的

  4. 要杀死冻结的工作进程,我建议:1)工作进程不时在队列中接收“心跳”作业;2)如果父进程发现工作进程a在一定时间内没有响应心跳,则p.terminate()。考虑在另一个非常有趣的问题中重新说明这个问题。

老实说,map东西比使用管理器容易得多。

这是我用过的一个经理的例子。工作人员将内容添加到共享列表中。另一个工人偶尔会醒来,处理列表上的所有内容,然后再回去睡觉。代码还具有详细的日志,这些日志对于调试非常重要。

来源

# producer adds to fixed-sized list; scanner uses them

import logging, multiprocessing, sys, time


def producer(objlist):
    '''
    add an item to list every sec; ensure fixed size list
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            return
        msg = 'ding: {:04d}'.format(int(time.time()) % 10000)
        logger.info('put: %s', msg)
        del objlist[0]
        objlist.append( msg )


def scanner(objlist):
    '''
    every now and then, run calculation on objlist
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(5)
        except KeyboardInterrupt:
            return
        logger.info('items: %s', list(objlist))


def main():
    logger = multiprocessing.log_to_stderr(
            level=logging.INFO
    )
    logger.info('setup')

    # create fixed-length list, shared between producer & consumer
    manager = multiprocessing.Manager()
    my_objlist = manager.list( # pylint: disable=E1101
        [None] * 10
    )

    multiprocessing.Process(
        target=producer,
        args=(my_objlist,),
        name='producer',
    ).start()

    multiprocessing.Process(
        target=scanner,
        args=(my_objlist,),
        name='scanner',
        ).start()

    logger.info('running forever')
    try:
        manager.join() # wait until both workers die
    except KeyboardInterrupt:
        pass
    logger.info('done')


if __name__=='__main__':
    main()

是的,您可以获取所有活动进程并根据进程名称执行操作 e、 克

multiprocessing.Process(target=foo, name="refresh-reports")

然后

for p in multiprocessing.active_children():
   if p.name == "foo":
      p.terminate()

要列出由Pool()-实例启动的进程(如果我理解正确,这就是您的意思),请使用Pool.\u Pool-list。它包含进程的实例。

但是,它不是文档化接口的一部分,因此,实际上不应该使用它。 但是…似乎有点不太可能像那样改变。我的意思是,他们应该停止在池中有一个进程的内部列表吗?不叫那个泳池? 而且,至少没有get进程方法,这让我很恼火。或者别的什么。 而处理由于名称更改而导致的中断应该没那么困难。

但还是要冒风险使用:

from multiprocessing import pool

# Have to run in main
if __name__ == '__main__':
    # Create 3 worker processes
    _my_pool = pool.Pool(3)

    # Loop, terminate, and remove from the process list
    # Use a copy [:] of the list to remove items correctly
    for _curr_process in _my_pool._pool[:]:
        print("Terminating process "+ str(_curr_process.pid))
        _curr_process.terminate()
        _my_pool._pool.remove(_curr_process)

    # If you call _repopulate, the pool will again contain 3 worker processes.
    _my_pool._repopulate_pool()
    for _curr_process in _my_pool._pool[:]:
        print("After repopulation "+ str(_curr_process.pid))

该示例创建一个池并手动终止所有进程。

重要的是,您要记住从池中删除您自己终止的进程i you want pool()继续正常工作。

_my_pool._repopulate将工作进程的数量再次增加到3个,不需要回答问题,但提供了一点幕后见解。

相关问题 更多 >