如何使用Python将stdin/stdout传递给Perl脚本
这段Python代码可以顺利地把数据传给Perl脚本。
import subprocess
kw = {}
kw['executable'] = None
kw['shell'] = True
kw['stdin'] = None
kw['stdout'] = subprocess.PIPE
kw['stderr'] = subprocess.PIPE
args = ' '.join(['/usr/bin/perl','-w','/path/script.perl','<','/path/mydata'])
subproc = subprocess.Popen(args,**kw)
for line in iter(subproc.stdout.readline, ''):
print line.rstrip().decode('UTF-8')
不过,它要求我先把数据保存到一个磁盘文件里(/path/mydata)。如果在Python代码中逐行处理数据,然后传给子进程,这样会更整洁,像这样:
import subprocess
kw = {}
kw['executable'] = '/usr/bin/perl'
kw['shell'] = False
kw['stderr'] = subprocess.PIPE
kw['stdin'] = subprocess.PIPE
kw['stdout'] = subprocess.PIPE
args = ['-w','/path/script.perl',]
subproc = subprocess.Popen(args,**kw)
f = codecs.open('/path/mydata','r','UTF-8')
for line in f:
subproc.stdin.write('%s\n'%(line.strip().encode('UTF-8')))
print line.strip() ### code hangs after printing this ###
for line in iter(subproc.stdout.readline, ''):
print line.rstrip().decode('UTF-8')
subproc.terminate()
f.close()
但是在发送第一行数据到子进程后,代码就卡住了,无法继续读取。我还有其他的可执行文件用同样的代码运行得很好。
我的数据文件可能会很大(1.5 GB),有没有办法在不保存到文件的情况下实现数据传输?我不想为了兼容其他系统而重写Perl脚本。
3 个回答
请注意手册中提到的关于使用 Popen.stdin
和 Popen.stdout
的警告(在Popen.stdin
之上):
警告: 使用
communicate()
,而不是.stdin.write
、.stdout.read
或.stderr.read
,以避免因为其他操作系统管道缓冲区满了而导致的死锁,这会阻塞子进程。
我明白一次性在内存中存储一个1.5GB的字符串并不是很理想,但使用communicate()
是一种可以正常工作的方式。而正如你所观察到的,一旦操作系统的管道缓冲区满了,使用 stdin.write()
+ stdout.read()
的方式可能会导致死锁。
你觉得使用 communicate()
可行吗?
你的代码在这一行卡住了:
for line in iter(subproc.stdout.readline, ''):
这是因为这个循环只能在到达文件末尾(EOF)时结束,而文件末尾会在子进程结束时出现。不过,你并不想等到进程结束,你只想等到它处理完发送给它的那一行。
另外,正如Chris Morgan已经指出的,你还遇到了缓冲的问题。另一个StackOverflow上的问题讨论了如何在子进程中进行非阻塞读取。我根据那个问题的代码,快速做了一个简单的修改,适应你的问题:
def enqueue_output(out, queue):
for line in iter(out.readline, ''):
queue.put(line)
out.close()
kw = {}
kw['executable'] = '/usr/bin/perl'
kw['shell'] = False
kw['stderr'] = subprocess.PIPE
kw['stdin'] = subprocess.PIPE
kw['stdout'] = subprocess.PIPE
args = ['-w','/path/script.perl',]
subproc = subprocess.Popen(args, **kw)
f = codecs.open('/path/mydata','r','UTF-8')
q = Queue.Queue()
t = threading.Thread(target = enqueue_output, args = (subproc.stdout, q))
t.daemon = True
t.start()
for line in f:
subproc.stdin.write('%s\n'%(line.strip().encode('UTF-8')))
print "Sent:", line.strip() ### code hangs after printing this ###
try:
line = q.get_nowait()
except Queue.Empty:
pass
else:
print "Received:", line.rstrip().decode('UTF-8')
subproc.terminate()
f.close()
你很可能需要对这段代码进行一些修改,但至少它不会再卡住了。
谢谢你,srgerg。我之前也尝试过使用线程的方法。不过,这种方法单独使用时总是会卡住。我的旧代码和srgerg的代码都缺少了最终的解决方案,你的建议让我有了最后的灵感。
最终的解决方案是写入足够的虚拟数据,以强制从缓冲区中输出最后有效的行。为此,我添加了代码来跟踪写入到标准输入的有效行数。线程循环会打开输出文件,保存数据,并在读取的行数等于有效输入行数时停止。这种方法确保无论文件大小如何,都能逐行读取和写入。
def std_output(stdout,outfile=''):
out = 0
f = codecs.open(outfile,'w','UTF-8')
for line in iter(stdout.readline, ''):
f.write('%s\n'%(line.rstrip().decode('UTF-8')))
out += 1
if i == out: break
stdout.close()
f.close()
outfile = '/path/myout'
infile = '/path/mydata'
subproc = subprocess.Popen(args,**kw)
t = threading.Thread(target=std_output,args=[subproc.stdout,outfile])
t.daemon = True
t.start()
i = 0
f = codecs.open(infile,'r','UTF-8')
for line in f:
subproc.stdin.write('%s\n'%(line.strip().encode('UTF-8')))
i += 1
subproc.stdin.write('%s\n'%(' '*4096)) ### push dummy data ###
f.close()
t.join()
subproc.terminate()