为什么我的多线程Python脚本使用Queue、threading.Thread和子进程如此不稳定
我有三个脚本,分别叫 P1、P2 和 P3,我想把它们串联起来运行。这三个脚本需要依次执行,但在任何时候,可能会有多个 P1、P2 和 P3 同时在运行。
我需要在很多文件上快速运行这些脚本,所以我想使用线程来并行处理。
我正在使用 Python 的 Thread、Queue 和 subprocess 模块来实现这个功能。
我的问题是,当线程数量超过一个时,程序的表现就变得不稳定,线程之间的交接也不太正常。有时候,五个线程都能完美运行并完成任务。
这是我第一次尝试使用线程,我相信这主要是因为线程常见的问题,比如竞争条件。但我想知道如何能整理一下我的代码。
实际的代码可以在这里找到(https://github.com/harijay/xtaltools/blob/master/process_multi.py)。下面是伪代码。抱歉如果代码有点乱。
我的问题是,为什么使用这种设计时会出现不稳定的情况。所有线程在任何时候都在访问不同的文件。此外,subprocess.call 只有在脚本执行完毕并且生成的文件写入磁盘后才会返回。
我可以做些什么不同的事情呢?我尽量简洁地解释了我的设计。
我的基本设计:
P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()
class P1_Thread(Thread):
def __init__(self,P1_Queue,P2_Queue):
Thread.__init__(self)
self.in_queue = P1_Queue
self.out_queue = P2_Queue
def run(self):
while True:
my_file_to_process = self.in_queue.get()
if my_file_to_process = None:
break
P1_runner = P1_Runner(my_file_to_process)
P1_runner.run_p1_using_subprocess()
self.out_queue.put(my_file_to_process)
类 p1 Runner 接收输入文件的句柄,然后调用 subprocess.call() 来运行一个使用该文件的脚本,并通过 run_p1_using_subprocess 方法生成一个新的输出文件。
class P1_runner(object):
def __init__(self,inputfile):
self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
self.my_shell_script_file = open("some_unique_p1_file_name.sh")
os.chmod("some_unique_file_name.sh",0755)
def run_p1_using_subprocess(self):
subprocess.call([self.my_shell_script_file])
I have essentially similar classes for P2 and P3 . All of which call a shell script that is custom generated
The chaining is achieved using a series of Thread Pools.
p1_worker_list = []
p2_worker_list = []
p3_worker_list = []
for i in range(THREAD_COUNT):
p1_worker = P1_Thread(P1_Queue,P2_Queue)
p1_worker.start()
p1_worker_list.append(p1_worker)
for worker in p1_worker_list:
worker.join()
And then again the same code block for p2 and p3
for i in range(THREAD_COUNT):
p2_worker = P2_Thread(P2_Queue,P3_Queue)
p2_worker.start()
p2_worker_list.append(p1_worker)
for worker in p2_worker_list:
worker.join()
非常感谢你的帮助和建议!
2 个回答
线程的退出条件是,当另一个线程清空它们的输入队列时,它们就会“自杀”。
my_file_to_process = self.in_queue.get()
if my_file_to_process = None: # my sister ate faster than I did, so...
break # ... I kill myself!
线程会因为没有找到工作而死去,尽管它们已经准备好继续工作。
你应该让线程进入睡眠状态(等待),直到它们的输入队列有事件被触发,只有当主程序(协调者)发出处理完成的信号时,线程才应该结束(设置自杀标志,并通知所有队列)。
(我看到你已经修改了代码)。
@Falmarri在其他地方的评论可能是说,你的问题并不是关于某个具体的错误(其他人可以回答的),而是因为你在代码中使用threading
库的方式是错误的,整体上你对编程语言的使用也很别扭。例如:
- 调用
worker.join()
会让主程序等待所有P1线程结束,然后再启动P2线程,这样就失去了并发的意义。 - 你应该重写
Thread.run()
方法,或者在构造函数中提供一个可调用对象。没有必要使用Pn_runner
类。 - 所有线程类做的事情都是一样的。你不需要为每个处理阶段创建不同的类。
- 如果你已经在使用Python,那么调用外部程序(更不用说shell脚本)就没有意义,除非你实在无法用纯Python轻松完成工作。
- 基于以上原因,让你的程序写shell脚本到文件系统是非常奇怪的,几乎肯定是不必要的。
我建议你解决这个特定问题的方法是:
- 尽量坚持使用100%的Python。如果你做不到,或者觉得太难,至少你会找到需要外部访问的具体功能。
- 构建一个不使用并发的解决方案。
- 测量程序的性能,并尝试通过算法来改进它。
- 如果可以的话,避免使用线程。一个CPU密集型的程序在没有线程的情况下会消耗所有可用的计算周期。一个过于依赖磁盘的程序(或者依赖任何外部/远程资源的程序)会在没有其他事情可做时一直等待磁盘。要从线程中受益,程序必须在计算和外部资源使用之间找到合适的平衡(或者必须能够在忙碌时处理到达的请求)。
- 以pythonic的方式进行:从简单开始,逐渐增加功能和复杂性,同时始终避免任何看起来复杂的东西。
如果你的目的是学习Python中的线程,那么一定要寻找一个简单的问题来实验。如果你只是想并行运行几个shell脚本,那么bash
和其他shell已经有相关的功能,你不需要使用Python。
这真是个大问题:
runner.run()
你绝对不应该手动调用线程的运行方法。启动一个线程应该用 .start()。你的代码乱得一团糟,没人会愿意花时间去找你的错误。