这是疯狂的生产者消费者实现吗?
# file1.py
class _Producer(self):
def __init__(self):
self.chunksize = 6220800
with open('/dev/zero') as f:
self.thing = f.read(self.chunksize)
self.n = 0
self.start()
def start(self):
import subprocess
import threading
def produce():
self._proc = subprocess.Popen(['producer_proc'], stdout=subprocess.PIPE)
while True:
self.thing = self._proc.stdout.read(self.chunksize)
if len(self.thing) != self.chunksize:
msg = 'Expected {0} bytes. Read {1} bytes'.format(self.chunksize, len(self.thing))
raise Exception(msg)
self.n += 1
t = threading.Thread(target=produce)
t.daemon = True
t.start()
self._thread = t
def stop(self):
if self._thread.is_alive():
self._proc.terminate()
self._thread.join(1)
producer = _Producer()
producer.start()
我写了一些代码,基本上是按照上面的设计,现在我想在其他文件中使用producer_proc
的输出,方法是:
# some_other_file.py
import file1
my_thing = file1.producer.thing
可能有多个消费者会引用file.producer.thing
,他们都需要使用同一个producer_proc
。而且producer_proc
绝不能被阻塞。这种实现方式合理吗?Python的全局解释器锁(GIL)能保证线程安全吗?还是说我需要重新实现一个队列来获取工作线程的数据?消费者需要明确地复制这个东西吗?
我想我是在尝试实现类似生产者/消费者模式或观察者模式的东西,但我对这些设计模式的技术细节并不是很清楚。
- 一个生产者不断地在制造东西
- 多个消费者在任意时间使用这些东西
- 当有新东西可用时,
producer.thing
应该立即被替换为新的东西,大部分东西可能不会被使用,但这没关系 - 多个消费者可以读取同一个东西,或者连续读取同一个东西两次。他们只想确保在请求时得到的是最新的东西,而不是过时的旧东西。
- 只要消费者在使用这个东西,即使生产者已经用一个新的东西覆盖了他的
self.thing
,消费者也应该能继续使用这个东西。
1 个回答
1
根据你这个(不寻常的!)需求,你的实现看起来是正确的。特别是:
- 如果你只更新一个属性,Python 的全局解释器锁(GIL)应该就够用了。单个字节码指令是原子的,也就是说它们要么完全执行,要么完全不执行。
- 如果你做的事情更复杂,那就加上锁!其实这样做基本上没什么坏处——如果你在乎性能或者多核扩展性,你可能就不会用 Python 了!
- 特别要注意的是,代码中的
self.thing
和self.n
是在不同的字节码指令中更新的。GIL 可能在这两者之间被释放或获取,所以如果不加锁的话,你无法保证这两个值是一致的。如果你不打算加锁,我建议你去掉self.n
,因为它可能会造成误用,或者至少加个注释说明这个问题。 - 使用者不需要复制对象。你并没有改变
self.thing
指向的特定对象(而且对于字符串对象来说,你也不能改变它们,因为它们是不可变的),而且 Python 是自动垃圾回收的,所以只要使用者抓住了它的引用,就可以继续访问,而不用太担心其他线程在干什么。最糟糕的情况是你的程序会占用很多内存,因为多个代的self.thing
被保留在内存中。
我有点好奇你的需求是从哪里来的。特别是,为什么你不在乎一个 thing
是从未使用还是被使用了很多次。