为何通过共享内存的通信比通过队列慢得多?

15 投票
1 回答
6505 浏览
提问于 2025-04-18 17:02

我在一台比较新的苹果MacBook Pro上使用Python 2.7.5,这台电脑有四个物理CPU和八个逻辑CPU,也就是说,使用系统工具可以看到:

$ sysctl hw.physicalcpu
hw.physicalcpu: 4
$ sysctl hw.logicalcpu
hw.logicalcpu: 8

我需要对一个很大的1维列表或数组进行一些复杂的处理,然后把结果保存为中间输出,以便在后面的计算中再次使用。我的问题结构比较适合并行处理,所以我想试试用Python的多进程模块,把这个1维数组分成几块(可能是4块或8块,我还没决定),然后并行计算,最后再把结果组合成最终的格式。我在考虑是用multiprocessing.Queue()(消息队列)还是multiprocessing.Array()(共享内存)来作为子进程把计算结果传回主进程的方式。我还在做一些简单的实验,以确保我理解多进程模块的工作原理。不过,我遇到了一个意想不到的结果:在创建两个基本相同的解决方案时,使用共享内存进行进程间通信的版本似乎需要的执行时间要多得多(大约多了30倍!),而使用消息队列的版本则快得多。下面,我提供了两个不同版本的示例代码,都是为了解决一个“玩具”问题,这个问题使用并行进程生成一长串随机数,并以两种不同的方式将结果传回主进程:第一次使用消息队列,第二次使用共享内存。

这是使用消息队列的版本:

import random
import multiprocessing
import datetime

def genRandom(count, id, q):

    print("Now starting process {0}".format(id))
    output = []
    # Generate a list of random numbers, of length "count"
    for i in xrange(count):
        output.append(random.random())
    # Write the output to a queue, to be read by the calling process 
    q.put(output)

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000
    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and queues 
    jobs = []
    outqs = []
    for i in xrange(0, procs):
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=genRandom, args=(size, i, q))
        jobs.append(p)
        outqs.append(q)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()    
    # Start the processes (i.e. calculate the random number lists)      
    for j in jobs:
        j.start()

    # Read out the data from the queues
    data = []
    for q in outqs:
        data.extend(q.get())

    # Ensure all of the processes have finished
    for j in jobs:
        j.join()
    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))

当我运行它时,得到的结果通常是这样的:

$ python multiproc_queue.py
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 0.514805 seconds

现在,这是相应的代码段,但稍微修改了一下,使用共享内存而不是队列:

import random
import multiprocessing
import datetime

def genRandom(count, id, d):

    print("Now starting process {0}".format(id))
    # Generate a list of random numbers, of length "count", and write them
    # directly to a segment of an array in shared memory
    for i in xrange(count*id, count*(id+1)):
        d[i] = random.random()

if __name__ == "__main__":
    # Number of random numbers to be generated by each process
    size = 1000000
    # Number of processes to create -- the total size of all of the random
    # numbers generated will ultimately be (procs * size)
    procs = 4

    # Create a list of jobs and a block of shared memory
    jobs = []
    data = multiprocessing.Array('d', size*procs)
    for i in xrange(0, procs):
        p = multiprocessing.Process(target=genRandom, args=(size, i, data))
        jobs.append(p)

    # Start time of the parallel processing and communications section
    tstart = datetime.datetime.now()    
    # Start the processes (i.e. calculate the random number lists)      
    for j in jobs:
        j.start()

    # Ensure all of the processes have finished
    for j in jobs:
    j.join()
    # End time of the parallel processing and communications section
    tstop = datetime.datetime.now()
    tdelta = datetime.timedelta.total_seconds(tstop - tstart)

    msg = "{0} random numbers generated in {1} seconds"
    print(msg.format(len(data), tdelta))

然而,当我运行共享内存版本时,得到的典型结果看起来更像这样:

$ python multiproc_shmem.py 
Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 15.839607 seconds

我的问题是:为什么这两个版本的执行速度差别这么大(大约0.5秒对比15秒,差了30倍!)?特别是,我该如何修改共享内存版本,以使其运行得更快呢?

1 个回答

28

这是因为 multiprocessing.Array 默认会使用一个锁,来防止多个进程同时访问它:

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

...

如果锁是 True(默认值),那么会创建一个新的锁对象来同步对这个值的访问。如果锁是一个 Lock 或 RLock 对象,那么就会使用这个对象来同步访问。如果锁是 False,那么返回的对象在访问时不会自动受到锁的保护,所以它不一定是“进程安全”的。

这意味着你并不是在同时写入数组——一次只有一个进程可以访问它。由于你的示例工作进程几乎只是进行数组写入,不断等待这个锁会严重影响性能。如果在创建数组时使用 lock=False,性能会好很多:

使用 lock=True 时:

Now starting process 0
Now starting process 1
Now starting process 2
Now starting process 3
4000000 random numbers generated in 4.811205 seconds

使用 lock=False 时:

Now starting process 0
Now starting process 3
Now starting process 1
Now starting process 2
4000000 random numbers generated in 0.192473 seconds

需要注意的是,使用 lock=False 意味着你需要手动保护对 Array 的访问,特别是在进行一些不安全的操作时。你的示例中,进程写入的是独特的部分,所以这样做是可以的。但如果你在写入的同时还想读取,或者不同的进程写入重叠的部分,那你就需要手动获取锁了。

撰写回答