Python:从多线程子进程的标准输出中非阻塞读取

0 投票
2 回答
2892 浏览
提问于 2025-04-15 20:34

我有一个脚本(worker.py),它会以不缓冲的方式输出一些内容,格式是...

1
2
3
.
.
.
n

其中 n 是这个脚本中循环迭代的常量次数。在另一个脚本(service_controller.py)中,我启动了多个线程,每个线程都使用 subprocess.Popen(stdout=subprocess.PIPE, ...) 启动一个子进程。现在,在我的主线程(service_controller.py)中,我想读取每个线程的 worker.py 子进程的输出,并用它来估算完成所需的剩余时间。

我已经实现了读取 worker.py 的标准输出并确定最后打印的数字的逻辑。问题是我不知道怎么做到这一点而不阻塞。如果我读取一个固定的缓冲区大小,那么每次读取都会等待来自每个工作线程的相同数据。我尝试了很多方法,包括使用 fcntl、select + os.read 等等。请问我该怎么做才好?如果需要,我可以发布我的源代码,但我觉得这个解释已经足够清楚问题了。

谢谢大家的帮助。

编辑
添加示例代码

我有一个工作者,它启动了一个子进程。

class WorkerThread(threading.Thread):
    def __init__(self):
        self.completed = 0
        self.process = None
        self.lock = threading.RLock()
        threading.Thread.__init__(self)

    def run(self):
        cmd = ["/path/to/script", "arg1", "arg2"]
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
        #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
        #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def get_completed(self):
        self.lock.acquire();
        fd = select.select([self.process.stdout.fileno()], [], [], 5)[0]
        if fd:
            self.data += os.read(fd, 1)
            try:
                self.completed = int(self.data.split("\n")[-2])
            except IndexError:
                pass
        self.lock.release()
        return self.completed

然后我有一个线程管理器。

class ThreadManager():
    def __init__(self):
        self.pool = []
        self.running = []
        self.lock = threading.Lock()

    def clean_pool(self, pool):
        for worker in [x for x in pool is not x.isAlive()]:
            worker.join()
            pool.remove(worker)
            del worker
        return pool

    def run(self, concurrent=5):
        while len(self.running) + len(self.pool) > 0:
            self.clean_pool(self.running)
            n = min(max(concurrent - len(self.running), 0), len(self.pool))
            if n > 0:
                for worker in self.pool[0:n]:
                    worker.start()
                self.running.extend(self.pool[0:n])
                del self.pool[0:n]
            time.sleep(.01)
         for worker in self.running + self.pool:
             worker.join()

还有一些代码来运行它。

threadManager = ThreadManager()
for i in xrange(0, 5):
    threadManager.pool.append(WorkerThread())
threadManager.run()

我已经去掉了很多其他代码,希望能更好地找到问题所在。

2 个回答

1

你的方法 WorkerThread.run() 启动了一个子进程,然后立刻结束了。其实,run() 需要一直检查这个子进程的状态,并在子进程完成之前更新 WorkerThread.completed 的状态。

2

与其让你的服务控制器因为输入输出操作而被阻塞,不如让线程循环去读取自己控制的进程输出。

这样,你可以在控制进程的线程对象中添加一个方法,用来获取最近一次的输出结果。

当然,在这种情况下,别忘了使用一些锁机制来保护那个缓冲区,因为这个缓冲区会被线程用来填充数据,同时也会被控制器调用的方法用来获取数据。

撰写回答