在运行子进程时检查条件

1 投票
2 回答
2321 浏览
提问于 2025-04-17 21:07

我正在使用subprocess.Popen()来执行一个命令。我希望在这个命令执行完之前,不继续执行后面的代码。同时,我想在这个命令运行2分钟后检查一下生成的文件状态。如果文件的大小是零,我就想停止这个过程。目前我的代码是这样的。有没有更聪明的方法来做到这一点?

  def execute(command,outputfilename):
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    start_time=time.time()
    Is_Working=False
    while True:
     process.poll()
     if not Is_Working:
         #allow 2 minutes for the process to create results
         if process.returncode == None and (time.time()-start_time)//60>1:
             Is_Working=True
             if (os.stat(outputfilename)[6]==0):
                 process.kill()
                 return
     if process.returncode != None: 
         break

    output, errors=process.communicate()

2 个回答

0

如果想要在2分钟内结束一个子进程,前提是outputfilename是空的(我假设outputfilename是被其他外部进程修改的),你可以使用threading.Timer来实现:

import os
from subprocess import Popen, PIPE
from threading import Timer

def kill_if_empty(proc, filename):
    if proc.poll() is None and os.path.getsize(filename) == 0:
       proc.kill()

def execute(command, outputfilename):
    p = Popen(command, stdout=PIPE, stderr=PIPE)
    Timer(2*60, kill_if_empty, [p, outputfilename]).start()
    output, errors = p.communicate()
    ...

这段代码会分别收集标准输出和错误输出,并且避免了你在问题中提到的可能出现的死锁情况。

1

总体来看,你的代码看起来不错。只有几个小细节需要注意:

  1. (time.time()-start_time)//60>1 这一行,我觉得用 // 是多余的,因为你不一定需要把结果向下取整,直接用浮点数进行比较就可以了,这都是基本的机器逻辑;
  2. 你可以通过把循环条件改成 while process.returncode is not None:… 来避免无限循环的情况;
  3. 为了让代码更简单,我建议你可以一直循环,直到文件大小不等于0,然后在循环结束后再调用 process.wait()
  4. 这样的话,就很适合使用 while/else 这个结构。

所以,一个改进的建议是,这样你可以在进程结束后做一些事情(比如清理或重试……),无论它是成功还是失败:

def execute(command,outputfilename):
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    start_time=time.time()
    Is_Working=False
    while process.returncode == None:
        process.poll()
        #allow 2 minutes for the process to create results
        if (time.time()-start_time)/60 > 1:
            if (os.stat(outputfilename)[6]==0):
                process.kill()
                break
    else:
        # the process has not been killed
        # wait until it finishes
        process.wait()
        output, errors=process.communicate()
        # do stuff with the output
        […]

    # here the process may have been killed or not
    […]

或者另一个不错的选择是抛出一个异常:

def execute(command,outputfilename):
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    start_time=time.time()
    Is_Working=False
    while process.returncode == None:
        process.poll()
        #allow 2 minutes for the process to create results
        if (time.time()-start_time)/60 > 1:
            if (os.stat(outputfilename)[6]==0):
                process.kill()
                raise Exception("Process has failed to do its job")
    # the process has not been killed
    # wait until it finishes
    process.wait()
    output, errors=process.communicate()
    # do stuff with the output
    […]

希望这些对你有帮助。

撰写回答