从Python中反复写入进程的stdin并读取stdout

9 投票
5 回答
3990 浏览
提问于 2025-04-16 02:55

我有一段Fortran代码,它从标准输入(STDIN)读取一些数字,然后把结果写到标准输出(STDOUT)。举个例子:

do
  read (*,*) x
  y = x*x
  write (*,*) y
enddo

这样我就可以从命令行启动这个程序,并得到以下的输入/输出序列:

5.0
25.0
2.5
6.25

现在我想在Python中实现这个功能。在尝试了很多次使用subprocess.Popen并查看这个网站上的旧问题后,我决定使用pexpect.spawn:

import pexpect, os
p = pexpect.spawn('squarer')
p.setecho(False)
p.write("2.5" + os.linesep)
res = p.readline()

而且它确实有效。问题是,我需要在Python和我的Fortran程序之间传递的真实数据是一个包含100,000个(或更多)双精度浮点数的数组。如果这个数组叫做x,那么

p.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

就会超时,并出现来自pexpect的以下错误信息:

buffer (last 100 chars):   
before (last 100 chars):   
after: <class 'pexpect.TIMEOUT'>  
match: None  
match_index: None  
exitstatus: None
flag_eof: False
pid: 8574
child_fd: 3
closed: False
timeout: 30
delimiter: <class 'pexpect.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1

除非x的元素少于303个。有没有办法在另一个程序的标准输入/输出之间传递大量数据呢?

我尝试过把数据分成更小的块,但这样会导致速度损失很

提前谢谢你。

5 个回答

1

这里有个很简单的说法:把你的Python程序分成两个部分。

python source.py | squarer | python sink.py

squarer这个应用就相当于你的Fortran代码。它从标准输入读取数据,然后把结果写到标准输出。

你的source.py是用Python写的,它的功能是

import sys
sys.stdout.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

或者,可能还有一点更简单的,比如说

from __future__ import print_function
print( ' '.join(["{0:.10f}".format(k) for k in x]) )

而你的sink.py可以是这样的。

import fileinput
for line in fileinput.input():
    # process the line 

把源代码、squarer和sink分开,这样你就有了三个独立的进程(而不是两个),这样可以利用更多的处理器核心。更多的核心意味着更多的并发处理,也就是更有趣。

2

这是一个示例的 squarer.py 程序(它正好是用 Python 写的,你可以用你的 Fortran 可执行文件代替):

#!/usr/bin/python
import sys
data= sys.stdin.readline() # expecting lots of data in one line
processed_data= data[-2::-1] # reverse without the newline
sys.stdout.write(processed_data+'\n')

这是一个示例的 target.py 程序:

import thread, Queue
import subprocess as sbp

class Companion(object):
    "A companion process manager"
    def __init__(self, cmdline):
        "Start the companion process"
        self.companion= sbp.Popen(
            cmdline, shell=False,
            stdin=sbp.PIPE,
            stdout=sbp.PIPE)
        self.putque= Queue.Queue()
        self.getque= Queue.Queue()
        thread.start_new_thread(self._sender, (self.putque,))
        thread.start_new_thread(self._receiver, (self.getque,))

    def _sender(self, que):
        "Actually sends the data to the companion process"
        while 1:
            datum= que.get()
            if datum is Ellipsis:
                break
            self.companion.stdin.write(datum)
            if not datum.endswith('\n'):
                self.companion.stdin.write('\n')

    def _receiver(self, que):
        "Actually receives data from the companion process"
        while 1:
            datum= self.companion.stdout.readline()
            que.put(datum)

    def close(self):
        self.putque.put(Ellipsis)

    def send(self, data):
        "Schedule a long line to be sent to the companion process"
        self.putque.put(data)

    def recv(self):
        "Get a long line of output from the companion process"
        return self.getque.get()

def main():
    my_data= '12345678 ' * 5000
    my_companion= Companion(("/usr/bin/python", "squarer.py"))

    my_companion.send(my_data)
    my_answer= my_companion.recv()
    print my_answer[:20] # don't print the long stuff
    # rinse, repeat

    my_companion.close()

if __name__ == "__main__":
    main()

main 函数里包含了你要用的代码:首先设置一个 Companion 对象,然后用 companion.send 发送一长串数据,再用 companion.recv 接收一行数据。根据需要重复这个过程。

6

我找到了一种使用子进程模块的方法,所以在这里分享一下,方便有需要的人参考。

import subprocess as sbp

class ExternalProg:

    def __init__(self, arg_list):
        self.opt = sbp.Popen(arg_list, stdin=sbp.PIPE, stdout=sbp.PIPE, shell=True, close_fds=True)

    def toString(self,x):
        return ' '.join(["%.12f"%k for k in x])

    def toFloat(self,x):
        return float64(x.strip().split())

    def sendString(self,string):
        if not string.endswith('\n'):
            string = string + '\n'
        self.opt.stdin.write(string)

    def sendArray(self,x):
        self.opt.stdin.write(self.toString(x)+'\n')

    def readInt(self):
        return int(self.opt.stdout.readline().strip())

    def sendScalar(self,x):
        if type(x) == int:
            self.opt.stdin.write("%i\n"%x)
        elif type(x) == float:
            self.opt.stdin.write("%.12f\n"%x)

    def readArray(self):
        return self.toFloat(self.opt.stdout.readline())

    def close(self):
        self.opt.kill()

这个类是通过一个叫做'optimizer'的外部程序来调用的,如下所示:

optim = ExternalProg(['./optimizer'])
optim.sendScalar(500) # send the optimizer the length of the state vector, for example
optim.sendArray(init_x) # the initial guess for x
optim.sendArray(init_g) # the initial gradient g
next_x = optim.readArray() # get the next estimate of x
next_g = evaluateGradient(next_x) # calculate gradient at next_x from within python
# repeat until convergence

在Fortran这边(也就是编译后生成可执行文件'optimizer'的程序),会读取一个包含500个元素的向量,具体操作如下:

read(*,*) input_vector(1:500)

然后会将数据写出,操作如下:

write(*,'(500f18.11)') output_vector(1:500)

就这样!我已经测试过,能够处理最多20万个元素的状态向量(这是我目前需要的上限)。希望这个方法对其他人也有帮助。这个解决方案在ifort和xlf90下有效,但在gfortran下却不行,原因我也不太明白。

撰写回答