Python producer/consumer with exception handling
I am trying to implement the seemingly simple classic producer/consumer pattern in Python, with one rather fast producer and several slower consumers. In principle this is easy to do using the Queue module, and the library documentation has an example that takes only a few lines of code.
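For reference, here is a minimal sketch of that basic pattern (roughly along the lines of the example in the Queue documentation, not an exact copy), before any of the error handling discussed below is added:

import Queue
import threading

def consume(queue):
    while True:
        item = queue.get()
        # ... process the item here ...
        queue.task_done()

queue = Queue.Queue()
for _ in range(2):
    thread = threading.Thread(target=consume, args=(queue,))
    thread.daemon = True
    thread.start()

for item in range(10):
    queue.put(item)
queue.join()  # block until every queued item has been processed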
However, I also want the code to keep working properly when exceptions occur. Both the producer and all consumers should stop in case any of the following happens:
- the producer raises an exception
- any of the consumers raises an exception
- the user stops the program (resulting in a KeyboardInterrupt)
After that, the whole process should fail and re-raise the original exception so the caller is informed about what went wrong.
The main challenge seems to be terminating the consumer threads cleanly without ending up in a blocking join(). Setting Thread.daemon = True appears to be popular, but as far as I understand it this leaks resources in case the producer raises an exception.
I managed to write an implementation that meets my requirements (see below). However, the code turned out to be a lot more complex than I expected.
Is there a leaner way to deal with these scenarios?
Here are a few example calls and the resulting final log messages of my current implementation:
Producing and consuming 10 items:
$ python procon.py
INFO:root:processed all items
Producing no items at all:
$ python procon.py --items 0
INFO:root:processed all items
Producing 5 items for 10 consumers, so only some of the available consumers are actually used:
$ python procon.py --items 5 --consumers 10
INFO:root:processed all items
Interrupting the program with Control-C:
$ python procon.py
^CWARNING:root:interrupted by user
Failing to produce item 3:
$ python procon.py --producer-fails-at 3
ERROR:root:cannot produce item 3
Failing to consume item 3:
$ python procon.py --consumer-fails-at 3
ERROR:root:cannot consume item 3
Failing to consume the last item:
$ python procon.py --items 10 --consumer-fails-at 9
ERROR:root:cannot consume item 9
And here is the possibly overly complicated source code:
"""
Consumer/producer to test exception handling in threads. Both the producer
and the consumer can be made to fail deliberately when processing a certain
item using command line options.
"""
import logging
import optparse
import Queue
import threading
import time
_PRODUCTION_DELAY = 0.1
_CONSUMPTION_DELAY = 0.3
# Delay for ugly hacks and polling loops.
_HACK_DELAY = 0.05
class _Consumer(threading.Thread):
"""
Thread to consume items from an item queue filled by a producer, which can
be told to terminate in two ways:
1. using `finish()`, which keeps processing the remaining items on the
queue until it is empty
2. using `cancel()`, which finishes consuming the current item and then
terminates
"""
def __init__(self, name, itemQueue, failedConsumers):
super(_Consumer, self).__init__(name=name)
self._log = logging.getLogger(name)
self._itemQueue = itemQueue
self._failedConsumers = failedConsumers
self.error = None
self.itemToFailAt = None
self._log.info(u"waiting for items to consume")
self._isFinishing = False
self._isCanceled = False
def finish(self):
self._isFinishing = True
def cancel(self):
self._isCanceled = True
def consume(self, item):
self._log.info(u"consume item %d", item)
if item == self.itemToFailAt:
raise ValueError("cannot consume item %d" % item)
time.sleep(_CONSUMPTION_DELAY)
def run(self):
try:
while not (self._isFinishing and self._itemQueue.empty()) \
and not self._isCanceled:
# HACK: Use a timeout when getting the item from the queue
# because between `empty()` and `get()` another consumer might
# have removed it.
try:
item = self._itemQueue.get(timeout=_HACK_DELAY)
self.consume(item)
except Queue.Empty:
pass
if self._isCanceled:
self._log.info(u"canceled")
if self._isFinishing:
self._log.info(u"finished")
except Exception, error:
self._log.error(u"cannot continue to consume: %s", error)
self.error = error
self._failedConsumers.put(self)
class Worker(object):
"""
Controller for interaction between producer and consumers.
"""
def __init__(self, itemsToProduceCount, itemProducerFailsAt,
itemConsumerFailsAt, consumerCount):
self._itemsToProduceCount = itemsToProduceCount
self._itemProducerFailsAt = itemProducerFailsAt
self._itemConsumerFailsAt = itemConsumerFailsAt
self._consumerCount = consumerCount
self._itemQueue = Queue.Queue()
self._failedConsumers = Queue.Queue()
self._log = logging.getLogger("producer")
self._consumers = []
def _possiblyRaiseConsumerError(self):
if not self._failedConsumers.empty():
failedConsumer = self._failedConsumers.get()
self._log.info(u"handling failed %s", failedConsumer.name)
raise failedConsumer.error
def _cancelAllConsumers(self):
self._log.info(u"canceling all consumers")
for consumerToCancel in self._consumers:
consumerToCancel.cancel()
self._log.info(u"waiting for consumers to be canceled")
for possiblyCanceledConsumer in self._consumers:
# In this case, we ignore possible consumer errors because there
# already is an error to report.
possiblyCanceledConsumer.join(_HACK_DELAY)
if possiblyCanceledConsumer.isAlive():
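                # Still alive after the short join(): append the consumer again
                # so this loop picks it up and joins it once more.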
self._consumers.append(possiblyCanceledConsumer)
def work(self):
"""
Launch consumer thread and produce items. In case any consumer or the
producer raise an exception, fail by raising this exception
"""
        self._consumers = []
for consumerId in range(self._consumerCount):
consumerToStart = _Consumer(u"consumer %d" % consumerId,
self._itemQueue, self._failedConsumers)
self._consumers.append(consumerToStart)
consumerToStart.start()
if self._itemConsumerFailsAt is not None:
consumerToStart.itemToFailAt = self._itemConsumerFailsAt
self._log = logging.getLogger("producer ")
self._log.info(u"producing %d items", self._itemsToProduceCount)
for itemNumber in range(self._itemsToProduceCount):
self._possiblyRaiseConsumerError()
self._log.info(u"produce item %d", itemNumber)
if itemNumber == self._itemProducerFailsAt:
raise ValueError("ucannot produce item %d" % itemNumber)
# Do the actual work.
time.sleep(_PRODUCTION_DELAY)
self._itemQueue.put(itemNumber)
self._log.info(u"telling consumers to finish the remaining items")
for consumerToFinish in self._consumers:
consumerToFinish.finish()
self._log.info(u"waiting for consumers to finish")
for possiblyFinishedConsumer in self._consumers:
self._possiblyRaiseConsumerError()
possiblyFinishedConsumer.join(_HACK_DELAY)
if possiblyFinishedConsumer.isAlive():
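                # Still alive after the short join(): append the consumer again
                # so this loop joins it once more (and checks for errors again).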
self._consumers.append(possiblyFinishedConsumer)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = optparse.OptionParser()
parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER",
type="long", help="number of items at which consumer fails (default: %default)")
parser.add_option("-i", "--items", metavar="NUMBER", type="long",
help="number of items to produce (default: %default)", default=10)
parser.add_option("-n", "--consumers", metavar="NUMBER", type="long",
help="number of consumers (default: %default)", default=2)
parser.add_option("-p", "--producer-fails-at", metavar="NUMBER",
type="long", help="number of items at which producer fails (default: %default)")
options, others = parser.parse_args()
worker = Worker(options.items, options.producer_fails_at,
options.consumer_fails_at, options.consumers)
try:
worker.work()
logging.info(u"processed all items")
except KeyboardInterrupt:
logging.warning(u"interrupted by user")
worker._cancelAllConsumers()
except Exception, error:
logging.error(u"%s", error)
worker._cancelAllConsumers()
2 Answers
You need a queue with a cancel method that empties the internal queue, sets a cancelled flag, and then wakes everybody up. The worker will wake up from join(), check the cancelled flag on the queue and act appropriately. The consumers will wake up from get(), check the cancelled flag on the queue and print an error. Then, in case of an exception, all your consumer has to do is call the queue's cancel() method.
Unfortunately, Python's Queue has no cancel method. A few options come to mind:
- roll your own queue (can be tricky to get right)
- extend Python's Queue and add a cancel method (this ties your code to the internals of Python's Queue implementation; a rough sketch of this option follows the list)
- proxy the queue class and wrap the join/get calls with your busy-wait logic (still a busy-wait solution, but it is confined to one place and keeps the producer/consumer code cleaner)
- look for another queue implementation/library
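As a rough illustration of the second option, a cancelable queue might look something like the sketch below. This is only a sketch: it reaches into Queue.Queue internals (mutex, not_empty, the underlying deque), so it is tied to the CPython implementation, and the names CancelableQueue and Cancelled are made up for this example.

import Queue
from time import time as _time

class Cancelled(Exception):
    """Raised by get() after the queue has been cancelled."""

class CancelableQueue(Queue.Queue):
    def __init__(self, maxsize=0):
        Queue.Queue.__init__(self, maxsize)
        self._cancelled = False

    def cancel(self):
        """Drop pending items, set the cancel flag and wake up all waiters."""
        with self.mutex:
            self._cancelled = True
            self.queue.clear()
            self.unfinished_tasks = 0
            self.not_empty.notify_all()       # wake blocked get() calls
            self.not_full.notify_all()        # wake blocked put() calls
            self.all_tasks_done.notify_all()  # let join() return

    def get(self, block=True, timeout=None):
        """Like Queue.get(), but raises Cancelled once cancel() was called."""
        with self.not_empty:
            if self._cancelled:
                raise Cancelled()
            if not block:
                if not self._qsize():
                    raise Queue.Empty
            else:
                endtime = None if timeout is None else _time() + timeout
                while not self._qsize():
                    if self._cancelled:
                        raise Cancelled()
                    remaining = None if endtime is None else endtime - _time()
                    if remaining is not None and remaining <= 0.0:
                        raise Queue.Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

With something like this, a consumer's run() can simply block on get() and exit on Cancelled instead of polling with a timeout, and the producer can call cancel() from its exception handler.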
The answers so far contained good hints, but no code that was ready to use. So I wrapped the code from my question into a library, which is now available from http://pypi.python.org/pypi/proconex/. You can find the source code at https://github.com/roskakori/proconex. While the interface feels reasonable, the implementation still uses polling, so improvements are welcome.
Any exception raised in the producer or one of the consumer threads is re-raised in the main thread. Make sure to use a with statement or finally: worker.close() so that all threads get shut down properly.
Here is a short example with one producer and two consumers processing integers:
import logging
import proconex
class IntegerProducer(proconex.Producer):
def items(self):
for item in xrange(10):
logging.info('produce %d', item)
yield item
class IntegerConsumer(proconex.Consumer):
def consume(self, item):
logging.info('consume %d with %s', item, self.name)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
producer = IntegerProducer()
consumer1 = IntegerConsumer('consumer1')
consumer2 = IntegerConsumer('consumer2')
with proconex.Worker(producer, [consumer1, consumer2]) as worker:
worker.work()
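If you would rather not use the with statement, the same cleanup can be written with try/finally instead, since (as mentioned above) all that is required is that worker.close() runs in the end:

producer = IntegerProducer()
consumer1 = IntegerConsumer('consumer1')
consumer2 = IntegerConsumer('consumer2')
worker = proconex.Worker(producer, [consumer1, consumer2])
try:
    worker.work()
finally:
    worker.close()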