Gevent.monkey.patch_all 破坏依赖 socket.shutdown() 的代码

5 投票
1 回答
2603 浏览
提问于 2025-04-17 22:34

我现在正在为一个已有的 Django 项目添加对 gevent-socketio 的支持。发现调用 gevent.monkey.patch_all() 后,负责从套接字接收数据的线程的取消机制出现了问题,我们暂时称这个类为 SocketReadThread

SocketReadThread 的功能很简单,它在一个阻塞的套接字上调用 recv()。当接收到数据时,它会处理这些数据,然后再次调用 recv()。当发生异常或者 recv() 返回 0 字节时(比如在 SocketReadThread.stop_reading() 中调用 socket.shutdown(SHUT_RDWR) 时),线程就会停止。

问题出现在 gevent.monkey.patch_all() 替换了默认的套接字实现后。这样一来,我并没有正常关闭套接字,而是遇到了以下异常:

error: [Errno 9] File descriptor was closed in another greenlet

我猜这是因为 gevent 让我的套接字变成了非阻塞的,以便它能正常工作。这意味着当我调用 socket.shutdown(socket.SHUT_RDWR) 时,负责处理 monkey patch 的 socket.recv 调用的绿色线程试图从已关闭的文件描述符中读取数据。

我写了一个示例来单独测试这个问题:

from gevent import monkey

monkey.patch_all()

import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
    def __init__(self, socket):
        super(SocketReadThread, self).__init__()
        self._socket = socket

    def run(self):
        connected = True
        while connected:
            try:
                print "calling socket.recv"
                data = self._socket.recv(1024)
                if (len(data) < 1):
                    print "received nothing, assuming socket shutdown"
                    connected = False
                else :
                    print "Recieved something: {}".format(data)
            except socket.timeout as e:
                print "Socket timeout: {}".format(e)
                connected = false
            except :
                ex = sys.exc_info()[1]
                print "Unexpected exception occurrred: {}".format(str(ex))
                raise ex

    def stop_reading(self):
        self._socket.shutdown(socket.SHUT_RDWR)
        self._socket.close()


if __name__ == '__main__':

    sock = socket.socket()
    sock.connect(('127.0.0.1', 4242))

    st = SocketReadThread(sock)
    st.start()
    time.sleep(3)
    st.stop_reading()
    st.join()

如果你打开一个终端并运行 nc -lp 4242 &(给这个程序提供一个连接的目标),然后运行我的程序,你会看到上面提到的异常。如果你去掉对 monkey.patch_all() 的调用,你会发现程序运行得很好。

我的问题是:如何在支持或不支持 gevent monkey patch 的情况下,取消 SocketReadThread 的操作,并且不需要使用任意超时,这样会导致取消变得缓慢(比如调用 recv() 时设置超时并检查条件)?

1 个回答

1

我发现有两种不同的方法可以解决这个问题。第一种方法就是简单地捕捉并忽略这个异常。这种做法看起来没问题,因为在编程中,一个线程关闭一个套接字(socket)是很常见的做法,这样可以让另一个线程退出正在进行的阻塞读取。我不太明白为什么绿色线程(greenlets)会对此抱怨,除了可能是为了调试方便。这其实只是让人烦恼而已。

第二种选择是使用自管道(self-pipe)技巧(快速搜索一下就能找到很多解释),这是一种唤醒被阻塞线程的机制。简单来说,我们创建一个第二个文件描述符(套接字在操作系统中就像一种文件描述符)来发送取消信号。然后我们使用选择(select)来阻塞等待,看看是套接字上有新数据到来,还是取消请求通过取消文件描述符到来。下面是示例代码。

from gevent import monkey

monkey.patch_all()

import os
import select
import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
    def __init__(self, socket):
        super(SocketReadThread, self).__init__()
        self._socket = socket
        self._socket.setblocking(0)
        r, w = os.pipe()
        self._cancelpipe_r = os.fdopen(r, 'r')
        self._cancelpipe_w = os.fdopen(w, 'w')

    def run(self):
        connected = True
        read_fds = [self._socket, self._cancelpipe_r]
        while connected:
            print "Calling select"
            read_list, write_list, x_list = select.select(read_fds, [], [])
            print "Select returned"
            if self._cancelpipe_r in read_list :
                print "exiting"
                self._cleanup()
                connected = False
            elif self._socket in read_list:
                print "calling socket.recv"
                data = self._socket.recv(1024)
                if (len(data) < 1):
                    print "received nothing, assuming socket shutdown"
                    connected = False
                    self._cleanup()
                else :
                    print "Recieved something: {}".format(data)


    def stop_reading(self):
        print "writing to pipe"
        self._cancelpipe_w.write("\n")
        self._cancelpipe_w.flush()
        print "joining"
        self.join()
        print "joined"

    def _cleanup(self):
        self._cancelpipe_r.close()
        self._cancelpipe_w.close()
        self._socket.shutdown(socket.SHUT_RDWR)
        self._socket.close()


if __name__ == '__main__':

    sock = socket.socket()
    sock.connect(('127.0.0.1', 4242))

    st = SocketReadThread(sock)
    st.start()
    time.sleep(3)
    st.stop_reading()

再次提醒,在运行上面的程序之前,请先运行 netcat -lp 4242 & 来创建一个监听的套接字,以便连接。

撰写回答