如何暂停多进程池的执行
在使用:
def myFunction(arg):
for i in range(10000):
pass
from multiprocessing import Pool
pool = Pool(processes=3)
pool.map_async( myFunction, ['first','second','third'] )
我希望用户能够在多进程的池(Pool)启动后,随时暂停执行。然后,我希望用户能够继续(恢复)处理池中的其他项目。该怎么做呢?
编辑:
这是根据Blckknght提出的建议实现的工作代码。谢谢Blckknght!
import multiprocessing
from PyQt4 import QtGui, QtCore
def setup(event):
global unpaused
unpaused = event
def myFunction( arg=None):
unpaused.wait()
print "Task started...", arg
for i in range(15000000):
pass
print '...task completed.', arg
class MyApp(object):
def __init__(self):
super(MyApp, self).__init__()
app = QtGui.QApplication(sys.argv)
self.mainWidget = QtGui.QWidget()
self.mainLayout = QtGui.QVBoxLayout()
self.mainWidget.setLayout(self.mainLayout)
self.groupbox = QtGui.QGroupBox()
self.layout = QtGui.QVBoxLayout()
self.groupbox.setLayout(self.layout)
self.pauseButton = QtGui.QPushButton('Pause')
self.pauseButton.clicked.connect(self.pauseButtonClicked)
self.layout.addWidget(self.pauseButton)
self.okButton = QtGui.QPushButton('Start Pool')
self.okButton.clicked.connect(self.startPool)
self.layout.addWidget(self.okButton)
self.layout.addWidget(self.pauseButton)
self.mainLayout.addWidget(self.groupbox)
self.mainWidget.show()
sys.exit(app.exec_())
def startPool(self):
self.event = multiprocessing.Event()
self.pool=multiprocessing.Pool(1, setup, (self.event,))
self.result=self.pool.map_async(myFunction, [1,2,3,4,5,6,7,8,9,10])
self.event.set()
# self.result.wait()
def pauseJob(self):
self.event.clear()
def continueJob(self):
self.event.set()
def pauseButtonClicked(self):
if self.pauseButton.text()=='Pause':
print '\n\t\t ...pausing job...','\n'
self.pauseButton.setText('Resume')
self.pauseJob()
else:
print '\n\t\t ...resuming job...','\n'
self.pauseButton.setText('Pause')
self.continueJob()
if __name__ == '__main__':
MyApp()
1 个回答
7
听起来你想用一个 multiprocessing.Event
来控制你的工作函数的运行。你可以创建一个事件,然后把它传递给池的 initializer
,接着在 myFunction
中等待这个事件。
这里有个例子,展示了工作进程每秒打印一次它们的参数。你可以通过 clear
这个事件来暂停工作进程,再通过 set
重新启动它们。
from time import sleep
import multiprocessing
def setup(event):
global unpaused
unpaused = event
def myFunction(arg):
for i in range(10):
unpaused.wait()
print(arg)
sleep(1)
if __name__ == "__main__":
event = multiprocessing.Event() # initially unset, so workers will be paused at first
pool = multiprocessing.Pool(3, setup, (event,))
result = pool.map_async(myFunction, ["foo", "bar", "baz"])
event.set() # unpause workers
sleep(5)
event.clear() # pause after five seconds
sleep(5)
event.set() # unpause again after five more seconds
result.wait() # wait for the rest of the work to be completed
这些工作进程应该打印 "foo"
、"bar"
和 "baz"
各十次,每次之间间隔一秒。不过,工作进程在前五秒后会暂停,然后再过五秒后重新启动。根据你的实际需求,可能有很多方法可以改进这段代码,但希望这个例子能给你指明方向。