Python: Queue.get()阻塞检查套接字连接的代码

0 投票
3 回答
2934 浏览
提问于 2025-04-16 13:23

我正在运行一个线程,这个线程从一个消息队列中取出消息,发送给客户端,然后等待确认。如果客户端断开连接,线程会捕捉到一个套接字错误并终止。不过问题是,如果消息队列是空的,线程就不会检查套接字连接。有没有办法让这段代码即使在队列空的时候也能检查套接字呢?(问题在于,没有消息就没有东西可以发送)

我是否需要在消息队列空的时候发送一个特殊的“ping”消息?然后在客户端检查这个消息是日志数据还是一个ping?任何帮助都非常感谢。

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty: pass

3 个回答

0

我有个想法,就是把队列(Queue)和管道(Pipe)封装到一个类里面。然后使用选择模块(select module)来检查套接字(socket)和管道,而不是用轮询(polling)来监控事件。

下面是封装队列和管道的示例代码:

class MyQueue(object):
"""docstring for MyQueue"""
def __init__(self, arg):
    super(MyQueue, self).__init__()
    # self.arg = arg
    self._queue = Queue.Queue()
    self._rdfd, self._wrfd = os.pipe()
    return

def enQ(self, item=None, block=True, timeout=None):
    os.write(self._wrfd, '+')
    self._queue.put(item)
    return

def deQ(self, block=True, timeout=None):
    _itm = self._queue.get(block=block, timeout=timeout)
    _msg = os.read(self._rdfd, 1)
    return _itm

def get_notify(self):
    return self._rdfd

    print 'append queue'
    self.inp.append(self._myq.get_notify())
    print 'append socket'
    self.inp.append(self.sock)
    print self.inp

    _slp = self.slp
    _inp = self.inp
    _out = self.out
    _err = self.err
    _in, _out, _err = select.select(_inp, _out, _err, _slp)
0

如果你在使用 Queue.Queue 这个队列,你可以用 Queue.get() 方法来获取队列里的数据。这个方法有两个可选的参数,分别是 blocktimeout。另外,你也可以使用 Queue.get_nowait() 方法,这个方法和 Queue.get(block=False) 是一样的,如果队列是空的,它会立刻返回,不会等。

try:
    # wait for 1/10 second then return
    msgs = self.msgQ.get(timeout=0.1)
except Queue.Empty, qe:
    # handle empty queue 
1

如果 msgQ 是空的,那么调用 self.msgQ.get() 会抛出一个 Empty 异常,这样就会完全跳过对 self.sock.send()self.sock.recv() 的调用。你处理 Empty 异常的代码没有任何操作,所以你的程序会一直“忙等”,直到 msgQ 中有东西进来,而根本不会调用 sendrecv

一个可能的解决办法是使用 Python 的 select 模块 来在你的异常处理代码中检查套接字。可以参考下面这样的写法:

def run(self): 
    while not self._terminate: 
        try: 
            msgs = self.msgQ.get()

            self.sock.send(pickle.dumps(msgs))
            rdy = pickle.loads(self.sock.recv(2097152))
        except socket.error, EOFError: 
            print 'log socketmanager closing'
            self.terminate()
            break
        except Empty:
            results = select.select([], [], [self.sock], 0.5) # timeout of 0.5 seconds
            if self.sock in results[2]:
                print 'exceptional condition on socket'
                self.terminate()
                break

撰写回答