这是疯狂的生产者消费者实现吗?

0 投票
1 回答
655 浏览
提问于 2025-04-17 02:10
# file1.py

class _Producer(self):

  def __init__(self):
    self.chunksize = 6220800
    with open('/dev/zero') as f:
      self.thing = f.read(self.chunksize)
    self.n = 0
    self.start()

  def start(self):
    import subprocess
    import threading

    def produce():
      self._proc = subprocess.Popen(['producer_proc'], stdout=subprocess.PIPE)
      while True:
        self.thing = self._proc.stdout.read(self.chunksize)
        if len(self.thing) != self.chunksize:
          msg = 'Expected {0} bytes.  Read {1} bytes'.format(self.chunksize, len(self.thing))
          raise Exception(msg)
        self.n += 1

    t = threading.Thread(target=produce)
    t.daemon = True
    t.start()
    self._thread = t

  def stop(self):
    if self._thread.is_alive():
      self._proc.terminate()
      self._thread.join(1)

producer = _Producer()
producer.start()

我写了一些代码,基本上是按照上面的设计,现在我想在其他文件中使用producer_proc的输出,方法是:

# some_other_file.py
import file1
my_thing = file1.producer.thing 

可能有多个消费者会引用file.producer.thing,他们都需要使用同一个producer_proc。而且producer_proc绝不能被阻塞。这种实现方式合理吗?Python的全局解释器锁(GIL)能保证线程安全吗?还是说我需要重新实现一个队列来获取工作线程的数据?消费者需要明确地复制这个东西吗?

我想我是在尝试实现类似生产者/消费者模式或观察者模式的东西,但我对这些设计模式的技术细节并不是很清楚。

  • 一个生产者不断地在制造东西
  • 多个消费者在任意时间使用这些东西
  • 当有新东西可用时,producer.thing应该立即被替换为新的东西,大部分东西可能不会被使用,但这没关系
  • 多个消费者可以读取同一个东西,或者连续读取同一个东西两次。他们只想确保在请求时得到的是最新的东西,而不是过时的旧东西。
  • 只要消费者在使用这个东西,即使生产者已经用一个新的东西覆盖了他的self.thing,消费者也应该能继续使用这个东西。

1 个回答

1

根据你这个(不寻常的!)需求,你的实现看起来是正确的。特别是:

  • 如果你只更新一个属性,Python 的全局解释器锁(GIL)应该就够用了。单个字节码指令是原子的,也就是说它们要么完全执行,要么完全不执行。
  • 如果你做的事情更复杂,那就加上锁!其实这样做基本上没什么坏处——如果你在乎性能或者多核扩展性,你可能就不会用 Python 了!
  • 特别要注意的是,代码中的 self.thingself.n 是在不同的字节码指令中更新的。GIL 可能在这两者之间被释放或获取,所以如果不加锁的话,你无法保证这两个值是一致的。如果你不打算加锁,我建议你去掉 self.n,因为它可能会造成误用,或者至少加个注释说明这个问题。
  • 使用者不需要复制对象。你并没有改变 self.thing 指向的特定对象(而且对于字符串对象来说,你也不能改变它们,因为它们是不可变的),而且 Python 是自动垃圾回收的,所以只要使用者抓住了它的引用,就可以继续访问,而不用太担心其他线程在干什么。最糟糕的情况是你的程序会占用很多内存,因为多个代的 self.thing 被保留在内存中。

我有点好奇你的需求是从哪里来的。特别是,为什么你不在乎一个 thing 是从未使用还是被使用了很多次。

撰写回答