Python: running cat subprocesses in parallel

19 votes
2 answers
18164 views
Asked 2025-04-18 06:18

I'm running several cat | zgrep commands on a remote server and collecting their output individually for further processing:

import multiprocessing as mp
import subprocess

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = (subprocess.check_output(command, shell=True)).split('\n')
        process_data(log_lines)

However, this causes the subprocesses (i.e. the 'ssh ... cat ...' commands) to run one after another: the second command starts only after the first has finished, and so on.

How can I modify this code so that the subprocesses run simultaneously, while I still collect each command's output individually?

2 Answers

41

You don't need multiprocessing or threading to run subprocesses in parallel. For example:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

This runs five shell commands simultaneously. Note: neither threads nor the multiprocessing module is used here. There is no point in appending an ampersand & to the commands, because Popen doesn't wait for a command to complete anyway; you have to call .wait() explicitly.
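Applied to the question's scenario, a minimal sketch might look like the following (remote_host, the file lists, and the regex are placeholders taken from the question; output still goes to the shared stdout here, and collecting it per command is what the next example adds):

#!/usr/bin/env python
from subprocess import Popen

# placeholder commands modeled on the question
commands = [
    'ssh remote_host cat files_to_process | zgrep --mmap "regex"',
    'ssh remote_host cat other_files | zgrep --mmap "regex"',
]
# Popen returns immediately, so all commands start at once
processes = [Popen(cmd, shell=True) for cmd in commands]
# block until every command has finished
exitcodes = [p.wait() for p in processes]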

It is convenient to use threads to collect the subprocesses' output, but it is not required:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

Related: Python threading multiple bash subprocesses?
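To tie this back to the question, the same pool pattern can feed each command's output lines to process_data separately; a sketch, assuming the placeholder commands from above and the question's own process_data hook:

from multiprocessing.dummy import Pool  # thread pool
from subprocess import Popen, PIPE, STDOUT

# placeholder commands modeled on the question
commands = [
    'ssh remote_host cat files_to_process | zgrep --mmap "regex"',
    'ssh remote_host cat other_files | zgrep --mmap "regex"',
]
processes = [Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE,
                   stderr=STDOUT, close_fds=True) for cmd in commands]

def get_lines(process):
    # communicate() reads all remaining output and waits for exit
    return process.communicate()[0].splitlines()

# one result per command, in the same order as `commands`
for log_lines in Pool(len(processes)).map(get_lines, processes):
    process_data(log_lines)  # the question's post-processing hook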

Here's a code example that gets the output of several subprocesses concurrently in the same thread (Python 3.8+):

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE, STDOUT


async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()


async def main():
    # get commands output in parallel
    coros = [
        get_lines(
            f'"{sys.executable}" -c "print({i:d}); import time; time.sleep({i:d})"'
        )
        for i in range(5)
    ]
    print(await asyncio.gather(*coros))


if __name__ == "__main__":
    asyncio.run(main())
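The same get_lines() coroutine can drive the question's commands directly; a hedged sketch, reusing the placeholders from above (process_data is the question's own hook):

# hypothetical adaptation of main() to the question's commands
async def main():
    commands = [
        'ssh remote_host cat files_to_process | zgrep --mmap "regex"',
        'ssh remote_host cat other_files | zgrep --mmap "regex"',
    ]
    outputs = await asyncio.gather(*(get_lines(cmd) for cmd in commands))
    for log_lines in outputs:
        process_data(log_lines)  # each command's output, collected separately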

Old (2014) answer (Python 3.4?):

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()
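(Note: the @asyncio.coroutine decorator and yield from-style coroutines were deprecated in Python 3.8 and removed in Python 3.11; on current Python, use the async/await version above.)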

0

Another approach (besides putting the shell processes in the background, as described above) is to use multithreading.

Your run method could then do something like this:

threading.Thread(target=myFuncThatDoesZGrep).start()

To collect the results, you can do something like this:

import threading

class MyThread(threading.Thread):
    def run(self):
        self.finished = False
        # Your code to run the zgrep command here.
        blahBlah()
        # Store the results first, then flip the flag, so a poller
        # never sees finished == True before results exists.
        self.results = []
        self.finished = True

Run the thread following the multithreading approach linked above. When your thread object's myThread.finished is True, you can collect the results via myThread.results.
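A variant worth noting: threading.Thread with join() avoids polling the finished flag entirely, since join() blocks until run() has returned. A minimal sketch, with placeholder echo commands standing in for the real ssh/zgrep pipelines:

import subprocess
import threading

class ZGrepThread(threading.Thread):
    """Runs one shell command and stores its output lines."""
    def __init__(self, command):
        super().__init__()
        self.command = command
        self.results = None

    def run(self):
        out = subprocess.check_output(self.command, shell=True)
        self.results = out.decode().splitlines()

# placeholder commands; the question would use its ssh/zgrep pipelines here
threads = [ZGrepThread('echo one'), ZGrepThread('echo two')]
for t in threads:
    t.start()
for t in threads:
    t.join()  # blocks until this thread's run() has returned
outputs = [t.results for t in threads]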
