处理选择中的超出范围问题

1 投票
4 回答
1414 浏览
提问于 2025-04-17 15:44

有很多工人(进程),超过1000个,他们在做一些工作,并想把结果保存到数据库里。工作的结果是一个JSON对象。每个工人每秒可以生成1到5个JSON对象。保存结果的过程是一个单独的进程。工人和保存者之间的连接是单向的,使用的是multiprocessing.Pipe。管道的数量和工人的数量是一样的。

在保存者的进程中,我会定期调用:

def recv_data(self):
    data = []
    for pipe in self.data_pipe_pool:
        if pipe.poll():
            data.append(pipe.recv())
    return data

self.data_pipe_pool - 这是一个包含所有工人管道的列表。

如果我运行大约100个工人,一切都很好。但如果我运行超过1000个工人,就会出现异常:

2013-02-13T15:17:40.731429
Traceback (most recent call last):
  File "saver.py", line 44, in run
    profile = self.poll_data()
  File "saver.py", line 116, in poll_data
    ret = self.recv_data()
  File "saver_unit.py", line 127, in recv_data
    if pipe.poll():
IOError: handle out of range in select()

我知道这和select()调用有关,并且:

在GNU/Linux系统中,FD_SETSIZE通常被定义为1024。

但是,select是在哪里被调用的呢?如果是在pipe.poll()中,为什么我会超过FD_SETSIZE的限制?我每次只对一个管道调用pipe.poll()。我在哪里可以查看Python语言的源代码,找到这个select的调用呢?

有没有什么方法可以避免超过FD_SETSIZE的限制,或者不使用select呢?

4 个回答

0

我用epoll解决了这个问题。方法非常简单:

def set_data_pipe_poll(self, data_pipe_poll):
    self.epoll = select.epoll()
    for p in data_pipe_poll:
        self.epoll.register(p, select.EPOLLIN)
    self.data_pipe_poll = data_pipe_poll

def recv_data(self):
    data = []
    events = self.epoll.poll(timeout = 0)
    for fileno, _ in events:
        p = filter(lambda x: x.fileno() == fileno, self.data_pipe_poll)[0]
        data.append(p.recv())
    return data

当我调用 epoll.poll() 的时候,我就不再调用 select 了。

0

你有没有想过用像 beanstalkd 这样的东西?听起来你有一个从工作者到服务器的连接,而通常情况下,你会有成千上万的工作者和十几台服务器从结果队列中读取数据,然后把它们存到数据库里。

这样做的好处是,你的工作者在完成工作时可能花费的时间比和服务器沟通的时间要多得多,但你依然有一个连接保持着。

你可以设置一个“任务队列”,让成千上万的工作者从中获取任务,然后再设置一个“结果队列”,让他们把结果存放在这里。之后,你可以有很多服务器从“结果队列”中取数据,存入数据库。

简单来说,这样做可以避免你用完文件句柄。

2

如果你查看一下select的手册页面,你会看到:

如果用负数或者大于等于FD_SETSIZE的值来执行FD_CLR()或FD_SET(),就会出现不确定的行为。

这就是说,如果在你的poll调用中,后台使用了select(这很有可能),而且你有的文件描述符超过了FD_SETSIZE(如果你有超过1000个管道,这种情况很常见),那么结果可能会是任何东西。

撰写回答