Python: Queue.get()阻塞检查套接字连接的代码
我正在运行一个线程,这个线程从一个消息队列中取出消息,发送给客户端,然后等待确认。如果客户端断开连接,线程会捕捉到一个套接字错误并终止。不过问题是,如果消息队列是空的,线程就不会检查套接字连接。有没有办法让这段代码即使在队列空的时候也能检查套接字呢?(问题在于,没有消息就没有东西可以发送)
我是否需要在消息队列空的时候发送一个特殊的“ping”消息?然后在客户端检查这个消息是日志数据还是一个ping?任何帮助都非常感谢。
def run(self):
while not self._terminate:
try:
msgs = self.msgQ.get()
self.sock.send(pickle.dumps(msgs))
rdy = pickle.loads(self.sock.recv(2097152))
except socket.error, EOFError:
print 'log socketmanager closing'
self.terminate()
break
except Empty: pass
3 个回答
0
我有个想法,就是把队列(Queue)和管道(Pipe)封装到一个类里面。然后使用选择模块(select module)来检查套接字(socket)和管道,而不是用轮询(polling)来监控事件。
下面是封装队列和管道的示例代码:
class MyQueue(object):
"""docstring for MyQueue"""
def __init__(self, arg):
super(MyQueue, self).__init__()
# self.arg = arg
self._queue = Queue.Queue()
self._rdfd, self._wrfd = os.pipe()
return
def enQ(self, item=None, block=True, timeout=None):
os.write(self._wrfd, '+')
self._queue.put(item)
return
def deQ(self, block=True, timeout=None):
_itm = self._queue.get(block=block, timeout=timeout)
_msg = os.read(self._rdfd, 1)
return _itm
def get_notify(self):
return self._rdfd
print 'append queue'
self.inp.append(self._myq.get_notify())
print 'append socket'
self.inp.append(self.sock)
print self.inp
_slp = self.slp
_inp = self.inp
_out = self.out
_err = self.err
_in, _out, _err = select.select(_inp, _out, _err, _slp)
0
如果你在使用 Queue.Queue
这个队列,你可以用 Queue.get()
方法来获取队列里的数据。这个方法有两个可选的参数,分别是 block
和 timeout
。另外,你也可以使用 Queue.get_nowait()
方法,这个方法和 Queue.get(block=False)
是一样的,如果队列是空的,它会立刻返回,不会等。
try:
# wait for 1/10 second then return
msgs = self.msgQ.get(timeout=0.1)
except Queue.Empty, qe:
# handle empty queue
1
如果 msgQ
是空的,那么调用 self.msgQ.get()
会抛出一个 Empty
异常,这样就会完全跳过对 self.sock.send()
和 self.sock.recv()
的调用。你处理 Empty
异常的代码没有任何操作,所以你的程序会一直“忙等”,直到 msgQ
中有东西进来,而根本不会调用 send
或 recv
。
一个可能的解决办法是使用 Python 的 select 模块 来在你的异常处理代码中检查套接字。可以参考下面这样的写法:
def run(self):
while not self._terminate:
try:
msgs = self.msgQ.get()
self.sock.send(pickle.dumps(msgs))
rdy = pickle.loads(self.sock.recv(2097152))
except socket.error, EOFError:
print 'log socketmanager closing'
self.terminate()
break
except Empty:
results = select.select([], [], [self.sock], 0.5) # timeout of 0.5 seconds
if self.sock in results[2]:
print 'exceptional condition on socket'
self.terminate()
break