多进程通过共享内存传递字典数组

6 投票
2 回答
4990 浏览
提问于 2025-04-19 08:45

下面的代码可以运行,但因为要处理大量数据,所以速度很慢。在实际使用中,创建进程和发送数据所花的时间几乎和计算时间一样长,因此当第二个进程创建时,第一个进程的计算几乎已经完成,这样并行处理就没什么意义了。

这段代码和这个问题中的代码是一样的,“多进程在合并结果时对992个整数有截止限制”,下面的修改建议已经实现了。不过,我遇到了一个常见的问题,估计是因为处理大数据时,序列化(也就是把数据转成可以存储或传输的格式)需要很长时间。

我看到有些回答提到使用multiprocessing.array来传递共享内存数组。我有一个大约4000个索引的数组,但每个索引都有一个包含200个键值对的字典。每个进程只读取这些数据,进行一些计算,然后返回一个矩阵(4000x3),里面没有字典。

像这样的回答 “共享只读数据是否会被复制到不同的进程中?” 使用了map。是否可以保持下面的系统并实现共享内存?有没有有效的方法可以将字典数组的数据发送给每个进程,比如把字典放在某个管理器里,然后把它放进multiprocessing.array中?

import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i

    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__== '__main__':   
    main()

解决方案

通过把字典列表放进一个管理器,问题就解决了。

manager=Manager()
d=manager.list(myData)

看起来管理器不仅管理这个列表,还管理列表中的字典。启动时间有点慢,似乎数据还是在被复制,但这只在开始时做一次,然后在进程内部就可以直接使用这些数据。

import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i

    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)        
    result_q.put(results)
    return

if __name__ == '__main__':
    main()

2 个回答

2

看了你的问题,我假设以下几点:

  • 你想对每个 myData 中的项目返回一个输出(某种矩阵)
  • 你创建了一个可连接的队列(tasks),可能是用来存放输入的,但不太确定怎么使用它

代码

import logging
import multiprocessing


def create_logger(logger_name):
    ''' Create a logger that log to the console '''
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # create console handler and set appropriate level
    ch = logging.StreamHandler()
    formatter = logging.Formatter("%(processName)s %(funcName)s() %(levelname)s: %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

def main():
    global logger
    logger = create_logger(__name__)
    logger.info('Main started')
    data = []
    for i in range(0,100):
        data.append({str(i):i})

    CalcManager(data,start=0,end=50)
    logger.info('Main ended')

def CalcManager(myData,start,end):
    logger.info('CalcManager started')
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Add tasks
    for i in range(start, end):
        tasks.put(myData[i])

    # Create processes to do work
    nprocs = 3
    for i in range(nprocs):
        logger.info('starting processes')
        p = multiprocessing.Process(target=worker,args=(tasks,results))
        p.daemon = True
        p.start()

    # Wait for tasks completion, i.e. tasks queue is empty
    try:
        tasks.join()
    except KeyboardInterrupt:
        logger.info('Cancel tasks')

    # Print out the results
    print 'RESULTS'
    while not results.empty():
        result = results.get()
        print result

    logger.info('CalManager ended')

def worker(tasks, results):
    while True:
        try:
            task = tasks.get()  # one row of input
            task['done'] = True # simular work being done
            results.put(task)   # Save the result to the output queue
        finally:
            # JoinableQueue: for every get(), we need a task_done()
            tasks.task_done()


if __name__== '__main__':   
    main()

讨论

  • 对于多个进程的情况,我建议使用 logging 模块,因为它有几个好处:
    • 它是线程和进程安全的;这意味着你不会遇到一个进程的输出和另一个进程的输出混在一起的情况
    • 你可以配置日志记录来显示进程名称和函数名称,这对调试非常有帮助
  • CalcManager 本质上是一个任务管理器,它做以下事情:
    1. 创建三个进程
    2. 填充输入队列 tasks
    3. 等待任务完成
    4. 打印结果
  • 注意,当创建进程时,我把它们标记为 守护进程,这意味着当主程序退出时,它们会被自动终止。你不需要担心手动结束它们
  • worker 是实际工作的地方:
    • 它们会一直运行(while True 循环)
    • 每次循环时,它们会获取一个输入单位,进行一些处理,然后把结果放到输出中
    • 任务完成后,它会调用 task_done(),这样主进程就知道所有工作都完成了。我把 task_done 放在 finally 语句中,以确保即使在处理过程中发生错误,它也会被执行
2

你可以通过使用一个 multiprocessing.Manager 来存储你的列表,这样可能会看到一些性能提升。具体来说,你可以把列表放在一个管理服务器上,然后让每个子进程从这个共享的列表中获取数据,而不是把列表的切片复制到每个子进程中:

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3 
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs 
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end 
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'        

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

这个方法会在创建任何工作进程之前,把你的整个 data 列表复制到一个 Manager 进程中。Manager 会返回一个 Proxy 对象,这个对象可以让你共享访问这个 list。接着,你只需要把这个 Proxy 传给工作进程,这样它们的启动时间就会大大减少,因为不再需要复制 data 列表的切片。这里的缺点是,子进程访问列表的速度会变慢,因为访问需要通过进程间通信(IPC)去到管理进程。这个方法是否真的能提高性能,主要取决于你在工作进程中对 list 的具体操作,但尝试一下是值得的,因为这只需要很少的代码修改。

撰写回答