如何使用Python将stdin/stdout传递给Perl脚本

3 投票
3 回答
1310 浏览
提问于 2025-04-17 09:22

这段Python代码可以顺利地把数据传给Perl脚本。

import subprocess
kw = {}
kw['executable'] = None
kw['shell'] = True
kw['stdin'] = None
kw['stdout'] = subprocess.PIPE
kw['stderr'] = subprocess.PIPE
args = ' '.join(['/usr/bin/perl','-w','/path/script.perl','<','/path/mydata'])
subproc = subprocess.Popen(args,**kw)
for line in iter(subproc.stdout.readline, ''):
    print line.rstrip().decode('UTF-8')

不过,它要求我先把数据保存到一个磁盘文件里(/path/mydata)。如果在Python代码中逐行处理数据,然后传给子进程,这样会更整洁,像这样:

import subprocess
kw = {}
kw['executable'] = '/usr/bin/perl'
kw['shell'] = False
kw['stderr'] = subprocess.PIPE
kw['stdin'] = subprocess.PIPE
kw['stdout'] = subprocess.PIPE
args = ['-w','/path/script.perl',]
subproc = subprocess.Popen(args,**kw)
f = codecs.open('/path/mydata','r','UTF-8')
for line in f:
    subproc.stdin.write('%s\n'%(line.strip().encode('UTF-8')))
    print line.strip()  ### code hangs after printing this ###
    for line in iter(subproc.stdout.readline, ''):
        print line.rstrip().decode('UTF-8')
subproc.terminate()
f.close()

但是在发送第一行数据到子进程后,代码就卡住了,无法继续读取。我还有其他的可执行文件用同样的代码运行得很好。

我的数据文件可能会很大(1.5 GB),有没有办法在不保存到文件的情况下实现数据传输?我不想为了兼容其他系统而重写Perl脚本。

3 个回答

0

请注意手册中提到的关于使用 Popen.stdinPopen.stdout 的警告(在Popen.stdin 之上):

警告: 使用communicate(),而不是.stdin.write.stdout.read.stderr.read,以避免因为其他操作系统管道缓冲区满了而导致的死锁,这会阻塞子进程。

我明白一次性在内存中存储一个1.5GB的字符串并不是很理想,但使用communicate()是一种可以正常工作的方式。而正如你所观察到的,一旦操作系统的管道缓冲区满了,使用 stdin.write() + stdout.read() 的方式可能会导致死锁。

你觉得使用 communicate() 可行吗?

1

你的代码在这一行卡住了:

for line in iter(subproc.stdout.readline, ''):

这是因为这个循环只能在到达文件末尾(EOF)时结束,而文件末尾会在子进程结束时出现。不过,你并不想等到进程结束,你只想等到它处理完发送给它的那一行。

另外,正如Chris Morgan已经指出的,你还遇到了缓冲的问题。另一个StackOverflow上的问题讨论了如何在子进程中进行非阻塞读取。我根据那个问题的代码,快速做了一个简单的修改,适应你的问题:

def enqueue_output(out, queue):
    for line in iter(out.readline, ''):
        queue.put(line)
    out.close()

kw = {}
kw['executable'] = '/usr/bin/perl'
kw['shell'] = False
kw['stderr'] = subprocess.PIPE
kw['stdin'] = subprocess.PIPE
kw['stdout'] = subprocess.PIPE
args = ['-w','/path/script.perl',]
subproc = subprocess.Popen(args, **kw)
f = codecs.open('/path/mydata','r','UTF-8')
q = Queue.Queue()
t = threading.Thread(target = enqueue_output, args = (subproc.stdout, q))
t.daemon = True
t.start()
for line in f:
    subproc.stdin.write('%s\n'%(line.strip().encode('UTF-8')))
    print "Sent:", line.strip()  ### code hangs after printing this ###
    try:
        line = q.get_nowait()
    except Queue.Empty:
        pass
    else:
        print "Received:", line.rstrip().decode('UTF-8')

subproc.terminate()
f.close()

你很可能需要对这段代码进行一些修改,但至少它不会再卡住了。

1

谢谢你,srgerg。我之前也尝试过使用线程的方法。不过,这种方法单独使用时总是会卡住。我的旧代码和srgerg的代码都缺少了最终的解决方案,你的建议让我有了最后的灵感。

最终的解决方案是写入足够的虚拟数据,以强制从缓冲区中输出最后有效的行。为此,我添加了代码来跟踪写入到标准输入的有效行数。线程循环会打开输出文件,保存数据,并在读取的行数等于有效输入行数时停止。这种方法确保无论文件大小如何,都能逐行读取和写入。

def std_output(stdout,outfile=''):
    out = 0
    f = codecs.open(outfile,'w','UTF-8')
    for line in iter(stdout.readline, ''):
        f.write('%s\n'%(line.rstrip().decode('UTF-8')))
        out += 1
        if i == out: break
    stdout.close()
    f.close()

outfile = '/path/myout'
infile = '/path/mydata'

subproc = subprocess.Popen(args,**kw)
t = threading.Thread(target=std_output,args=[subproc.stdout,outfile])
t.daemon = True
t.start()

i = 0
f = codecs.open(infile,'r','UTF-8')
for line in f:
    subproc.stdin.write('%s\n'%(line.strip().encode('UTF-8')))
    i += 1
subproc.stdin.write('%s\n'%(' '*4096)) ### push dummy data ###
f.close()
t.join()
subproc.terminate()

撰写回答