远程队列使用者在res之后未收到第一条消息

2024-04-25 22:09:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我有如下代码:

你知道吗服务器.py

import queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

q = queue.Queue()
QueueManager.register('queue', callable=lambda:q)
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
s = m.get_server()
s.serve_forever()

你知道吗制作人.py

from multiprocessing.managers import BaseManager
import time

class QueueManager(BaseManager):
    pass

QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()

idx = 0
while True:
    time.sleep(2)
    queue.put(idx)
    idx += 1

你知道吗消费者.py

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('queue')
m = QueueManager(address=('localhost', 51000), authkey=b'pass')
m.connect()
queue = m.queue()

while True:
    message = queue.get()
    print(message)

如果我运行服务器和生产者,然后启动消费者,我会看到生产者放入队列的所有消息都出现在消费者处。但是,如果我停止消费程序并立即重新启动它,它总是会跳过一条消息。你知道吗

来说明我所看到的消费者.py地址:

0
1
2
3
<restart the consumer>
5
6
7
etc.

这就是python多处理队列应该如何工作的,是一个bug还是我做错了什么?你知道吗


Tags: frompyimportregisterlocalhostqueueaddress消费者
1条回答
网友
1楼 · 发布于 2024-04-25 22:09:40

我认为问题在于python中实现管道的方式,或者甚至可能是操作系统的限制。以下是完整的堆栈跟踪:

Traceback (most recent call last):
  File "consumer.py", line 12, in <module>
    message = queue.get()
  File "<string>", line 2, in get
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt

值似乎在queue.get()调用中丢失,该调用没有正确地用SIGINT终止。queue.get()会立即被取消,因此python不会完成get()调用,然后丢失值。它看起来更像是python没有正确地取消管道上的recv。你知道吗

如果要将消费者更改为:

while True:
    while queue.empty():
        sleep(0.1)
    message = queue.get()
    print(message)

会有用的。但这当然是一个解决办法,而不是真正的解决办法。你知道吗

更新:

在更多地使用代码之后,我认为这是一个bug,因为:

  1. followed their coding example one by one
  2. 没有任何类型的队列可以解决这个问题(既不是multiprocessing.Queue也不是multiprocessing.JoinableQueue
  3. 同样发送task_done()也没有帮助

这个错误同时发生在python2和python3上。我建议你report this as a bug。在最坏的情况下,如果它不是一个bug,那么至少可以解释python为什么会这样做。你知道吗

相关问题 更多 >