为什么我的多线程Python脚本使用Queue、threading.Thread和子进程如此不稳定

1 投票
2 回答
715 浏览
提问于 2025-04-16 09:08

我有三个脚本,分别叫 P1、P2 和 P3,我想把它们串联起来运行。这三个脚本需要依次执行,但在任何时候,可能会有多个 P1、P2 和 P3 同时在运行。

我需要在很多文件上快速运行这些脚本,所以我想使用线程来并行处理。

我正在使用 Python 的 Thread、Queue 和 subprocess 模块来实现这个功能。

我的问题是,当线程数量超过一个时,程序的表现就变得不稳定,线程之间的交接也不太正常。有时候,五个线程都能完美运行并完成任务。

这是我第一次尝试使用线程,我相信这主要是因为线程常见的问题,比如竞争条件。但我想知道如何能整理一下我的代码。

实际的代码可以在这里找到(https://github.com/harijay/xtaltools/blob/master/process_multi.py)。下面是伪代码。抱歉如果代码有点乱。

我的问题是,为什么使用这种设计时会出现不稳定的情况。所有线程在任何时候都在访问不同的文件。此外,subprocess.call 只有在脚本执行完毕并且生成的文件写入磁盘后才会返回。

我可以做些什么不同的事情呢?我尽量简洁地解释了我的设计。

我的基本设计:

P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()

class P1_Thread(Thread):
    def __init__(self,P1_Queue,P2_Queue):
        Thread.__init__(self)
        self.in_queue = P1_Queue
        self.out_queue = P2_Queue

    def run(self):
        while True:
            my_file_to_process = self.in_queue.get()
            if my_file_to_process = None:
                break
            P1_runner = P1_Runner(my_file_to_process)
            P1_runner.run_p1_using_subprocess()
            self.out_queue.put(my_file_to_process)

类 p1 Runner 接收输入文件的句柄,然后调用 subprocess.call() 来运行一个使用该文件的脚本,并通过 run_p1_using_subprocess 方法生成一个新的输出文件。

class P1_runner(object):

    def __init__(self,inputfile):
        self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
       self.my_shell_script_file = open("some_unique_p1_file_name.sh")
       os.chmod("some_unique_file_name.sh",0755)

    def run_p1_using_subprocess(self):
        subprocess.call([self.my_shell_script_file])

I have essentially similar classes for P2 and P3 . All of which call a shell script that is custom generated

The chaining is achieved using a series of Thread Pools.
p1_worker_list = []
p2_worker_list = []
p3_worker_list = []

for i in range(THREAD_COUNT):
    p1_worker = P1_Thread(P1_Queue,P2_Queue)
    p1_worker.start()
    p1_worker_list.append(p1_worker)

for worker in p1_worker_list:
    worker.join()

And then again the same code block for p2 and p3

for i in range(THREAD_COUNT):
    p2_worker = P2_Thread(P2_Queue,P3_Queue)
    p2_worker.start()
    p2_worker_list.append(p1_worker)

for worker in p2_worker_list:
    worker.join()

非常感谢你的帮助和建议!

2 个回答

1

线程的退出条件是,当另一个线程清空它们的输入队列时,它们就会“自杀”。

    my_file_to_process = self.in_queue.get()
    if my_file_to_process = None:  # my sister ate faster than I did, so...
        break # ... I kill myself!

线程会因为没有找到工作而死去,尽管它们已经准备好继续工作。

你应该让线程进入睡眠状态(等待),直到它们的输入队列有事件被触发,只有当主程序(协调者)发出处理完成的信号时,线程才应该结束(设置自杀标志,并通知所有队列)。

(我看到你已经修改了代码)。

@Falmarri在其他地方的评论可能是说,你的问题并不是关于某个具体的错误(其他人可以回答的),而是因为你在代码中使用threading库的方式是错误的,整体上你对编程语言的使用也很别扭。例如:

  • 调用worker.join()会让主程序等待所有P1线程结束,然后再启动P2线程,这样就失去了并发的意义。
  • 你应该重写Thread.run()方法,或者在构造函数中提供一个可调用对象。没有必要使用Pn_runner类。
  • 所有线程类做的事情都是一样的。你不需要为每个处理阶段创建不同的类。
  • 如果你已经在使用Python,那么调用外部程序(更不用说shell脚本)就没有意义,除非你实在无法用纯Python轻松完成工作。
  • 基于以上原因,让你的程序写shell脚本到文件系统是非常奇怪的,几乎肯定是不必要的。

我建议你解决这个特定问题的方法是:

  1. 尽量坚持使用100%的Python。如果你做不到,或者觉得太难,至少你会找到需要外部访问的具体功能。
  2. 构建一个不使用并发的解决方案。
  3. 测量程序的性能,并尝试通过算法来改进它。
  4. 如果可以的话,避免使用线程。一个CPU密集型的程序在没有线程的情况下会消耗所有可用的计算周期。一个过于依赖磁盘的程序(或者依赖任何外部/远程资源的程序)会在没有其他事情可做时一直等待磁盘。要从线程中受益,程序必须在计算和外部资源使用之间找到合适的平衡(或者必须能够在忙碌时处理到达的请求)。
  5. pythonic的方式进行:从简单开始,逐渐增加功能和复杂性,同时始终避免任何看起来复杂的东西。

如果你的目的是学习Python中的线程,那么一定要寻找一个简单的问题来实验。如果你只是想并行运行几个shell脚本,那么bash和其他shell已经有相关的功能,你不需要使用Python。

2

这真是个大问题:

runner.run()

你绝对不应该手动调用线程的运行方法。启动一个线程应该用 .start()。你的代码乱得一团糟,没人会愿意花时间去找你的错误。

撰写回答