blocks - 将输入发送到Python子进程管道
我正在用Python测试子进程的管道。我知道下面的程序可以直接用Python实现,但我并不是这个意思。我只是想测试这个管道,以便了解如何使用它。
我的系统是Linux Ubuntu 9.04,默认的Python版本是2.6。
我开始时参考了这个文档示例。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
这个示例可以运行,但因为p1
的stdin
没有被重定向,所以我必须在终端输入内容来填充管道。当我输入^D
来关闭stdin时,我得到了想要的输出。
不过,我想用一个Python字符串变量来发送数据到管道。首先我尝试在stdin上写入:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
结果不行。我尝试在最后一行用p2.stdout.read()
,但它也会阻塞。我加了p1.stdin.flush()
和p1.stdin.close()
,但也没用。然后我转向使用通信:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
结果还是不行。
我注意到运行一个单独的进程(比如上面的p1
,去掉p2
)是完全正常的。而且把文件句柄传给p1
(stdin=open(...)
)也可以。所以问题是:
在Python中,是否可以在不阻塞的情况下,将数据传递给两个或更多子进程的管道?为什么不可以?
我知道我可以运行一个shell并在其中运行管道,但这不是我想要的。
更新 1:根据下面Aaron Digulla的提示,我现在尝试使用线程来让它工作。
我首先尝试在一个线程中运行p1.communicate
。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
结果不行。我尝试了其他组合,比如改成.write()
和p2.read()
。都没用。现在我们试试相反的方法:
def get_output(subp):
output = subp.communicate()[0] # blocks on thread
print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
代码在某个地方最终会阻塞。可能是在新创建的线程中,也可能是在主线程中,或者两者都有。所以没成功。如果你知道怎么让它工作,提供一些可用的代码会更简单。我在这里尝试。
更新 2
Paul Du Bois在下面回答了一些信息,所以我进行了更多测试。我阅读了整个subprocess.py
模块,了解了它的工作原理。所以我尝试将其准确应用到代码中。
我在Linux上,但由于我在测试线程,我的第一步是复制subprocess.py
中communicate()
方法的确切Windows线程代码,但用于两个进程而不是一个。以下是我尝试的完整代码:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
while True:
chunk = fobj.read() # BLOCKS HERE
if not chunk:
break
buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
p1.stdin.write('hello world\n') # write data
p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
结果是,这也不行。即使在调用了p1.stdin.close()
之后,p2.stdout.read()
仍然会阻塞。
然后我尝试了subprocess.py
中的posix代码:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = os.read(p2.stdout.fileno(), 1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if numwrites > 0:
numwrites -= 1
p1.stdin.write('hello world!\n'); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
在select.select()
上也会阻塞。通过在代码中添加print
,我发现了以下几点:
- 读取是有效的。代码在执行过程中多次读取。
- 写入也是有效的。数据被写入到
p1.stdin
。 - 在
numwrites
结束时,调用了p1.stdin.close()
。 - 当
select()
开始阻塞时,只有to_read
有内容,即p2.stdout
。to_write
已经为空。 os.read()
调用总是返回一些内容,所以p2.stdout.close()
从未被调用。
从这两个测试的结论:关闭管道中第一个进程的stdin
(示例中的grep
)并没有使其将缓冲的输出转发到下一个进程并结束。
没有办法让它工作吗?
附注:我不想使用临时文件,我已经测试过文件,知道它可以工作。我也不想使用Windows。
11 个回答
让管道正常工作的主要有三个小窍门:
确保管道的每一端都在不同的线程或进程中使用(有些例子在这方面有问题)。
在每个进程中明确关闭未使用的管道一端。
处理缓冲问题,可以选择禁用缓冲(使用Python的-u选项)、使用伪终端,或者用一些不会影响数据的东西填满缓冲区(比如'\n',或者其他合适的字符)。
Python的“管道”模块中的例子(我是这个模块的作者)正好符合你的场景,并且把底层步骤讲得比较清楚。
http://pypi.python.org/pypi/pipeline/
最近,我使用了子进程模块,作为生产者-处理者-消费者-控制器模式的一部分:
http://www.darkarchive.org/w/Pub/PythonInteract
这个例子处理了带缓冲的标准输入,而没有使用伪终端,并且还说明了在哪些地方应该关闭管道的端口。我更喜欢使用进程而不是线程,但原理是一样的。此外,它还展示了如何同步队列,以便生产者可以输入数据,消费者可以收集输出,以及如何干净地关闭它们(注意队列中插入的哨兵)。这个模式允许根据最近的输出生成新的输入,从而实现递归发现和处理。
处理大文件
在用Python处理大文件时,有两个原则需要始终遵循。
- 因为任何输入输出操作都有可能会阻塞,所以我们必须把每个处理阶段放在不同的线程或进程中。在这个例子中我们使用线程,但如果用子进程的话,可以避免全局解释器锁(GIL)的问题。
- 我们必须使用增量读取和写入,这样就不会等到文件结束符(
EOF
)才开始处理。
另一种选择是使用非阻塞输入输出,不过在标准Python中这比较麻烦。可以看看gevent,这是一个轻量级的线程库,使用非阻塞原语实现同步输入输出API。
示例代码
我们将构建一个简单的处理流程,大致如下:
{cat /usr/share/dict/words} | grep -v not \
| {upcase, filtered tee to stderr} | cut -c 1-10 \
| {translate 'E' to '3'} | grep K | grep Z | {downcase}
其中大括号{}
里的每个阶段都是用Python实现的,而其他的则使用标准外部程序。总结:可以查看这个链接。
我们先导入需要的库。
#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading
处理流程中的Python阶段
除了最后一个用Python实现的阶段,其他所有阶段都需要放在一个线程里,这样它们的输入输出操作就不会阻塞其他阶段。如果你希望它们真正并行运行,可以选择用Python的子进程。
def writer(output):
for line in open('/usr/share/dict/words'):
output.write(line)
output.close()
def filter(input, output):
for line in input:
if 'k' in line and 'z' in line: # Selective 'tee'
sys.stderr.write('### ' + line)
output.write(line.upper())
output.close()
def leeter(input, output):
for line in input:
output.write(line.replace('E', '3'))
output.close()
每个阶段都需要放在自己的线程中,我们将使用一个方便的函数来实现。
def spawn(func, **kwargs):
t = threading.Thread(target=func, kwargs=kwargs)
t.start()
return t
创建处理流程
使用Popen
创建外部阶段,使用spawn
创建Python阶段。参数bufsize=-1
表示使用系统默认的缓冲(通常是4KB)。这通常比默认的(无缓冲)或行缓冲要快,但如果你想实时监控输出,可能需要使用行缓冲。
grepv = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)
twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)
驱动处理流程
按照上面的方式组装后,处理流程中的所有缓冲区都会被填满,但由于没有人从最后一个阶段(grepz.stdout
)读取数据,它们都会阻塞。我们可以通过一次调用grepz.stdout.read()
来读取整个内容,但这样对于大文件会消耗很多内存。因此,我们选择增量读取。
for line in grepz.stdout:
sys.stdout.write(line.lower())
线程和进程在到达EOF
后会自动清理。我们也可以通过以下方式手动清理:
for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()
Python-2.6及之前版本
在内部,subprocess.Popen
会调用fork
,配置管道文件描述符,然后调用exec
。通过fork
创建的子进程会复制父进程中的所有文件描述符,这两个副本都需要关闭,才能让对应的读取器接收到EOF
。可以通过手动关闭管道(使用close_fds=True
或合适的preexec_fn
参数传给subprocess.Popen
)来解决这个问题,或者通过设置FD_CLOEXEC
标志,让exec
自动关闭文件描述符。这个标志在Python-2.7及以后的版本中会自动设置,具体可以查看issue12786。我们可以通过调用
p._set_cloexec_flags(p.stdin)
在将p.stdin
作为参数传给后续的subprocess.Popen
之前,来在早期版本的Python中获得Python-2.7的行为。
我找到了怎么做的方法。
这跟线程无关,也跟select()无关。
当我运行第一个进程(grep
)时,它会创建两个底层的文件描述符,每个管道一个。我们把它们叫做a
和b
。
当我运行第二个进程时,b
会传给cut
的标准输入(stdin
)。但是在Popen
中有一个很傻的默认设置 - close_fds=False
。
这个设置的结果是,cut
也会继承a
。所以即使我关闭了a
,grep
也无法结束,因为cut
的进程中的标准输入仍然是打开的(cut
会忽略它)。
下面的代码现在运行得很好。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
close_fds=True
应该是 unix 系统的默认设置。在 Windows 上,它会关闭所有的文件描述符,所以会阻止管道的使用。
编辑:
PS:对于遇到类似问题的人:正如 pooryorick 在评论中提到的,如果写入p1.stdin
的数据大于缓冲区的大小,也可能会阻塞。在这种情况下,你应该把数据分成小块,并使用select.select()
来判断何时读取/写入。问题中的代码应该能给你一些实现的提示。
编辑2:找到了另一种解决方案,得到了 pooryorick 的更多帮助 - 不用close_fds=True
来关闭所有的文件描述符,而是在执行第二个进程时关闭属于第一个进程的fd
,这样也能正常工作。关闭操作必须在子进程中进行,所以Popen
的preexec_fn
函数非常适合用来做到这一点。在执行p2
时,你可以这样做:
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)