如何列出由多进程池启动的进程?
在尝试把 multiprocessing
的 process 实例存储到名为 'poolList' 的列表变量时,我遇到了一个异常:
SimpleQueue 对象只能通过继承在进程之间共享
我想把 PROCESS 实例存储在变量中的原因是,方便以后终止所有或部分进程(比如说某个进程卡住了)。如果不能把 PROCESS 存储在变量中,我想知道如何获取或列出所有由 multiprocessing
的 POOL 启动的进程。这和 .current_process()
方法的功能很像。不同的是,.current_process
只获取一个进程,而我需要获取所有启动的进程或当前正在运行的所有进程。
我有两个问题:
是否有可能存储一个 Process 的实例(比如通过
mp.current_process()
)?目前我只能在进程运行的函数内部获取一个进程(在
myFunct()
内部使用.current_process()
方法)。
我希望能够列出当前由 multiprocessing
运行的所有进程。该怎么做呢?
import multiprocessing as mp
poolList=mp.Manager().list()
def myFunct(arg):
print 'myFunct(): current process:', mp.current_process()
try: poolList.append(mp.current_process())
except Exception, e: print e
for i in range(110):
for n in range(500000):
pass
poolDict[arg]=i
print 'myFunct(): completed', arg, poolDict
from multiprocessing import Pool
pool = Pool(processes=2)
myArgsList=['arg1','arg2','arg3']
pool=Pool(processes=2)
pool.map_async(myFunct, myArgsList)
pool.close()
pool.join()
3 个回答
是的,你可以获取所有正在运行的进程,并根据进程的名称执行一些操作。例如:
multiprocessing.Process(target=foo, name="refresh-reports")
然后你可以这样做:
for p in multiprocessing.active_children():
if p.name == "foo":
p.terminate()
如果你想查看由一个 Pool() 实例启动的进程,可以用 pool._pool-list。这个列表里包含了所有进程的实例。
不过,这个列表并不是官方文档里提到的内容,所以其实不应该使用它。 但是……我觉得它不太可能就这样改变。我的意思是,他们会不会突然不再维护这个进程列表了?而且不再叫它 _pool? 还有,让我觉得烦的是,至少应该有一个获取进程的方法,或者类似的东西。处理因为名字改变而导致的问题也不应该太难。
不过,使用这个列表要自己承担风险:
from multiprocessing import pool
# Have to run in main
if __name__ == '__main__':
# Create 3 worker processes
_my_pool = pool.Pool(3)
# Loop, terminate, and remove from the process list
# Use a copy [:] of the list to remove items correctly
for _curr_process in _my_pool._pool[:]:
print("Terminating process "+ str(_curr_process.pid))
_curr_process.terminate()
_my_pool._pool.remove(_curr_process)
# If you call _repopulate, the pool will again contain 3 worker processes.
_my_pool._repopulate_pool()
for _curr_process in _my_pool._pool[:]:
print("After repopulation "+ str(_curr_process.pid))
这个例子创建了一个进程池,并手动终止了所有进程。
重要的是,如果你想让 Pool() 继续正常工作,记得自己把你终止的进程从池里删除。
_my_pool._repopulate 会把工作进程的数量重新增加到 3,这个信息虽然不是回答问题所必需的,但能让你了解一些幕后情况。
你正在创建一个管理的
List
对象,但却让关联的Manager
对象失效了。Process
对象是可以共享的,因为它们不能被序列化,也就是说,它们不是简单的对象。奇怪的是,
multiprocessing
模块没有类似于threading.enumerate()
的功能,也就是说,你 不能 列出所有正在运行的进程。作为解决方法,我只是把进程存储在一个列表里。我从来不使用terminate()
来结束一个进程,而是在父进程中使用sys.exit(0)
。这样做有点粗糙,因为工作进程可能会留下不一致的状态,但对于小程序来说还可以。要结束一个卡住的工作进程,我建议:1)工作进程定期在队列中接收“心跳”任务,2)如果父进程发现工作进程 A 在一定时间内没有响应心跳,就调用
p.terminate()
。可以考虑在另一个 StackOverflow 问题中重新表述这个问题,因为这很有趣。
老实说,使用 map
的东西比使用 Manager 要简单得多。
这是我用过的一个 Manager 示例。一个工作进程往一个共享列表中添加东西。另一个工作进程偶尔会醒来,处理列表中的所有内容,然后再回去睡觉。代码中还有详细的日志,这对调试非常重要。
source
# producer adds to fixed-sized list; scanner uses them
import logging, multiprocessing, sys, time
def producer(objlist):
'''
add an item to list every sec; ensure fixed size list
'''
logger = multiprocessing.get_logger()
logger.info('start')
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
return
msg = 'ding: {:04d}'.format(int(time.time()) % 10000)
logger.info('put: %s', msg)
del objlist[0]
objlist.append( msg )
def scanner(objlist):
'''
every now and then, run calculation on objlist
'''
logger = multiprocessing.get_logger()
logger.info('start')
while True:
try:
time.sleep(5)
except KeyboardInterrupt:
return
logger.info('items: %s', list(objlist))
def main():
logger = multiprocessing.log_to_stderr(
level=logging.INFO
)
logger.info('setup')
# create fixed-length list, shared between producer & consumer
manager = multiprocessing.Manager()
my_objlist = manager.list( # pylint: disable=E1101
[None] * 10
)
multiprocessing.Process(
target=producer,
args=(my_objlist,),
name='producer',
).start()
multiprocessing.Process(
target=scanner,
args=(my_objlist,),
name='scanner',
).start()
logger.info('running forever')
try:
manager.join() # wait until both workers die
except KeyboardInterrupt:
pass
logger.info('done')
if __name__=='__main__':
main()