如何实现非阻塞的socket连接?

15 投票
6 回答
26730 浏览
提问于 2025-04-15 13:16

我这里有个比较简单的问题。我需要同时和很多主机进行通信,但其实我并不需要同步,因为每个请求都是独立的。

所以我选择使用异步套接字,而不是开很多线程来处理。现在我遇到一个小问题:

异步的部分运行得很好,但当我连接到100个主机时,如果有100个超时(超时设置为10秒),我就得等1000秒,结果发现所有的连接都失败了。

有没有办法让套接字连接不阻塞呢?我的套接字已经设置为非阻塞,但调用connect()时还是会阻塞。

减少超时时间不是一个可接受的解决方案。

我是在用Python,但我想编程语言在这个情况下并不重要。

我真的需要使用线程吗?

6 个回答

7

很遗憾,没有示例代码来展示这个错误,所以有点难以理解这个代码块是从哪里来的。

他做的事情大概是:

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setblocking(0)
s.connect(("www.nonexistingname.org", 80))

socket模块内部使用了getaddrinfo,这个操作会阻塞,特别是当主机名不存在的时候。一个符合标准的DNS客户端会等一段时间,以确认这个名字是真的不存在,还是只是某些DNS服务器反应慢。

解决办法是只连接到IP地址,或者使用一个允许非阻塞请求的DNS客户端,比如pydns

8

使用 select 模块。这可以让你在多个非阻塞的套接字上等待输入输出的完成。这里有 更多关于 select 的信息。从链接的页面上:

在 C 语言中,编写 select 的代码相对复杂。 在 Python 中,这非常简单,但 它和 C 版本差不多, 所以如果你理解了 Python 中的 select, 在 C 中使用它也不会有太大问题。

ready_to_read, ready_to_write, in_error = select.select(
                  potential_readers, 
                  potential_writers, 
                  potential_errs, 
                  timeout)

你需要给 select 传递三个列表:第一个列表包含你可能想要读取的所有套接字;第二个列表包含你可能想要写入的所有套接字,最后一个列表(通常留空)是你想检查错误的套接字。你需要注意的是,一个套接字可以出现在多个列表中。select 调用是阻塞的,但你可以设置一个超时时间。通常这样做是明智的——给它一个比较长的超时时间(比如一分钟),除非你有充分的理由去做其他的事情。

作为回报,你会得到三个列表。 它们包含了实际上可以读取、可以写入和出错的套接字。每个列表都是你传入的对应列表的一个子集(可能是空的)。如果你把一个套接字放在多个输入列表中,它最多只会出现在一个输出列表中。

如果一个套接字在可读输出列表中,你几乎可以肯定在这个套接字上调用 recv 会返回一些数据。可写列表也是一样。你将能够 send 一些数据。也许不是你想发送的全部,但总比什么都没有好。(实际上,任何健康的套接字都会被视为可写——这只是意味着有可用的出站网络缓冲区空间。)

如果你有一个“服务器”套接字,把它放在潜在的读取者列表中。如果它出现在可读列表中,你的接受操作(accept)几乎肯定会成功。如果你创建了一个新的套接字去连接其他人,把它放在潜在的写入者列表中。如果它出现在可写列表中,你就有很大的机会它已经连接成功。

6

你需要让连接的过程也并行进行,因为当你设置了超时时间后,套接字会被阻塞。另一种方法是你可以不设置超时,然后使用选择模块。

你可以使用 asyncore 模块中的调度器类来实现这个功能。可以看看基本的 HTTP客户端示例。这个类的多个实例在连接时不会互相阻塞。你也可以使用线程来做到这一点,我觉得这样更容易跟踪套接字的超时,但既然你已经在使用异步方法了,那就继续沿着这个方向走吧。

举个例子,下面的代码在我所有的Linux系统上都能运行:

import asyncore, socket

class client(asyncore.dispatcher):
    def __init__(self, host):
        self.host = host
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, 22))

    def handle_connect(self):
        print 'Connected to', self.host

    def handle_close(self):
        self.close()

    def handle_write(self):
        self.send('')

    def handle_read(self):
        print ' ', self.recv(1024)

clients = []
for i in range(50, 100):
    clients.append(client('cluster%d' % i))

asyncore.loop()

在 cluster50 到 cluster100 之间,有很多机器是无响应的,或者根本不存在。这段代码会立即开始打印:

Connected to cluster50
  SSH-2.0-OpenSSH_4.3

Connected to cluster51
  SSH-2.0-OpenSSH_4.3

Connected to cluster52
  SSH-2.0-OpenSSH_4.3

Connected to cluster60
  SSH-2.0-OpenSSH_4.3

Connected to cluster61
  SSH-2.0-OpenSSH_4.3

...

不过,这里没有考虑到 getaddrinfo,它必须被阻塞。如果你在解析 DNS 查询时遇到问题,所有的操作都会被迫等待。你可能需要单独收集 DNS 查询,然后在你的异步循环中使用 IP 地址。

如果你想要比 asyncore 更强大的工具,可以看看 Twisted Matrix。虽然它有点复杂,但这是你能找到的最好的 Python 网络编程工具包。

撰写回答