Python的asyncore定期发送数据时使用可变超时,有更好的方法吗?
我想写一个服务器,让客户端可以连接上去,并定期接收更新,而不需要不断去询问。我的问题是,使用asyncore时,如果在dispatcher.writable()被调用时不返回true,你就得等到asyncore.loop超时后才能继续(默认超时时间是30秒)。
我尝试过两种解决方法:第一,减少超时时间到一个很小的值;第二,查询连接下次更新的时间,然后生成一个合适的超时时间。不过,如果你查一下'man 2 select_tut'里的“选择法则”,上面说,“你应该尽量在没有超时的情况下使用select()。”
有没有更好的方法呢?比如说Twisted?我想尽量避免使用额外的线程。我这里会提供一个可变超时的例子:
#!/usr/bin/python
import time
import socket
import asyncore
# in seconds
UPDATE_PERIOD = 4.0
class Channel(asyncore.dispatcher):
def __init__(self, sock, sck_map):
asyncore.dispatcher.__init__(self, sock=sock, map=sck_map)
self.last_update = 0.0 # should update immediately
self.send_buf = ''
self.recv_buf = ''
def writable(self):
return len(self.send_buf) > 0
def handle_write(self):
nbytes = self.send(self.send_buf)
self.send_buf = self.send_buf[nbytes:]
def handle_read(self):
print 'read'
print 'recv:', self.recv(4096)
def handle_close(self):
print 'close'
self.close()
# added for variable timeout
def update(self):
if time.time() >= self.next_update():
self.send_buf += 'hello %f\n'%(time.time())
self.last_update = time.time()
def next_update(self):
return self.last_update + UPDATE_PERIOD
class Server(asyncore.dispatcher):
def __init__(self, port, sck_map):
asyncore.dispatcher.__init__(self, map=sck_map)
self.port = port
self.sck_map = sck_map
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind( ("", port))
self.listen(16)
print "listening on port", self.port
def handle_accept(self):
(conn, addr) = self.accept()
Channel(sock=conn, sck_map=self.sck_map)
# added for variable timeout
def update(self):
pass
def next_update(self):
return None
sck_map = {}
server = Server(9090, sck_map)
while True:
next_update = time.time() + 30.0
for c in sck_map.values():
c.update() # <-- fill write buffers
n = c.next_update()
#print 'n:',n
if n is not None:
next_update = min(next_update, n)
_timeout = max(0.1, next_update - time.time())
asyncore.loop(timeout=_timeout, count=1, map=sck_map)
5 个回答
这基本上是demiurgus的解决方案,只是把一些粗糙的地方处理得更圆滑了。它保留了他最初的想法,但避免了运行时错误和忙碌的循环,并且经过了测试。[编辑:解决了在_delay期间修改调度器的问题]
class asynschedcore(sched.scheduler):
"""Combine sched.scheduler and asyncore.loop."""
# On receiving a signal asyncore kindly restarts select. However the signal
# handler might change the scheduler instance. This tunable determines the
# maximum time in seconds to spend in asycore.loop before reexamining the
# scheduler.
maxloop = 30
def __init__(self, map=None):
sched.scheduler.__init__(self, time.time, self._delay)
if map is None:
self._asynmap = asyncore.socket_map
else:
self._asynmap = map
self._abort_delay = False
def _maybe_abort_delay(self):
if not self._abort_delay:
return False
# Returning from this function causes the next event to be executed, so
# it might be executed too early. This can be avoided by modifying the
# head of the queue. Also note that enterabs sets _abort_delay to True.
self.enterabs(0, 0, lambda:None, ())
self._abort_delay = False
return True
def _delay(self, timeout):
if self._maybe_abort_delay():
return
if 0 == timeout:
# Should we support this hack, too?
# asyncore.loop(0, map=self._asynmap, count=1)
return
now = time.time()
finish = now + timeout
while now < finish and self._asynmap:
asyncore.loop(min(finish - now, self.maxloop), map=self._asynmap,
count=1)
if self._maybe_abort_delay():
return
now = time.time()
if now < finish:
time.sleep(finish - now)
def enterabs(self, abstime, priority, action, argument):
# We might insert an event before the currently next event.
self._abort_delay = True
return sched.scheduler.enterabs(self, abstime, priority, action,
argument)
# Overwriting enter is not necessary, because it is implemented using enter.
def cancel(self, event):
# We might cancel the next event.
self._abort_delay = True
return sched.scheduler.cancel(self, event)
def run(self):
"""Runs as long as either an event is scheduled or there are
sockets in the map."""
while True:
if not self.empty():
sched.scheduler.run(self)
elif self._asynmap:
asyncore.loop(self.maxloop, map=self._asynmap, count=1)
else:
break
也许你可以用 sched.scheduler
来实现这个功能,像这样(注意:这个代码没有经过测试):
import sched, asyncore, time
# Create a scheduler with a delay function that calls asyncore.loop
scheduler = sched.scheduler(time.time, lambda t: _poll_loop(t, time.time()) )
# Add the update timeouts with scheduler.enter
# ...
def _poll_loop(timeout, start_time):
asyncore.loop(timeout, count=1)
finish_time = time.time()
timeleft = finish_time - start_time
if timeleft > timeout: # there was a message and the timeout delay is not finished
_poll_loop(timeleft, finish_time) # so wait some more polling the socket
def main_loop():
while True:
if scheduler.empty():
asyncore.loop(30.0, count=1) # just default timeout, use what suits you
# add other work that might create scheduled events here
else:
scheduler.run()
“选择法则”在你的情况中不适用,因为你不仅有客户端触发的(纯服务器)活动,还有时间触发的活动——这正是选择超时的作用。这个法则其实应该说的是:“如果你设定了超时,确保在超时到达时你真的需要做一些有用的事情。”这个法则是为了防止忙等待;而你的代码并没有忙等待。
我建议不要把_timeout设置为0.1和下一个更新时间中的最大值,而是设置为0.0和下一个超时中的最大值。换句话说,如果在你进行更新的时候,更新周期已经过期,那你应该立刻进行那个特定的更新。
与其每次都询问每个频道是否想要更新,不如把所有频道存储在一个优先队列中(按下一个更新时间排序),然后只对最早的频道进行更新(直到你找到一个还没到更新时间的频道)。你可以使用heapq模块来实现这个功能。
你还可以通过不让每个频道都询问当前时间来节省一些系统调用,而是只查询一次当前时间,然后把这个时间传递给.update。