生产者/消费者例外处理
proconex的Python项目详细描述
proconex是一个模块,用于简化生产者/消费者的实现 习语。除了基于python的Queue.Queue的简单实现之外, proconex还负责处理在生产或消费过程中出现的异常 并确保所有工作以干净的方式关闭 留下僵尸线。
示例用法
为了使用普鲁卡因,我们需要做一些准备。
首先,设置python的日志:
>>> import logging >>> logging.basicConfig(level=logging.INFO)
如果您想使用with
语句清理并仍然使用python
2.5,您需要导入它:
>>> from __future__ import with_statement
最后,我们当然需要导入proconex本身:
>>> import proconex
下面是一个从文件中读取行的简单生成器:
>>> class LineProducer(proconex.Producer): ... def __init__(self, fileToReadPath): ... super(LineProducer, self).__init__() ... self._fileToReadPath = fileToReadPath ... def items(self): ... with open(self._fileToReadPath, 'rb') as fileToRead: ... for lineNumber, line in enumerate(fileToRead, start=1): ... yield (lineNumber, line.rstrip('\n\r'))
构造函数可以接受设置生产者所需的任何参数。在 在这种情况下,我们只需要读取文件的路径,fileToReadPath。 构造函数只是将值存储在一个属性中,以便以后引用。
函数items()通常作为生成器实现,并生成 一个接一个的生产项目,直到没有更多的项目生产。在 在这种情况下,我们只是逐行返回文件作为行号的元组,然后 没有尾随换行符的行内容。
接下来,我们需要一个消费者。这里有一个简单的方法处理读取的行 由上面的制作人打印其编号和文本:
>>> class LineConsumer(proconex.Consumer): ... def consume(self, item): ... lineNumber, line = item ... if "self" in line: ... print u"line %d: %s" % (lineNumber, line)
定义了producer和consumer的类之后,我们可以创建producer和 消费者名单:
>>> producer = LineProducer(__file__) >>> consumers = [LineConsumer("consumer#%d" % consumerId) ... for consumerId in xrange(3)]
要真正开始生产过程,我们需要一个工人来控制 生产者和消费者:
>>> with proconex.Worker(producer, consumers) as lineWorker: ... lineWorker.work() # doctest: +ELLIPSIS line ...
WITH语句确保一旦工作线程 完成或失败。或者可以使用try ... except ... finally 要处理错误和清除,请执行以下操作:
>>> producer = LineProducer(__file__) >>> consumers = [LineConsumer("consumer#%d" % consumerId) ... for consumerId in xrange(3)] >>> lineWorker = proconex.Worker(producer, consumers) >>> try: ... lineWorker.work() ... except Exception, error: ... print error ... finally: ... lineWorker.close() # doctest: +ELLIPSIS line ...
除了Worker之外,还有一个Converter,它不仅 生成和使用项,但也生成转换项。当 Converter``s use the same ``Producer``s as ``Worker``s, they require different consumers based on ``ConvertingConsumer。这样的消费者 addItem(),应该使用consume()来添加转换的项。
下面是一个消费者示例,它将消费的整数转换为 它们的平方值:
>>> class SquareConvertingIntegerConsumer(proconex.ConvertingConsumer): ... def consume(self, item): ... self.addItem(item * item)
0到4之间整数的合适生产者是:
>>> class IntegerProducer(proconex.Producer): ... def items(self): ... for item in xrange(5): ... yield item
将它们组合在转换器中,我们得到:
>>> with proconex.Converter(IntegerProducer("producer"), ... SquareConvertingIntegerConsumer("consumer")) as converter: ... for item in converter.items(): ... print item 0 1 4 9 16
限制
使用proconex时,您应该注意以下几点:
- 由于python的全局解释器锁(gil),至少有一个producer和
- 为了允许线程切换,应该将使用者绑定到I/O。
- 代码包含几个轮询循环,因为Queue可以
- 不支持取消
get()
和put()
。轮询不会耗尽 CPU,因为它在等待事件发生时使用超时。尽管如此, 我们还有改进的空间,欢迎您的贡献。
- 从生产过程中的错误中恢复的唯一方法是重新启动
- 从头开始的整个过程。
版本历史记录
版本0.42012-04-14
- 修正了Converter偶尔过早终止可能导致 消费者忽略了生产商排在队列中的最后几项。
版本0.3,2012-01-06
- 添加了Converter类,类似于Worker,但需要 消费者产生调用方可以处理的结果。
- 更改了生产者和消费者引发的异常以保留其堆栈 传递到Worker或Converter时跟踪。
版本0.22012-01-04
- 增加了对多个生产者的支持。
- 添加了lim它代表队列大小。默认情况下,它是消费者数量的两倍。
版本0.12012-01-03
- 首次公开发行。