How to terminate running subprocesses when one of them completes (multiprocessing)
I have a problem with multiprocessing:
from multiprocessing import Process

class PythonHelper(object):
    @staticmethod
    def run_in_parallel(*functions):
        # start each function in its own process
        processes = list()
        for function in functions:
            process = Process(target=function)
            process.start()
            processes.append(process)
        # wait for all of them to finish
        for process in processes:
            process.join()
The static method above is what I use to run several functions simultaneously (each in its own process). Everything worked fine until I needed to force-terminate the whole thing as soon as one of the "subprocesses" finished.
For example:
from PythonHelper import PythonHelper as ph
from Recorder import Recorder

class Logger(object):
    def run_recorder_proc(self):
        rec = Recorder()
        rec.record_video()

    def run_printer_proc(self):
        # hypothetical function: execution takes a long time
        for i in range(9000000):
            print("number: {}".format(i))

    def run_logger(self):
        ph.run_in_parallel(self.run_printer_proc, self.run_recorder_proc)
self.run_printer_proc and self.run_recorder_proc are my subprocesses. How can I "kill" the remaining subprocesses as soon as one of them completes?
EDIT:
Full source code:
import os
from multiprocessing import Process

class PythonHelper(object):
    @staticmethod
    # with your fix
    def run_in_parallel(*functions):
        processes = {}
        for function in functions:
            process = Process(target=function)
            process.start()
            processes[process.pid] = process
        # wait for any process to complete
        pid, status = os.waitpid(-1, 0)
        # one process terminated
        # join it
        processes[pid].join()
        del processes[pid]
        # terminate the rest
        for process in processes.values():
            process.terminate()
        for process in processes.values():
            process.join()

class Logger(object):
    def run_numbers_1(self):
        for i in range(900000):
            print("number: {}".format(i))

    def run_numbers_2(self):
        for i in range(100000):
            print("number: {}".format(i))

    def run_logger(self):
        PythonHelper.run_in_parallel(self.run_numbers_1, self.run_numbers_2)

if __name__ == "__main__":
    logger = Logger()
    logger.run_logger()
Based on the example above, I want run_numbers_1 to be force-terminated as soon as run_numbers_2 completes.
1 Answer
You can achieve that with a slight modification to run_in_parallel():
import os
from multiprocessing import Process

def run_in_parallel(*functions):
    processes = {}
    for function in functions:
        process = Process(target=function)
        process.start()
        processes[process.pid] = process
    # wait for any child to complete (note: os.waitpid(-1, 0) is POSIX-only)
    pid, status = os.waitpid(-1, 0)
    # one process terminated
    # join it
    processes[pid].join()
    del processes[pid]
    # terminate the rest
    for process in processes.values():
        process.terminate()
    for process in processes.values():
        process.join()
[UPDATE]
Based on the full code you provided, here is a working example. Instead of the error-prone os.waitpid(), it uses an Event object that gets set as soon as any of the processes finishes:
from multiprocessing import Process, Event

class MyProcess(Process):
    def __init__(self, event, *args, **kwargs):
        self.event = event
        Process.__init__(self, *args, **kwargs)

    def run(self):
        Process.run(self)
        # signal that this process has finished its work
        self.event.set()

class PythonHelper(object):
    @staticmethod
    # with your fix
    def run_in_parallel(*functions):
        event = Event()
        processes = []
        for function in functions:
            process = MyProcess(event, target=function)
            process.start()
            processes.append(process)
        # wait for any process to complete
        event.wait()
        # one process completed
        # terminate all child processes
        for process in processes:
            process.terminate()
        for process in processes:
            process.join()

class Logger(object):
    def run_numbers_1(self):
        for i in range(90000):
            print("1 number: {}".format(i))

    def run_numbers_2(self):
        for i in range(10000):
            print("2 number: {}".format(i))

    def run_logger(self):
        PythonHelper.run_in_parallel(self.run_numbers_1, self.run_numbers_2)

if __name__ == "__main__":
    logger = Logger()
    logger.run_logger()
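As a side note, on Python 3.3+ the same "terminate the rest as soon as the first child exits" behaviour can also be obtained without subclassing Process, by waiting on each child's Process.sentinel handle with multiprocessing.connection.wait(). Here is a minimal sketch of that alternative (not part of the answer above; the run_in_parallel name is simply kept from the question):

from multiprocessing import Process, connection

def run_in_parallel(*functions):
    processes = [Process(target=function) for function in functions]
    for process in processes:
        process.start()
    # block until at least one child process has exited
    connection.wait([process.sentinel for process in processes])
    # terminate whatever is still running, then reap everything
    for process in processes:
        process.terminate()
    for process in processes:
        process.join()

connection.wait() returns as soon as at least one of the sentinels becomes ready, i.e. as soon as one child exits, which is exactly the moment the remaining children should be terminated.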