生产者/消费者例外处理

proconex的Python项目详细描述


proconex是一个模块,用于简化生产者/消费者的实现 习语。除了基于python的Queue.Queue的简单实现之外, proconex还负责处理在生产或消费过程中出现的异常 并确保所有工作以干净的方式关闭 留下僵尸线。

示例用法

为了使用普鲁卡因,我们需要做一些准备。

首先,设置python的日志:

>>> import logging
>>> logging.basicConfig(level=logging.INFO)

如果您想使用with语句清理并仍然使用python 2.5,您需要导入它:

>>> from __future__ import with_statement

最后,我们当然需要导入proconex本身:

>>> import proconex

下面是一个从文件中读取行的简单生成器:

>>> class LineProducer(proconex.Producer):
...     def __init__(self, fileToReadPath):
...         super(LineProducer, self).__init__()
...         self._fileToReadPath = fileToReadPath
...     def items(self):
...         with open(self._fileToReadPath, 'rb') as fileToRead:
...             for lineNumber, line in enumerate(fileToRead, start=1):
...                 yield (lineNumber, line.rstrip('\n\r'))

构造函数可以接受设置生产者所需的任何参数。在 在这种情况下,我们只需要读取文件的路径,fileToReadPath。 构造函数只是将值存储在一个属性中,以便以后引用。

函数items()通常作为生成器实现,并生成 一个接一个的生产项目,直到没有更多的项目生产。在 在这种情况下,我们只是逐行返回文件作为行号的元组,然后 没有尾随换行符的行内容。

接下来,我们需要一个消费者。这里有一个简单的方法处理读取的行 由上面的制作人打印其编号和文本:

>>> class LineConsumer(proconex.Consumer):
...     def consume(self, item):
...         lineNumber, line = item
...         if "self" in line:
...             print u"line %d: %s" % (lineNumber, line)

定义了producer和consumer的类之后,我们可以创建producer和 消费者名单:

>>> producer =  LineProducer(__file__)
>>> consumers = [LineConsumer("consumer#%d" % consumerId)
...         for consumerId in xrange(3)]

要真正开始生产过程,我们需要一个工人来控制 生产者和消费者:

>>> with proconex.Worker(producer, consumers) as lineWorker:
...     lineWorker.work() # doctest: +ELLIPSIS
line ...

WITH语句确保一旦工作线程 完成或失败。或者可以使用try ... except ... finally 要处理错误和清除,请执行以下操作:

>>> producer =  LineProducer(__file__)
>>> consumers = [LineConsumer("consumer#%d" % consumerId)
...         for consumerId in xrange(3)]
>>> lineWorker = proconex.Worker(producer, consumers)
>>> try:
...     lineWorker.work()
... except Exception, error:
...     print error
... finally:
...    lineWorker.close() # doctest: +ELLIPSIS
line ...

除了Worker之外,还有一个Converter,它不仅 生成和使用项,但也生成转换项。当 Converter``s use the same ``Producer``s as ``Worker``s, they require different consumers based on ``ConvertingConsumer。这样的消费者 addItem(),应该使用consume()来添加转换的项。

下面是一个消费者示例,它将消费的整数转换为 它们的平方值:

>>> class SquareConvertingIntegerConsumer(proconex.ConvertingConsumer):
...     def consume(self, item):
...         self.addItem(item * item)

0到4之间整数的合适生产者是:

>>> class IntegerProducer(proconex.Producer):
...     def items(self):
...         for item in xrange(5):
...             yield item

将它们组合在转换器中,我们得到:

>>> with proconex.Converter(IntegerProducer("producer"),
...         SquareConvertingIntegerConsumer("consumer")) as converter:
...     for item in converter.items():
...         print item
0
1
4
9
16

限制

使用proconex时,您应该注意以下几点:

  • 由于python的全局解释器锁(gil),至少有一个producer和
    为了允许线程切换,应该将使用者绑定到I/O。
  • 代码包含几个轮询循环,因为Queue可以
    不支持取消get()put()。轮询不会耗尽 CPU,因为它在等待事件发生时使用超时。尽管如此, 我们还有改进的空间,欢迎您的贡献。
  • 从生产过程中的错误中恢复的唯一方法是重新启动
    从头开始的整个过程。
如果你需要更多的灵活性和控制,比PROCONEX提供,尝试 celery

源代码

Proconex是根据GNU Lesser通用公共许可证第3版发行的 或者以后。

源代码可从<;https://github.com/roskakori/proconex>;获得。

版本历史记录

版本0.42012-04-14

  • 修正了Converter偶尔过早终止可能导致 消费者忽略了生产商排在队列中的最后几项。

版本0.3,2012-01-06

  • 添加了Converter类,类似于Worker,但需要 消费者产生调用方可以处理的结果。
  • 更改了生产者和消费者引发的异常以保留其堆栈 传递到WorkerConverter时跟踪。

版本0.22012-01-04

  • 增加了对多个生产者的支持。
  • 添加了lim它代表队列大小。默认情况下,它是消费者数量的两倍。

版本0.12012-01-03

  • 首次公开发行。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java泛型重写抽象方法并具有子类的返回类型   Java中的字符串反转字符,同时保留一些字符   java将系统时间与我获取它的时间进行比较   java解析ODATA URL以在准备entityset之前读取ID值   java中的有界通配符下界泛型即使在传递超类时也不会编译   c#Java的JVM和Java的内部工作方式有什么不同。NET的CLR?   java如何在windows7上指定JDK的版本?   Java:列出单个目录中的所有文件(1020000+)   java使用Logback和Lombok   安卓谷歌玩java。lang.NullPointerException   使用RSA的解密结果在普通Java和Android中有所不同   具有默认连接池的java Spring引导   java我如何在一个坏的测试环境中前进?