如何加快与子进程的通信速度
我正在使用Python 2中的subprocess
和threading
线程来处理标准输入,然后用程序A
、B
和C
来处理这些输入,并把修改后的数据写入标准输出。
这个脚本(我们叫它A_to_C.py
)运行得非常慢,我想知道怎么能改善它。
整体流程如下:
A_process = subprocess.Popen(['A', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
produce_A_thread = threading.Thread(target=produceA, args=(sys.stdin, A_process.stdin))
B_process = subprocess.Popen(['B', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
convert_A_to_B_thread = threading.Thread(target=produceB, args=(A_process.stdout, B_process.stdin))
C_process = subprocess.Popen(['C', '-'], stdin=subprocess.PIPE)
convert_B_to_C_thread = threading.Thread(target=produceC, args=(B_process.stdout, C_process.stdin))
produce_A_thread.start()
convert_A_to_B_thread.start()
convert_B_to_C_thread.start()
produce_A_thread.join()
convert_A_to_B_thread.join()
convert_B_to_C_thread.join()
A_process.wait()
B_process.wait()
C_process.wait()
这个脚本的想法是标准输入会传入A_to_C.py
:
- 程序
A
处理一部分标准输入,并通过函数produceA
生成A
的输出。 - 程序
B
处理A
的标准输出的一部分,并通过函数produceB
生成B
的输出。 - 程序
C
处理B
的标准输出的一部分,通过函数produceC
生成C
的输出,并将C
的输出写入标准输出。
我用cProfile做了性能分析,发现这个脚本几乎所有的时间都花在获取线程锁上。
举个例子,在一个417秒的测试任务中,花了416秒(>99%的总运行时间)在获取线程锁上:
$ python
Python 2.6.6 (r266:84292, Nov 21 2013, 10:50:32)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-4)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import pstats
>>> p = pstats.Stats('1.profile')
>>> p.sort_stats('cumulative').print_stats(10)
Thu Jun 12 22:19:07 2014 1.profile
1755 function calls (1752 primitive calls) in 417.203 CPU seconds
Ordered by: cumulative time
List reduced from 162 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.020 0.020 417.203 417.203 A_to_C.py:90(<module>)
1 0.000 0.000 417.123 417.123 A_to_C.py:809(main)
6 0.000 0.000 416.424 69.404 /foo/python/2.7.3/lib/python2.7/threading.py:234(wait)
32 416.424 13.013 416.424 13.013 {method 'acquire' of 'thread.lock' objects}
3 0.000 0.000 416.422 138.807 /foo/python/2.7.3/lib/python2.7/threading.py:648(join)
3 0.000 0.000 0.498 0.166 A_to_C.py:473(which)
37 0.000 0.000 0.498 0.013 A_to_C.py:475(is_exe)
3 0.496 0.165 0.496 0.165 {posix.access}
6 0.000 0.000 0.194 0.032 /foo/python/2.7.3/lib/python2.7/subprocess.py:475(_eintr_retry_call)
3 0.000 0.000 0.191 0.064 /foo/python/2.7.3/lib/python2.7/subprocess.py:1286(wait)
我在使用threading.Thread
和subprocess.Popen
时做错了什么,导致了这个问题?
5 个回答
这个场景特别适合用管道来处理,管道的并行处理是由操作系统自动管理的。既然你想要一个一脚本解决方案,下面就是了:
#! /usr/bin/python2
import sys
import subprocess
import pipes
# Define these as needed
def produceA(input, output):
output.write(input.read())
def produceB(input, output):
output.write(input.read())
def produceC(input, output):
output.write(input.read())
# Magic starts here
COMMAND = "{me} prepare_A | A - | {me} A_to_B | B - | {me} B_to_C | C -"
def bootstrap(input, output):
"""Prepares and runs the pipeline."""
me = "./{}".format(pipes.quote(__file__))
subprocess.call(
COMMAND.format(me=me),
stdin=input, stdout=output, shell=True, bufsize=-1
)
if __name__ == '__main__':
ACTIONS = {
"prepare_A": produceA,
"A_to_B": produceB,
"B_to_C": produceC
}
action = ACTIONS[sys.argv[1]] if len(sys.argv) > 1 else bootstrap
action(sys.stdin, sys.stdout)
这个脚本会根据你指定的命令,设置一个管道或者运行其中一个 produce
函数。
让它可以执行,然后在没有参数的情况下运行:
./A_to_C.py < A.txt > C.txt
注意:看起来你在使用 Python 2.6,所以这个解决方案是针对 Python 2.x 的,不过在 Python 3.x 中也应该能正常运行,只是 quote
函数从 Python 3.3 开始被移到了 shlex
中。
因为你在评论中提到了 popen()
和 pthreads
,我猜你是在使用 POSIX 系统(可能是 Linux)。
你有没有试过用 subprocess32
来替代标准的 subprocess
库呢?
文档中强烈推荐使用这个库,它可能会带来一些改进。
顺便说一下,我觉得把进程(subprocess
)和线程混合使用是 一个坏主意。
另外,为什么 python produceA.py | A | python produceB.py | B | python produceC.py | C
这种写法不符合你的需求呢?或者用 subprocess
实现的等效写法呢?
简而言之 如果你的程序运行得比预期慢,可能是因为中间函数的具体操作,而不是因为进程间通信(IPC)或线程问题。可以用简单的模拟函数和进程来测试,专注于数据在子进程之间传递的开销。在一个与您的代码非常相似的基准测试中,数据在子进程之间的传递性能似乎与直接使用 shell 管道相当;Python 在这方面并不特别慢。
原始代码发生了什么
原始代码的一般形式是:
def produceB(from_stream, to_stream):
while True:
buf = from_stream.read()
processed_buf = do_expensive_calculation(buf)
to_stream.write(processed_buf)
在这里,读和写之间的计算大约占所有进程(主进程和子进程)总 CPU 时间的 2/3 - 这里说的是 CPU 时间,而不是实际经过的时间。
我认为这会影响 I/O 的运行速度。读取、写入和计算每个都需要自己的线程,并且需要使用队列来在读取和计算之间、计算和写入之间提供缓冲(因为管道提供的缓冲不足,我认为)。
我下面展示,如果在读取和写入之间没有处理(或者说:如果中间处理在单独的线程中完成),那么线程和子进程的吞吐量会非常高。也可以为读取和写入各自设置线程;这会增加一些开销,但可以让写入不阻塞读取,反之亦然。三个线程(读取、写入和处理)效果更好,这样任何一个步骤都不会阻塞其他步骤(当然,这在队列大小的限制内)。
一些基准测试
下面的所有基准测试都是在 Ubuntu 14.04LTS 64位(Intel i7,Ivy Bridge,四核)上使用 Python 2.7.6 进行的。测试是将大约 1GB 的数据以 4KB 的块在两个 dd
进程之间传输,并通过 Python 作为中介。dd 进程使用中等大小(4KB)的块;典型的文本 I/O 会更小(除非被解释器巧妙地缓冲等),典型的二进制 I/O 当然会大得多。我有一个例子是完全基于你做的这个,还有一个例子是基于我之前尝试过的一种替代方法(结果证明更慢)。顺便说一下,感谢你提出这个问题,这很有用。
线程和阻塞 I/O
首先,让我们把问题中的原始代码转换成一个稍微简单一点的自包含示例。这只是两个进程通过一个线程进行通信,线程负责将数据从一个进程传输到另一个进程,进行阻塞的读取和写入。
import subprocess, threading
A_process = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
B_process = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)
def convert_A_to_B(src, dst):
read_size = 8*1024
while True:
try:
buf = src.read(read_size)
if len(buf) == 0: # This is a bit hacky, but seems to reliably happen when the src is closed
break
dst.write(buf)
except ValueError as e: # Reading or writing on a closed fd causes ValueError, not IOError
print str(e)
break
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, args=(A_process.stdout, B_process.stdin))
convert_A_to_B_thread.start()
# Here, watch out for the exact sequence to clean things up
convert_A_to_B_thread.join()
A_process.wait()
B_process.stdin.close()
B_process.wait()
结果:
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.638977 s, 1.6 GB/s
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.635499 s, 1.6 GB/s
real 0m0.678s
user 0m0.657s
sys 0m1.273s
不错!结果表明,在这种情况下,理想的读取大小大约是 8k-16KB,太小或太大的大小都会稍微慢一些。这可能与我们要求 dd 使用的 4KB 块大小有关。
选择和非阻塞 I/O
在我之前研究这种问题时,我尝试使用 select()
、非阻塞 I/O 和单线程。一个例子在我这里的问题中:如何异步读取和写入子进程?。那是为了并行读取两个进程,我在下面扩展到从一个进程读取并写入另一个进程。非阻塞写入的大小限制为 PIPE_BUF 或更小,在我的系统上是 4KB;为了简单起见,读取也设置为 4KB,尽管它们可以是任何大小。这有一些奇怪的边缘情况(以及根据细节可能出现的不可解释的挂起),但在下面的形式中它可靠地工作。
import subprocess, select, fcntl, os, sys
p1 = subprocess.Popen(["dd", "if=/dev/zero", "bs=4k", "count=244140"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["dd", "of=/dev/null", "bs=4k"], stdin=subprocess.PIPE)
def make_nonblocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
make_nonblocking(p1.stdout)
make_nonblocking(p2.stdin)
print "PIPE_BUF = %d" % (select.PIPE_BUF)
read_size = select.PIPE_BUF
max_buf_len = 1 # For reasons which I have not debugged completely, this hangs sometimes when set > 1
bufs = []
while True:
inputready, outputready, exceptready = select.select([ p1.stdout.fileno() ],[ p2.stdin.fileno() ],[])
for fd in inputready:
if fd == p1.stdout.fileno():
if len(bufs) < max_buf_len:
data = p1.stdout.read(read_size)
bufs.append(data)
for fd in outputready:
if fd == p2.stdin.fileno() and len(bufs) > 0:
data = bufs.pop(0)
p2.stdin.write(data)
p1.poll()
# If the first process is done and there is nothing more to write out
if p1.returncode != None and len(bufs) == 0:
# Again cleanup is tricky. We expect the second process to finish soon after its input is closed
p2.stdin.close()
p2.wait()
p1.wait()
break
结果:
PIPE_BUF = 4096
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 3.13722 s, 319 MB/s
244133+0 records in
244133+0 records out
999968768 bytes (1.0 GB) copied, 3.13599 s, 319 MB/s
real 0m3.167s
user 0m2.719s
sys 0m2.373s
然而,这显著慢于上面的版本(即使读取/写入大小都设置为 4KB 进行公平比较)。我不太确定原因。
附言:后来补充:似乎可以忽略或超过 PIPE_BUF。这会导致 p2.stdin.write()
抛出 IOError 异常(errno=11,暂时不可用),大多数情况下是因为管道中有足够的空间写入一些东西,但少于我们请求的完整大小。将上面的代码改为 read_size = 64*1024
,并捕获并忽略该异常,运行速度达到 1.4GB/s。
直接使用管道
作为基准,使用 shell 版本的管道(在子进程中)运行的速度如何?让我们看看:
import subprocess
subprocess.call("dd if=/dev/zero bs=4k count=244140 | dd of=/dev/null bs=4k", shell=True)
结果:
244140+0 records in
244140+0 records out
244140+0 records in
244140+0 records out
999997440 bytes (1.0 GB) copied, 0.425261 s, 2.4 GB/s
999997440 bytes (1.0 GB) copied, 0.423687 s, 2.4 GB/s
real 0m0.466s
user 0m0.300s
sys 0m0.590s
这显著快于使用线程的 Python 示例。然而,这只是一次复制,而线程的 Python 版本则进行了两次(进出 Python)。将命令修改为 "dd if=/dev/zero bs=4k count=244140 | dd bs=4k | dd of=/dev/null bs=4k"
,性能提升至 1.6GB,与 Python 示例相当。
如何在完整系统中进行比较
关于如何在完整系统中进行比较的一些额外想法。为了简单起见,这里只有两个进程,并且两个脚本都有完全相同的 convert_A_to_B()
函数。
脚本 1:在 Python 中传递数据,如上所述
A_process = subprocess.Popen(["A", ...
B_process = subprocess.Popen(["B", ...
convert_A_to_B_thread = threading.Thread(target=convert_A_to_B, ...
脚本 2:比较脚本,在 shell 中传递数据
convert_A_to_B(sys.stdin, sys.stdout)
在 shell 中运行:A | python script_2.py | B
这允许在完整系统中进行公平比较,而不使用模拟函数/进程。
块读取大小如何影响结果
对于这个测试,使用上面第一个(线程)示例中的代码,并将 dd
和 Python 脚本设置为使用相同的块大小进行读取/写入。
| Block size | Throughput |
|------------|------------|
| 1KB | 249MB/s |
| 2KB | 416MB/s |
| 4KB | 552MB/s |
| 8KB | 1.4GB/s |
| 16KB | 1.8GB/s |
| 32KB | 2.9GB/s |
| 64KB | 3.0GB/s |
| 128KB | 1.0GB/s |
| 256KB | 600MB/s |
理论上,使用更大的缓冲区应该会有更好的性能(可能会受到缓存效应的影响),但实际上 Linux 管道在使用非常大的缓冲区时会变慢,即使是使用纯 shell 管道。
我觉得你可能被cProfile的工作方式给误导了。比如,这里有一个简单的脚本,它使用了两个线程:
#!/usr/bin/python
import threading
import time
def f():
time.sleep(10)
def main():
t = threading.Thread(target=f)
t.start()
t.join()
如果我用cProfile来测试这个脚本,得到的结果是:
>>> import test
>>> import cProfile
>>> cProfile.run('test.main()')
60 function calls in 10.011 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.000 0.000 10.011 10.011 <string>:1(<module>)
1 0.000 0.000 10.011 10.011 test.py:10(main)
1 0.000 0.000 0.000 0.000 threading.py:1008(daemon)
2 0.000 0.000 0.000 0.000 threading.py:1152(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:241(Condition)
2 0.000 0.000 0.000 0.000 threading.py:259(__init__)
2 0.000 0.000 0.000 0.000 threading.py:293(_release_save)
2 0.000 0.000 0.000 0.000 threading.py:296(_acquire_restore)
2 0.000 0.000 0.000 0.000 threading.py:299(_is_owned)
2 0.000 0.000 10.011 5.005 threading.py:308(wait)
1 0.000 0.000 0.000 0.000 threading.py:541(Event)
1 0.000 0.000 0.000 0.000 threading.py:560(__init__)
2 0.000 0.000 0.000 0.000 threading.py:569(isSet)
4 0.000 0.000 0.000 0.000 threading.py:58(__init__)
1 0.000 0.000 0.000 0.000 threading.py:602(wait)
1 0.000 0.000 0.000 0.000 threading.py:627(_newname)
5 0.000 0.000 0.000 0.000 threading.py:63(_note)
1 0.000 0.000 0.000 0.000 threading.py:656(__init__)
1 0.000 0.000 0.000 0.000 threading.py:709(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:726(start)
1 0.000 0.000 10.010 10.010 threading.py:911(join)
10 10.010 1.001 10.010 1.001 {method 'acquire' of 'thread.lock' objects}
2 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
4 0.000 0.000 0.000 0.000 {method 'release' of 'thread.lock' objects}
4 0.000 0.000 0.000 0.000 {thread.allocate_lock}
2 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {thread.start_new_thread}
你可以看到,几乎所有的时间都花在获取锁上。当然,我们知道这并不准确地反映脚本实际在做什么。实际上,所有的时间都是在f()
函数里的time.sleep
调用上消耗的。acquire
调用的高tottime
只是因为join
在等待f
完成,这意味着它必须坐着等着获取锁。然而,cProfile根本没有显示f
里花费的时间。由于示例代码非常简单,我们可以清楚地看到实际发生了什么,但在更复杂的程序中,这样的输出会非常误导。
你可以通过使用其他的性能分析库,比如yappi,来获得更可靠的结果:
>>> import test
>>> import yappi
>>> yappi.set_clock_type("wall")
>>> yappi.start()
>>> test.main()
>>> yappi.get_func_stats().print_all()
Clock type: wall
Ordered by: totaltime, desc
name #n tsub ttot tavg
<stdin>:1 <module> 2/1 0.000025 10.00801 5.004003
test.py:10 main 1 0.000060 10.00798 10.00798
..2.7/threading.py:308 _Condition.wait 2 0.000188 10.00746 5.003731
..thon2.7/threading.py:911 Thread.join 1 0.000039 10.00706 10.00706
..ython2.7/threading.py:752 Thread.run 1 0.000024 10.00682 10.00682
test.py:6 f 1 0.000013 10.00680 10.00680
..hon2.7/threading.py:726 Thread.start 1 0.000045 0.000608 0.000608
..thon2.7/threading.py:602 _Event.wait 1 0.000029 0.000484 0.000484
..2.7/threading.py:656 Thread.__init__ 1 0.000064 0.000250 0.000250
..on2.7/threading.py:866 Thread.__stop 1 0.000025 0.000121 0.000121
..lib/python2.7/threading.py:541 Event 1 0.000011 0.000101 0.000101
..python2.7/threading.py:241 Condition 2 0.000025 0.000094 0.000047
..hreading.py:399 _Condition.notifyAll 1 0.000020 0.000090 0.000090
..2.7/threading.py:560 _Event.__init__ 1 0.000018 0.000090 0.000090
..thon2.7/encodings/utf_8.py:15 decode 2 0.000031 0.000071 0.000035
..threading.py:259 _Condition.__init__ 2 0.000064 0.000069 0.000034
..7/threading.py:372 _Condition.notify 1 0.000034 0.000068 0.000068
..hreading.py:299 _Condition._is_owned 3 0.000017 0.000040 0.000013
../threading.py:709 Thread._set_daemon 1 0.000018 0.000035 0.000035
..ding.py:293 _Condition._release_save 2 0.000019 0.000033 0.000016
..thon2.7/threading.py:63 Thread._note 7 0.000020 0.000020 0.000003
..n2.7/threading.py:1152 currentThread 2 0.000015 0.000019 0.000009
..g.py:296 _Condition._acquire_restore 2 0.000011 0.000017 0.000008
../python2.7/threading.py:627 _newname 1 0.000014 0.000014 0.000014
..n2.7/threading.py:58 Thread.__init__ 4 0.000013 0.000013 0.000003
..threading.py:1008 _MainThread.daemon 1 0.000004 0.000004 0.000004
..hon2.7/threading.py:569 _Event.isSet 2 0.000003 0.000003 0.000002
使用yappi
,你会更容易看到时间是花在f
上的。
我怀疑你会发现,实际上你脚本的大部分时间都是在produceA
、produceB
和produceC
里做工作的。
你在调用subprocess.Popen()的时候,默认的缓冲区大小是0,这意味着它会使用无缓冲的输入输出。你可以试着设置一个合适的缓冲区大小,比如4K、16K,甚至1M,看看这样会不会有变化。