如何处理使用Process.Terminate()时的队列损坏问题
我正在制作一个Python脚本/应用程序,它会启动多个叫做“抓取器”的东西。
这些抓取器会做一些事情,然后把数据放到一个队列里。
我想确保这些抓取器的运行时间不超过60秒(因为整个应用程序在一个小时内会运行多次)。
在阅读Python文档时,我注意到他们提到使用Process.Terminate()时要小心,因为这可能会导致队列出问题。
我现在的代码是:
# Result Queue
resultQueue = Queue();
# Create Fetcher Instance
fetcher = fetcherClass()
# Create Fetcher Process List
fetcherProcesses = []
# Run Fetchers
for config in configList:
# Create Process to encapsulate Fetcher
log.debug("Creating Fetcher for Target: %s" % config['object_name'])
fetcherProcess = Process(target=fetcher.Run, args=(config,resultQueue))
log.debug("Starting Fetcher for Target: %s" % config['object_name'])
fetcherProcess.start()
fetcherProcesses.append((config, fetcherProcess))
# Wait for all Workers to complete
for config, fetcherProcess in fetcherProcesses:
log.debug("Waiting for Thread to complete (%s)." % str(config['object_name']))
fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT)
if fetcherProcess.is_alive():
log.critical("Fetcher thread for object %s Timed Out! Terminating..." % config['object_name'])
fetcherProcess.terminate()
# Loop thru results, and save them in RRD
while not resultQueue.empty():
config, fetcherResult = resultQueue.get()
result = storage.Save(config, fetcherResult)
我想确保当我的某个抓取器超时时,队列不会被搞坏。
有什么好的方法可以做到这一点呢?
补充说明:在和sebdelsol聊天后,我有几点澄清:
1) 我希望尽快开始处理数据,因为如果不这样做,我就得一次性进行很多磁盘密集型操作。所以让主线程等待X_Timeout是不行的。
2) 我只需要在每个进程中等待一次超时,所以如果主线程启动了50个抓取器,这可能需要几秒到半分钟的时间,我需要进行补偿。
3) 我想确保从Queue.Get()获取的数据是由没有超时的抓取器放进去的(因为理论上可能会发生这样的情况:一个抓取器正在把数据放入队列时超时了,然后被强制终止了……)那些数据应该被丢弃。
超时发生并不是一件特别糟糕的事情,虽然不太理想,但数据损坏就更糟糕了。
2 个回答
0
为什么不这样做呢?
- 创建一个新的队列,然后启动所有需要使用这个队列的获取器。
- 让你的脚本实际暂停一段时间,这段时间就是你希望获取器的进程用来获取结果的时间。
- 从结果队列中获取所有内容——因为你没有杀掉任何进程,所以结果不会被破坏。
- 最后,结束所有仍然在运行的获取器进程。
- 循环进行!
6
你可以给每个你启动的抓取器传递一个新的 multiprocessing.Lock()
。
在抓取器的进程中,确保用这个锁来包裹 Queue.put()
的操作:
with self.lock:
self.queue.put(result)
当你需要结束一个抓取器的进程时,使用它的锁:
with fetcherLock:
fetcherProcess.terminate()
这样一来,在访问队列时杀掉一个抓取器就不会导致你的队列出问题。
有些抓取器的锁可能会出错,但这没关系,因为你每次启动新的抓取器时,都会有一个全新的锁。