Capturing subprocess.Popen output inside a thread function

-1 votes
1 answer
1033 views
Asked 2025-04-18 14:20

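I have a piece of code that runs 4 commands with subprocess.Popen. I am processing log files with the code below. When I process the files one after another, everything works fine. I have now created one thread per file so that they can be processed concurrently, and each thread runs the function below. However, some threads produce the result I expect, while others fail with an error.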

Code:

def process_log_file(file):
    proc= subprocess.Popen(['python27', 'countmapper.py',"C:\\pythonPrograms\\04-03-2014\\17IL\\"+file],cwd="C:\pythonPrograms\\",stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = proc.communicate()
    sortedop= subprocess.Popen(['sort'],cwd="C:\pythonPrograms\\",stdout=subprocess.PIPE,stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = sortedop.communicate(out)
    countReducer= subprocess.Popen(['python27', 'countreducer.py'],cwd="C:\pythonPrograms\\",stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = countReducer.communicate(out)
    countpostprocesser= subprocess.Popen(['python27', 'countpostprocesser.py'],cwd="C:\pythonPrograms\\",stdin=subprocess.PIPE,stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    out, err = countpostprocesser.communicate(out)
    jsondata2=json.loads(out)
    fd=open(file+".json","w")
    json.dump(jsondata2,fd,sort_keys=True,indent=2)
    fd.close()
    return

The error I get:

Exception in thread Thread-42:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
    self.run()
  File "C:\Python27\lib\threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\pythonPrograms\counts_batch_threading.py", line 45, in process_log_file
    jsondata2=json.loads(out)
  File "C:\Python27\lib\json\__init__.py", line 338, in loads
    return _default_decoder.decode(s)
  File "C:\Python27\lib\json\decoder.py", line 365, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Python27\lib\json\decoder.py", line 383, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

Code used to create the threads:

threads = []
for file in glob.glob("SAMPLE*.log"):
    thread1 = threading.Thread(target=process_log_file, args=(str(file),))
    threads.append(thread1)
    thread1.start()

# Wait for all threads to complete
for t in threads:
    t.join()

Can anyone help me solve this problem?

1 Answer

-1

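Your error may come from how the child processes finish rather than from json itself. Popen is non-blocking: the constructor returns as soon as the process has been started, without waiting for it to end. In your code each Popen is followed by communicate(), which does wait for the process to exit and reads all of its output, so the read is probably not happening too early. What the code never checks is whether a stage succeeded. Because stderr is merged into stdout (stderr=subprocess.STDOUT), any error message from a failing stage is passed down the pipeline and eventually reaches json.loads, which would produce exactly this ValueError. You can try a blocking helper such as check_call, which waits for the command and raises CalledProcessError on a non-zero exit code. Note that check_call does not return the output; check_output does both.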
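One way to make a failing stage visible before its output reaches json.loads is sketched below. This is only an illustration under stated assumptions: run_stage is a hypothetical helper that is not part of the original code, and the echo command is a stand-in for countmapper.py and the other scripts. Unlike the code in the question, it keeps stderr separate from stdout and raises when a stage exits with a non-zero code.

```python
import json
import subprocess
import sys


def run_stage(cmd, input_data=None, cwd=None):
    """Run one pipeline stage (hypothetical helper) and return its stdout.

    Raises RuntimeError with the stage's stderr text if it exits non-zero.
    """
    proc = subprocess.Popen(
        cmd,
        cwd=cwd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,  # keep error text out of the data stream
    )
    # communicate() blocks until the process exits and returns all output.
    out, err = proc.communicate(input_data)
    if proc.returncode != 0:
        raise RuntimeError(
            "%r exited with code %d: %s"
            % (cmd, proc.returncode, err.decode("utf-8", "replace"))
        )
    return out


if __name__ == "__main__":
    # Stand-in for a real stage: copy stdin to stdout unchanged.
    echo = [sys.executable, "-c",
            "import sys; sys.stdout.write(sys.stdin.read())"]
    out = run_stage(echo, b'{"count": 3}')
    print(json.loads(out.decode("utf-8")))
```

With a helper like this, a thread whose stage fails stops with an exception that names the command and shows its stderr, instead of a ValueError from the JSON decoder. If the failures only show up when the files are processed concurrently, that message should also indicate which script is the one that breaks.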
