持久的python子进程

14 投票
4 回答
20897 浏览
提问于 2025-04-17 10:56

有没有办法让Python中的子进程调用“持久化”?我需要多次调用一个程序,而这个程序加载起来比较慢。如果能让这个程序一直开着,然后和它进行交流,那就太好了。

我Python脚本的简单版本大概是这样的:

for text in textcollection:
    myprocess = subprocess.Popen(["myexecutable"],
                stdin = subprocess.PIPE, stdout = subprocess.PIPE,
                stderr = None)
    myoutputtext, err = myprocess.communicate(input=text)

我需要单独处理每个文本,所以把所有文本合并成一个大文件一次性处理是不行的。

如果有类似这样的选项:

myprocess = subprocess.Popen(["myexecutable"],
            stdin = subprocess.PIPE, stdout = subprocess.PIPE,
            stderr = None)    for text in textcollection:
for text in textcollection:
    myoutputtext, err = myprocess.communicate(input=text)

可以让我保持这个进程一直开着,那我会非常感激。

4 个回答

1

我觉得你想要的是

myprocess.stdin.write(text)

你可以创建一个 Popens 的列表,然后在另一个循环中对每个元素调用 communicate。大概是这样的

processes=[]
for text in textcollection:
    myprocess = subprocess.Popen(["myexecutable"],
                stdin = subprocess.PIPE, stdout = subprocess.PIPE,
                stderr = None)
    myprocess.stdin.write(text)
    processes.append(myprocess)

for proc in processes:
    myoutput, err=proc.communicate()
    #do something with the output here

这样的话,它就不需要等到所有的 Popens 都启动完才能继续执行了

5

你遇到的问题是因为调用了 communicate() 方法,这个方法会导致你的子进程被终止。根据 subprocess 的文档communicate() 方法的作用是:

与进程互动:向标准输入(stdin)发送数据。从标准输出(stdout)和标准错误(stderr)读取数据,直到文件结束。等待进程结束。

你想要做的是直接与 POpen 对象的 stdinstdout 属性进行互动,以便与子进程沟通。不过,文档建议不要这样做,原因是:

警告:使用 communicate() 而不是 .stdin.write.stdout.read.stderr.read,以避免因为其他操作系统管道缓冲区满而导致的死锁,从而阻塞子进程。

所以,你要么需要自己想办法解决可能出现的死锁问题,要么希望有人为你写了一个 异步子进程模块

编辑:这里有一个简单的例子,展示如何使用异步子进程模块:

import asyncsubprocess

textcollection = ['to', 'be', 'or', 'not', 'to be', 'that is the', 'question']

myprocess = asyncsubprocess.Popen(["cat"],
     stdin = asyncsubprocess.PIPE,
     stdout = asyncsubprocess.PIPE,
     stderr = None)

for text in textcollection:
    bytes_sent, myoutput, err = myprocess.listen(text)
    print text, bytes_sent, myoutput, err

当我运行这个时,它会打印:

to 2 to 
be 2 be 
or 2 or 
not 3 not 
to be 5 to be 
that is the 11 that is the 
question 8 question 
34

你可以使用 myprocess.stdin.write()myprocess.stdout.read() 来和你的子进程进行交流。不过,你需要注意处理缓冲区的问题,以免你的调用被阻塞。

如果你的子进程输出的内容是明确的,你可以通过行缓冲来可靠地与它沟通,使用 myprocess.stdout.readline() 就可以了。

下面是一个例子:

>>> p = subprocess.Popen(['cat'], bufsize=1, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
>>> p.stdin.write('hello world\n')
>>> p.stdout.readline()
'hello world\n'
>>> p.stdout.readline()        # THIS CALL WILL BLOCK

在Unix系统中,还有一种替代的方法,就是把文件句柄设置为非阻塞模式。这样你就可以调用像 myprocess.stdout.read() 这样的函数,如果有数据可用,它会返回数据;如果没有数据,它会抛出一个 IOError 错误:

>>> p = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
>>> import fcntl, os
>>> fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
0
>>> p.stdout.read()         # raises an exception instead of blocking
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
IOError: [Errno 11] Resource temporarily unavailable

这样你就可以做类似这样的事情:

fcntl.fcntl(p.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
for text in textcollection:
    myprocess.stdin.write(text + '\n')
    while True:
        myoutputtext = ''
        try:
            myoutputtext += myprocess.stdout.read()
        except IOError:
            pass
        if validate_output(myoutputtext):
            break
        time.sleep(.1)    # short sleep before attempting another read

在这个例子中,validate_output() 是你需要自己编写的一个函数,它会返回 True,如果你到目前为止收到的数据都是你预期的输出。

撰写回答