正确取消接受和关闭Python处理/多进程Listener连接的方法

11 投票
5 回答
5247 浏览
提问于 2025-04-11 20:17

在这个例子中,我使用的是 pyprocessing 模块,不过如果你用的是 python 2.6 或者 multiprocessing backport,把处理模块换成多进程模块应该也能正常工作。

我现在有一个程序,它监听一个unix套接字(使用 processing.connection.Listener),接受连接并启动一个线程来处理请求。在某个时刻,我想优雅地退出这个进程,但由于 accept() 调用是阻塞的,我找不到一个好的方法来取消它。我有一个方法在这里(OS X)是有效的,就是设置一个信号处理器,然后从另一个线程发信号给这个进程,像这样:

import processing
from processing.connection import Listener
import threading
import time
import os
import signal
import socket
import errno

# This is actually called by the connection handler.
def closeme():
    time.sleep(1)
    print 'Closing socket...'
    listener.close()
    os.kill(processing.currentProcess().getPid(), signal.SIGPIPE)

oldsig = signal.signal(signal.SIGPIPE, lambda s, f: None)

listener = Listener('/tmp/asdf', 'AF_UNIX')
# This is a thread that handles one already accepted connection, left out for brevity
threading.Thread(target=closeme).start()
print 'Accepting...'
try:
    listener.accept()
except socket.error, e:
    if e.args[0] != errno.EINTR:
        raise
# Cleanup here...
print 'Done...'

我想到的唯一其他方法是深入连接内部(listener._listener._socket),设置非阻塞选项……但这可能会有一些副作用,而且通常来说让人很害怕。

有没有人有更优雅(甚至可能是正确!)的方法来实现这个?它需要能够在 OS X、Linux 和 BSD 上运行,但在 Windows 上的兼容性就不必要了。

澄清: 谢谢大家!和往常一样,我原问题中的模糊之处被揭示出来了 :)

  • 我需要在取消监听后进行清理,但我并不总是想要真正退出这个进程。
  • 我需要能够从其他不是同一父进程生成的进程访问这个进程,这让队列变得不太好用。
  • 使用线程的原因是:
    • 它们访问一个共享状态。实际上是一个共同的内存数据库,所以我想这可以用其他方式实现。
    • 我必须能够同时接受多个连接,但实际的线程大部分时间都是在阻塞某些操作。每个接受的连接都会启动一个新线程;这样做是为了不让所有客户端在 I/O 操作时都被阻塞。

关于线程和进程,我使用线程来让我的阻塞操作变得非阻塞,使用进程来实现多进程处理。

5 个回答

1

我刚接触多进程模块,但我觉得把多进程模块和多线程模块混在一起用有点反常,它们不都是为了处理同样的问题吗?

不过,你可以考虑把你的监听函数放到一个独立的进程里去运行。我不太清楚这样会对你其他的代码产生什么影响,但这可能是一个更简洁的选择。

from multiprocessing import Process
from multiprocessing.connection import Listener


class ListenForConn(Process):

    def run(self):
        listener = Listener('/tmp/asdf', 'AF_UNIX')
        listener.accept()

        # do your other handling here


listen_process = ListenForConn()
listen_process.start()

print listen_process.is_alive()

listen_process.terminate()
listen_process.join()

print listen_process.is_alive()
print 'No more listen process.'
3

我本以为可以绕过这个问题,但看起来我不得不这样做:

from processing import connection
connection.Listener.fileno = lambda self: self._listener._socket.fileno()

import select

l = connection.Listener('/tmp/x', 'AF_UNIX')
r, w, e = select.select((l, ), (), ())
if l in r:
  print "Accepting..."
  c = l.accept()
  # ...

我知道这样做违反了德梅特法则,还引入了一些不太好的修改方式,但似乎这是实现这个目标最简单的方法。如果有人有更优雅的解决方案,我很乐意听听 :)

3

难道这就是select的用途吗?

只有在select显示socket不会阻塞的时候,才可以调用accept...

select有一个超时时间,这样你就可以偶尔跳出来检查一下,看看是不是该关闭了...

撰写回答