我有如下代码:
import queue
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
q = queue.Queue()
QueueManager.register('queue', callable=lambda:q)
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
s = m.get_server()
s.serve_forever()
from multiprocessing.managers import BaseManager
import time
class QueueManager(BaseManager):
pass
QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()
idx = 0
while True:
time.sleep(2)
queue.put(idx)
idx += 1
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()
while True:
message = queue.get()
print(message)
如果我运行服务器和生产者,然后启动消费者,我会看到生产者放入队列的所有消息都出现在消费者处。但是,如果我停止消费程序并立即重新启动它,它总是会跳过一条消息。你知道吗
来说明我所看到的消费者.py地址:
0
1
2
3
<restart the consumer>
5
6
7
etc.
这就是python多处理队列应该如何工作的,是一个bug还是我做错了什么?你知道吗
我认为问题在于python中实现管道的方式,或者甚至可能是操作系统的限制。以下是完整的堆栈跟踪:
值似乎在
queue.get()
调用中丢失,该调用没有正确地用SIGINT
终止。queue.get()
会立即被取消,因此python不会完成get()
调用,然后丢失值。它看起来更像是python没有正确地取消管道上的recv
。你知道吗如果要将消费者更改为:
会有用的。但这当然是一个解决办法,而不是真正的解决办法。你知道吗
更新:
在更多地使用代码之后,我认为这是一个bug,因为:
multiprocessing.Queue
也不是multiprocessing.JoinableQueue
task_done()
也没有帮助这个错误同时发生在python2和python3上。我建议你report this as a bug。在最坏的情况下,如果它不是一个bug,那么至少可以解释python为什么会这样做。你知道吗
相关问题 更多 >
编程相关推荐