Python多进程问题?

5 投票
1 回答
2362 浏览
提问于 2025-04-16 20:38

我有一个文件夹,里面有500个输入文件(所有文件的总大小大约是500MB)。

我想写一个python脚本,完成以下几个步骤:

(1) 把所有的输入文件加载到内存中。

(2) 初始化一个空的python列表,后面会用到... 具体见(4)

(3) 启动15个不同的(独立的)进程:每个进程都使用相同的输入数据(来自(1)),但是使用不同的算法来处理这些数据,因此会生成不同的结果。

(4) 我希望所有独立的进程(来自步骤(3))把它们的输出存储在同一个python列表中(就是在步骤(2)中初始化的那个列表)。

一旦所有15个进程都完成了运行,我就会得到一个python列表,里面包含了这15个独立进程的所有结果。

我的问题是,这样在python中高效地实现是可能的吗?如果可以的话,能否提供一个示例代码或方案,说明如何做到这一点?

注意 #1: 我会在一台强大的多核服务器上运行这个脚本;所以目标是充分利用所有的处理能力,同时在所有独立进程之间共享一些内存(输入数据输出列表)。

注意 #2: 我是在Linux环境下工作。

1 个回答

6

好的,我刚刚用 zeromq 快速做了一个示例,展示了一个订阅者如何接收多个发布者的信息。你也可以用队列来实现这个功能,但那样你需要管理得更复杂一些。而zeromq的套接字(socket)使用起来非常简单,这让我觉得在这种情况下特别方便。

"""
demo of multiple processes doing processing and publishing the results
to a common subscriber
"""
from multiprocessing import Process


class Worker(Process):
    def __init__(self, filename, bind):
        self._filename = filename
        self._bind = bind
        super(Worker, self).__init__()

    def run(self):
        import zmq
        import time
        ctx = zmq.Context()
        result_publisher = ctx.socket(zmq.PUB)
        result_publisher.bind(self._bind)
        time.sleep(1)
        with open(self._filename) as my_input:
            for l in my_input.readlines():
                result_publisher.send(l)

if __name__ == '__main__':
    import sys
    import os
    import zmq

    #assume every argument but the first is a file to be processed
    files = sys.argv[1:]

    # create a worker for each file to be processed if it exists pass
    # in a bind argument instructing the socket to communicate via ipc
    workers = [Worker(f, "ipc://%s_%s" % (f, i)) for i, f \
               in enumerate((x for x in files if os.path.exists(x)))]

    # create subscriber socket
    ctx = zmq.Context()

    result_subscriber = ctx.socket(zmq.SUB)
    result_subscriber.setsockopt(zmq.SUBSCRIBE, "")

    # wire up subscriber to whatever the worker is bound to 
    for w in workers:
        print w._bind
        result_subscriber.connect(w._bind)

    # start workers
    for w in workers:
        print "starting workers..."
        w.start()

    result = []

    # read from the subscriber and add it to the result list as long
    # as at least one worker is alive
    while [w for w in workers if w.is_alive()]:
        result.append(result_subscriber.recv())
    else:
        # output the result
        print result

哦,对了,如果你想获取zmq,只需

$ pip install pyzmq-static

撰写回答