Python:队列.get()阻止代码检查套接字连接

2024-04-29 20:16:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在运行一个线程,它从队列中获取消息,将其发送到客户机,然后接收确认。如果客户端断开连接,线程将捕获套接字错误并终止。问题是如果msgQ是空的,线程就不会检查套接字连接。有没有办法设置这个代码,这样即使队列是空的,也会检查套接字?(问题是,没有消息,就没有要发送的内容)

我需要发送一个特殊的ping吗?彭!消息如果msgQ是空的(并在客户端检查消息是日志数据还是ping?)?任何帮助都将不胜感激。在

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty: pass

Tags: self消息客户端客户机队列错误ping线程
3条回答

如果使用的是^{},则可以使用可选的block和{}参数调用{a2},或者调用{},它相当于Queue.get(block=False),如果队列为空,它将立即返回:

try:
    # wait for 1/10 second then return
    msgs = self.msgQ.get(timeout=0.1)
except Queue.Empty, qe:
    # handle empty queue 

如果msgQ为空,则对self.msgQ.get()的调用将引发Empty异常,并完全跳过对self.sock.send()和{}的调用。您的Empty异常的异常处理程序不执行任何操作,因此您的代码将busy wait,直到出现在msgQ中,而不会调用send或{}。在

一种可能的解决方案是使用pythonselect module检查异常处理程序中的套接字。大致如下:

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty:
            results = select.select([], [], [self.sock], 0.5) # timeout of 0.5 seconds
            if self.sock in results[2]:
                print 'exceptional condition on socket'
                self.terminate()
                break

我有个主意,就是把队列和管道包装到一个类中。然后使用select模块检查套接字和管道,不会使用轮询来监视事件。在

以下包装队列和管道的示例代码:

class MyQueue(object):
"""docstring for MyQueue"""
def __init__(self, arg):
    super(MyQueue, self).__init__()
    # self.arg = arg
    self._queue = Queue.Queue()
    self._rdfd, self._wrfd = os.pipe()
    return

def enQ(self, item=None, block=True, timeout=None):
    os.write(self._wrfd, '+')
    self._queue.put(item)
    return

def deQ(self, block=True, timeout=None):
    _itm = self._queue.get(block=block, timeout=timeout)
    _msg = os.read(self._rdfd, 1)
    return _itm

def get_notify(self):
    return self._rdfd

^{pr2}$

相关问题 更多 >