正确取消接受和关闭Python处理/多进程Listener连接的方法
在这个例子中,我使用的是 pyprocessing 模块,不过如果你用的是 python 2.6 或者 multiprocessing backport,把处理模块换成多进程模块应该也能正常工作。
我现在有一个程序,它监听一个unix套接字(使用 processing.connection.Listener),接受连接并启动一个线程来处理请求。在某个时刻,我想优雅地退出这个进程,但由于 accept() 调用是阻塞的,我找不到一个好的方法来取消它。我有一个方法在这里(OS X)是有效的,就是设置一个信号处理器,然后从另一个线程发信号给这个进程,像这样:
import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno
# This is actually called by the connection handler.
def closeme():
time.sleep(1)
print 'Closing socket...'
listener.close()
os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)
oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)
listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
listener.accept()
except socket.error, e:
if e.args[0] != errno.EINTR:
raise
# Cleanup here...
print 'Done...'
我想到的唯一其他方法是深入连接内部(listener._listener._socket),设置非阻塞选项……但这可能会有一些副作用,而且通常来说让人很害怕。
有没有人有更优雅(甚至可能是正确!)的方法来实现这个?它需要能够在 OS X、Linux 和 BSD 上运行,但在 Windows 上的兼容性就不必要了。
澄清: 谢谢大家!和往常一样,我原问题中的模糊之处被揭示出来了 :)
- 我需要在取消监听后进行清理,但我并不总是想要真正退出这个进程。
- 我需要能够从其他不是同一父进程生成的进程访问这个进程,这让队列变得不太好用。
- 使用线程的原因是:
- 它们访问一个共享状态。实际上是一个共同的内存数据库,所以我想这可以用其他方式实现。
- 我必须能够同时接受多个连接,但实际的线程大部分时间都是在阻塞某些操作。每个接受的连接都会启动一个新线程;这样做是为了不让所有客户端在 I/O 操作时都被阻塞。
关于线程和进程,我使用线程来让我的阻塞操作变得非阻塞,使用进程来实现多进程处理。
5 个回答
我刚接触多进程模块,但我觉得把多进程模块和多线程模块混在一起用有点反常,它们不都是为了处理同样的问题吗?
不过,你可以考虑把你的监听函数放到一个独立的进程里去运行。我不太清楚这样会对你其他的代码产生什么影响,但这可能是一个更简洁的选择。
from multiprocessing import Process
from multiprocessing.connection import Listener
class ListenForConn(Process):
def run(self):
listener = Listener('/tmp/asdf', 'AF_UNIX')
listener.accept()
# do your other handling here
listen_process = ListenForConn()
listen_process.start()
print listen_process.is_alive()
listen_process.terminate()
listen_process.join()
print listen_process.is_alive()
print 'No more listen process.'
我本以为可以绕过这个问题,但看起来我不得不这样做:
from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()
import select
l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
print "Accepting..."
c = l.accept()
# ...
我知道这样做违反了德梅特法则,还引入了一些不太好的修改方式,但似乎这是实现这个目标最简单的方法。如果有人有更优雅的解决方案,我很乐意听听 :)
难道这就是select的用途吗?
只有在select显示socket不会阻塞的时候,才可以调用accept...
select有一个超时时间,这样你就可以偶尔跳出来检查一下,看看是不是该关闭了...