从Python子进程获取输出并发送命令

4 投票
1 回答
1958 浏览
提问于 2025-04-17 07:09

我正在尝试从一个子进程中获取输出,然后根据之前的输出给这个进程发送命令。我需要根据程序的需要,进行多次这样的操作。(如果可以的话,我还希望能隐藏子进程的命令提示符)。

我以为这会是个简单的任务,因为我看到这个问题在2003年的帖子中就有人讨论过,而现在快到2012年了,这似乎是个很常见的需求,理应是任何编程语言的基本功能。但显然我错了,过了将近9年,居然还是没有一种稳定、不破坏、跨平台的标准方法来完成这个任务!

我对文件输入输出、缓冲和线程的理解不多,所以我希望能找到一个尽可能简单的解决方案。如果有一个与python 3.x兼容的模块可以实现这个功能,我非常愿意下载它。我知道有很多问题基本上在问同样的事情,但我还没有找到一个能解决我想完成的简单任务的答案。

这是我目前根据各种来源写的代码;不过我完全不知道接下来该怎么做。我的所有尝试都以失败告终,有些甚至让我的CPU使用率达到100%(基本上什么也没做),而且无法退出。

import subprocess
from subprocess import Popen, PIPE
p = Popen(r'C:\postgis_testing\shellcomm.bat',stdin=PIPE,stdout=PIPE,stderr=subprocess.STDOUT shell=True)
stdout,stdin = p.communicate(b'command string')

如果我的问题不清楚,我会贴出一个示例批处理文件的文本,这个文件展示了在需要向子进程发送多个命令的情况下(如果你输入了错误的命令字符串,程序会循环)。

@echo off
:looper
set INPUT=
set /P INPUT=Type the correct command string:
if "%INPUT%" == "command string" (echo you are correct) else (goto looper)

如果有人能帮我,我会非常感激,我相信很多人也会!

编辑:这是使用eryksun的代码(下一条帖子)得到的功能代码:

import subprocess
import threading
import time
import sys

try: 
    import queue
except ImportError:
    import Queue as queue

def read_stdout(stdout, q, p):
    it = iter(lambda: stdout.read(1), b'')
    for c in it:
        q.put(c)
        if stdout.closed:
            break

_encoding = getattr(sys.stdout, 'encoding', 'latin-1')
def get_stdout(q, encoding=_encoding):
    out = []
    while 1:
        try:
            out.append(q.get(timeout=0.2))
        except queue.Empty:
            break
    return b''.join(out).rstrip().decode(encoding)

def printout(q):
    outdata = get_stdout(q)
    if outdata:
        print('Output: %s' % outdata)

if __name__ == '__main__':
    #setup
    p = subprocess.Popen(['shellcomm.bat'], stdin=subprocess.PIPE, 
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE, 
                     bufsize=0, shell=True) # I put shell=True to hide prompt
    q = queue.Queue()
    encoding = getattr(sys.stdin, 'encoding', 'utf-8')

    #for reading stdout
    t = threading.Thread(target=read_stdout, args=(p.stdout, q, p))
    t.daemon = True
    t.start()

    #command loop
    while p.poll() is None:
        printout(q)
        cmd = input('Input: ')
        cmd = (cmd + '\n').encode(encoding)
        p.stdin.write(cmd)
        time.sleep(0.1) # I added this to give some time to check for closure (otherwise it doesn't work)

    #tear down
    for n in range(4):
        rc = p.poll()
        if rc is not None:
            break
        time.sleep(0.25)
    else:
        p.terminate()
        rc = p.poll()
        if rc is None:
            rc = 1

    printout(q)
    print('Return Code: %d' % rc)

但是当从命令提示符运行脚本时,会发生以下情况:

C:\Users\username>python C:\postgis_testing\shellcomm7.py
Input: sth
Traceback (most recent call last):
File "C:\postgis_testing\shellcomm7.py", line 51, in <module>
    p.stdin.write(cmd)
IOError: [Errno 22] Invalid argument

似乎程序在从命令提示符运行时会关闭。有什么想法吗?

1 个回答

3

这个示例使用了一个专门的线程来读取标准输出。如果你多找找,肯定能找到一个更完整的实现,它是用面向对象的方式写的。至少我可以说,这个方法在你提供的批处理文件上,在Python 2.7.2和3.2.2中都能正常工作。

shellcomm.bat:

@echo off
echo Command Loop Test
echo.
:looper
set INPUT=
set /P INPUT=Type the correct command string:
if "%INPUT%" == "command string" (echo you are correct) else (goto looper)

根据命令“wrong”、“still wrong”和“command string”的顺序,我得到的输出是:

Output:
Command Loop Test

Type the correct command string:
Input: wrong
Output:
Type the correct command string:
Input: still wrong
Output:
Type the correct command string:
Input: command string
Output:
you are correct

Return Code: 0

在读取管道输出时,readline有时能用,但批处理文件中的set /P INPUT自然是不会写入行结束符的。所以我用了lambda: stdout.read(1)来一次读取一个字节(虽然效率不高,但能用)。这个读取函数会把数据放到一个队列里。主线程在写入命令后,从队列中获取输出。在这里对get调用使用超时设置,可以让它等一小会儿,以确保程序在等待输入。你也可以检查输出中的提示符,来判断程序什么时候在期待输入。

不过,像这样的设置不能指望在所有情况下都能工作,因为你想要交互的控制台程序可能在管道输出时会进行缓冲。在Unix系统中,有一些工具命令可以插入到管道中,以修改缓冲方式,使其不缓冲、行缓冲或设置特定大小——比如stdbuf。还有一些方法可以让程序误以为它连接到了一个伪终端(可以看看pexpect)。不过,如果你没有程序源代码的访问权限,想在Windows上解决这个问题,我就不知道有什么办法可以明确设置缓冲了,像setvbuf那样。

import subprocess
import threading
import time
import sys

if sys.version_info.major >= 3:
    import queue
else:
    import Queue as queue
    input = raw_input

def read_stdout(stdout, q):
    it = iter(lambda: stdout.read(1), b'')
    for c in it:
        q.put(c)
        if stdout.closed:
            break

_encoding = getattr(sys.stdout, 'encoding', 'latin-1')
def get_stdout(q, encoding=_encoding):
    out = []
    while 1:
        try:
            out.append(q.get(timeout=0.2))
        except queue.Empty:
            break
    return b''.join(out).rstrip().decode(encoding)

def printout(q):
    outdata = get_stdout(q)
    if outdata:
        print('Output:\n%s' % outdata)

if __name__ == '__main__':

    ARGS = ["shellcomm.bat"]   ### Modify this

    #setup
    p = subprocess.Popen(ARGS, bufsize=0, stdin=subprocess.PIPE, 
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    q = queue.Queue()
    encoding = getattr(sys.stdin, 'encoding', 'utf-8')

    #for reading stdout
    t = threading.Thread(target=read_stdout, args=(p.stdout, q))
    t.daemon = True
    t.start()

    #command loop
    while 1:
        printout(q)
        if p.poll() is not None or p.stdin.closed:
            break
        cmd = input('Input: ') 
        cmd = (cmd + '\n').encode(encoding)
        p.stdin.write(cmd)

    #tear down
    for n in range(4):
        rc = p.poll()
        if rc is not None:
            break
        time.sleep(0.25)
    else:
        p.terminate()
        rc = p.poll()
        if rc is None:
            rc = 1

    printout(q)
    print('\nReturn Code: %d' % rc)

撰写回答