如何限制Python中的活跃线程数量?

12 投票
7 回答
14722 浏览
提问于 2025-04-15 16:19

我刚开始学习Python,正在研究threading(多线程),我想做一些音乐文件的转换,并希望能利用我电脑上的多个核心(每个核心一个正在进行的转换线程)。

class EncodeThread(threading.Thread):
    # this is hacked together a bit, but should give you an idea
    def run(self):
        decode = subprocess.Popen(["flac","--decode","--stdout",self.src],
                            stdout=subprocess.PIPE)
        encode = subprocess.Popen(["lame","--quiet","-",self.dest],
                                stdin=decode.stdout)
        encode.communicate()

# some other code puts these threads with various src/dest pairs in a list

for proc in threads: # `threads` is my list of `threading.Thread` objects
    proc.start()

一切都运作良好,所有文件都被编码了,太棒了!不过,所有的进程都是立刻启动的,但我只想同时运行两个(每个核心一个)。一旦有一个完成了,我希望它能继续处理列表中的下一个,直到所有都完成,然后再继续执行程序。

我该怎么做呢?

(我看过线程池和队列的功能,但找不到简单的解决办法。)

补充:也许我应该补充一下,我的每个线程都在使用subprocess.Popen来运行一个单独的命令行解码器(flac),然后将输出传给一个命令行编码器(lame/mp3)。

7 个回答

1

简单来说:别用线程。

如果你想看看一个实际的例子,我最近在工作中做了一个小工具。它是一个围绕 ssh 的小包装器,可以运行可配置数量的 Popen() 子进程。我把它放在了这里:Bitbucket: classh (集群管理员的 ssh 包装器)

正如我所说,我不使用线程;我只是启动子进程,然后循环调用它们的 .poll() 方法,检查是否超时(这个也可以配置),并在收集结果时补充进程池。我尝试过不同的 sleep() 值,之前我写过一个版本(在 Python 加入 subprocess 模块之前),使用了 signal 模块(SIGCHLD 和 SIGALRM)以及 os.fork()os.execve() 函数——这涉及到管道和文件描述符等)。

在我的情况下,我在收集结果时逐步打印出来……并且记住所有结果,以便在最后总结(当所有任务完成或因超时被终止时)。

我在一个包含 25,000 个内部主机的列表上运行了这个程序(其中许多主机已经关闭、退役、位于国际上,或者我的测试账号无法访问等)。它在两个多小时内完成了任务,没有出现任何问题。(大约有 60 个因为系统处于不正常状态而超时——这证明我的超时处理是正确的)。

所以我知道这个模型是可靠的。用这段代码同时运行 100 个当前的 ssh 进程似乎没有造成明显的影响。(这是一个稍微旧一点的 FreeBSD 机器)。我以前在我的旧 512MB 笔记本上也能无问题地运行旧版(在 subprocess 之前)同时处理 100 个进程)。

(顺便说一下:我打算整理一下这个工具并添加一些功能;欢迎你贡献代码或者自己克隆一个分支;这就是 Bitbucket.org 的用途)。

4

“我每个线程都在用 subprocess.Popen 来运行一个单独的命令行[进程]。”

为什么要让一堆线程去管理一堆进程呢?这正是操作系统为你做的事情。为什么要去微管理操作系统已经在管理的东西呢?

与其费劲心思让线程去监督进程,不如直接创建进程。你的进程表可能无法处理2000个进程,但处理几十个(也许几百个)是没问题的。

你希望有比你的CPU能处理的更多的工作在排队。真正需要关注的是内存问题,而不是进程或线程。如果所有进程的活跃数据总和超过了物理内存,那么数据就得被交换,这样会让你变得很慢。

如果你的进程占用的内存比较小,你可以同时运行很多个。如果你的进程占用的内存比较大,那你就不能同时运行太多了。

40

如果你想限制同时运行的线程数量,可以使用一个叫做信号量的东西:

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads)

class EncodeThread(threading.Thread):

    def run(self):
        threadLimiter.acquire()
        try:
            <your code here>
        finally:
            threadLimiter.release()

你可以一次性启动所有线程。除了maximumNumberOfThreads这个数量的线程之外,其他线程会在threadLimiter.acquire()这里等待。只有当一个线程完成并通过threadLimiter.release()时,等待的线程才会继续执行。

撰写回答