blocks - 将输入发送到Python子进程管道

34 投票
11 回答
26009 浏览
提问于 2025-04-15 15:13

我正在用Python测试子进程的管道。我知道下面的程序可以直接用Python实现,但我并不是这个意思。我只是想测试这个管道,以便了解如何使用它。

我的系统是Linux Ubuntu 9.04,默认的Python版本是2.6。

我开始时参考了这个文档示例

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

这个示例可以运行,但因为p1stdin没有被重定向,所以我必须在终端输入内容来填充管道。当我输入^D来关闭stdin时,我得到了想要的输出。

不过,我想用一个Python字符串变量来发送数据到管道。首先我尝试在stdin上写入:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

结果不行。我尝试在最后一行用p2.stdout.read(),但它也会阻塞。我加了p1.stdin.flush()p1.stdin.close(),但也没用。然后我转向使用通信:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

结果还是不行。

我注意到运行一个单独的进程(比如上面的p1,去掉p2)是完全正常的。而且把文件句柄传给p1stdin=open(...))也可以。所以问题是:

在Python中,是否可以在不阻塞的情况下,将数据传递给两个或更多子进程的管道?为什么不可以?

我知道我可以运行一个shell并在其中运行管道,但这不是我想要的。


更新 1:根据下面Aaron Digulla的提示,我现在尝试使用线程来让它工作。

我首先尝试在一个线程中运行p1.communicate

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

结果不行。我尝试了其他组合,比如改成.write()p2.read()。都没用。现在我们试试相反的方法:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

代码在某个地方最终会阻塞。可能是在新创建的线程中,也可能是在主线程中,或者两者都有。所以没成功。如果你知道怎么让它工作,提供一些可用的代码会更简单。我在这里尝试。


更新 2

Paul Du Bois在下面回答了一些信息,所以我进行了更多测试。我阅读了整个subprocess.py模块,了解了它的工作原理。所以我尝试将其准确应用到代码中。

我在Linux上,但由于我在测试线程,我的第一步是复制subprocess.pycommunicate()方法的确切Windows线程代码,但用于两个进程而不是一个。以下是我尝试的完整代码:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

结果是,这也不行。即使在调用了p1.stdin.close()之后,p2.stdout.read()仍然会阻塞。

然后我尝试了subprocess.py中的posix代码:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

select.select()上也会阻塞。通过在代码中添加print,我发现了以下几点:

  • 读取是有效的。代码在执行过程中多次读取。
  • 写入也是有效的。数据被写入到p1.stdin
  • numwrites结束时,调用了p1.stdin.close()
  • select()开始阻塞时,只有to_read有内容,即p2.stdoutto_write已经为空。
  • os.read()调用总是返回一些内容,所以p2.stdout.close()从未被调用。

从这两个测试的结论:关闭管道中第一个进程的stdin(示例中的grep)并没有使其将缓冲的输出转发到下一个进程并结束。

没有办法让它工作吗?

附注:我不想使用临时文件,我已经测试过文件,知道它可以工作。我也不想使用Windows。

11 个回答

3

让管道正常工作的主要有三个小窍门:

  1. 确保管道的每一端都在不同的线程或进程中使用(有些例子在这方面有问题)。

  2. 在每个进程中明确关闭未使用的管道一端。

  3. 处理缓冲问题,可以选择禁用缓冲(使用Python的-u选项)、使用伪终端,或者用一些不会影响数据的东西填满缓冲区(比如'\n',或者其他合适的字符)。

Python的“管道”模块中的例子(我是这个模块的作者)正好符合你的场景,并且把底层步骤讲得比较清楚。

http://pypi.python.org/pypi/pipeline/

最近,我使用了子进程模块,作为生产者-处理者-消费者-控制器模式的一部分:

http://www.darkarchive.org/w/Pub/PythonInteract

这个例子处理了带缓冲的标准输入,而没有使用伪终端,并且还说明了在哪些地方应该关闭管道的端口。我更喜欢使用进程而不是线程,但原理是一样的。此外,它还展示了如何同步队列,以便生产者可以输入数据,消费者可以收集输出,以及如何干净地关闭它们(注意队列中插入的哨兵)。这个模式允许根据最近的输出生成新的输入,从而实现递归发现和处理。

7

处理大文件

在用Python处理大文件时,有两个原则需要始终遵循。

  1. 因为任何输入输出操作都有可能会阻塞,所以我们必须把每个处理阶段放在不同的线程或进程中。在这个例子中我们使用线程,但如果用子进程的话,可以避免全局解释器锁(GIL)的问题。
  2. 我们必须使用增量读取和写入,这样就不会等到文件结束符(EOF)才开始处理。

另一种选择是使用非阻塞输入输出,不过在标准Python中这比较麻烦。可以看看gevent,这是一个轻量级的线程库,使用非阻塞原语实现同步输入输出API。

示例代码

我们将构建一个简单的处理流程,大致如下:

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

其中大括号{}里的每个阶段都是用Python实现的,而其他的则使用标准外部程序。总结:可以查看这个链接

我们先导入需要的库。

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

处理流程中的Python阶段

除了最后一个用Python实现的阶段,其他所有阶段都需要放在一个线程里,这样它们的输入输出操作就不会阻塞其他阶段。如果你希望它们真正并行运行,可以选择用Python的子进程。

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

每个阶段都需要放在自己的线程中,我们将使用一个方便的函数来实现。

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

创建处理流程

使用Popen创建外部阶段,使用spawn创建Python阶段。参数bufsize=-1表示使用系统默认的缓冲(通常是4KB)。这通常比默认的(无缓冲)或行缓冲要快,但如果你想实时监控输出,可能需要使用行缓冲。

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

驱动处理流程

按照上面的方式组装后,处理流程中的所有缓冲区都会被填满,但由于没有人从最后一个阶段(grepz.stdout)读取数据,它们都会阻塞。我们可以通过一次调用grepz.stdout.read()来读取整个内容,但这样对于大文件会消耗很多内存。因此,我们选择增量读取

for line in grepz.stdout:
    sys.stdout.write(line.lower())

线程和进程在到达EOF后会自动清理。我们也可以通过以下方式手动清理:

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6及之前版本

在内部,subprocess.Popen会调用fork,配置管道文件描述符,然后调用exec。通过fork创建的子进程会复制父进程中的所有文件描述符,这两个副本都需要关闭,才能让对应的读取器接收到EOF。可以通过手动关闭管道(使用close_fds=True或合适的preexec_fn参数传给subprocess.Popen)来解决这个问题,或者通过设置FD_CLOEXEC标志,让exec自动关闭文件描述符。这个标志在Python-2.7及以后的版本中会自动设置,具体可以查看issue12786。我们可以通过调用

p._set_cloexec_flags(p.stdin)

在将p.stdin作为参数传给后续的subprocess.Popen之前,来在早期版本的Python中获得Python-2.7的行为。

23

我找到了怎么做的方法。

这跟线程无关,也跟select()无关。

当我运行第一个进程(grep)时,它会创建两个底层的文件描述符,每个管道一个。我们把它们叫做ab

当我运行第二个进程时,b会传给cut的标准输入(stdin)。但是在Popen中有一个很傻的默认设置 - close_fds=False

这个设置的结果是,cut也会继承a。所以即使我关闭了agrep也无法结束,因为cut的进程中的标准输入仍然是打开的(cut会忽略它)。

下面的代码现在运行得很好。

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True 应该是 unix 系统的默认设置。在 Windows 上,它会关闭所有的文件描述符,所以会阻止管道的使用。

编辑:

PS:对于遇到类似问题的人:正如 pooryorick 在评论中提到的,如果写入p1.stdin的数据大于缓冲区的大小,也可能会阻塞。在这种情况下,你应该把数据分成小块,并使用select.select()来判断何时读取/写入。问题中的代码应该能给你一些实现的提示。

编辑2:找到了另一种解决方案,得到了 pooryorick 的更多帮助 - 不用close_fds=True来关闭所有的文件描述符,而是在执行第二个进程时关闭属于第一个进程的fd,这样也能正常工作。关闭操作必须在子进程中进行,所以Popenpreexec_fn函数非常适合用来做到这一点。在执行p2时,你可以这样做:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)

撰写回答