Combining Pool.map with a shared memory Array in Python multiprocessing

70 votes
4 answers
55591 views
Asked 2025-04-15 15:40

I have a very large (read-only) array of data that I want to have processed by multiple processes in parallel.

I like the Pool.map function and would like to use it to calculate functions on that data in parallel.

I saw that one can use the Value or Array classes to share memory data between processes. But when I try to use these, I get a RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance' when using the Pool.map function.

Here is a simplified example of what I am trying to do:

from multiprocessing import Pool, Array

def count_it(arr, key):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print(count_it(toShare, b"a"))

  pool = Pool()

  # RuntimeError here
  print(pool.map(count_it, [(toShare, key) for key in [b"a", b"b", b"s", b"d"]]))

Can someone tell me what I am doing wrong here?

What I would like to do is pass information about a newly created shared memory array to the processes after they have been created in the process pool.

4 Answers

7

The problem I see is that Pool doesn't support passing shared data through its argument list. That's what the error message means by "objects should only be shared between processes through inheritance". The shared data needs to be inherited; that is, it must be global if you want to share it using the Pool class.

If you need to pass the data explicitly, you may have to use multiprocessing.Process. Here is your reworked example:

from multiprocessing import Process, Array, Queue

def count_it(q, arr, key):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = [b'a', b'b', b's', b'd']
  workers = [Process(target=count_it, args=(q, toShare, key))
             for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print(q.get(), end=' ')

Output: (b's', 9) (b'a', 2) (b'b', 3) (b'd', 12)

The order of the elements in the queue may vary.

To make this more generic and similar to Pool, you could create a fixed number N of processes, split the list of keys into N pieces, and then use a wrapper function as the Process target that calls count_it for each key in the list it is passed, like this:

def wrapper(q, arr, keys):
  for k in keys:
    count_it(q, arr, k)
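
Putting that together, a minimal sketch of the chunking (the chunks helper and N = 4 are my own illustrative additions; toShare, keys, and q are as in the example above):

def chunks(lst, n):
  # split lst into n roughly equal-sized pieces
  k, m = divmod(len(lst), n)
  return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]

if __name__ == '__main__':
  N = 4  # fixed number of worker processes
  workers = [Process(target=wrapper, args=(q, toShare, chunk))
             for chunk in chunks(keys, N)]
  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print(q.get(), end=' ')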

10

If you are seeing an error like:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

consider using multiprocessing.Manager instead, as it doesn't have this limitation. A manager works by running in a completely separate process of its own:

import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
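
Applied to the original question, a minimal sketch of the Manager approach (my own illustration, not part of the original answer): manager proxies pickle cleanly, so unlike Array they can be passed straight through Pool.map's argument list.

from multiprocessing import Manager, Pool

def count_it(args):
    shared, key = args  # shared is a picklable manager proxy
    data = shared.value  # each access is one round trip to the manager process
    return key, data.count(key)

if __name__ == '__main__':
    manager = Manager()
    shared = manager.Value(str, "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf")
    with Pool() as pool:
        print(pool.map(count_it, [(shared, key) for key in ["a", "b", "s", "d"]]))

The trade-off is that the manager serializes the data on every access, so this is simpler but slower than true shared memory.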

62

I'll try again as I just saw the bounty ;)

Basically I think the error message means what it says: multiprocessing shared-memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialize the data, because the whole point is that the data is shared memory. So you have to make the shared array global. I think it's neater to put it as an attribute of a module, as in my earlier answer, but just leaving it as a global variable in your example also works well. Taking into account your point about not wanting to set the data before the fork, here is a modified example. If you wanted to have more than one possible shared array (which is why you wanted to pass toShare as an argument), you could similarly make a global list of shared arrays and just pass the index to count_it (the loop would become for c in toShare[i]:); a sketch of that variant follows the first example below.

from multiprocessing import Pool, Array

def count_it(key):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError("Shared array too small to hold data")
  toShare[:len(testData)] = testData

  print(pool.map(count_it, [b"a", b"b", b"s", b"d"]))
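
For the multiple-arrays variant mentioned above, a minimal sketch (the index-passing scheme is my illustration of the suggestion in the text; like the example above it relies on the fork start method):

def count_it(args):
  i, key = args
  count = 0
  for c in toShare[i]:  # select the shared array by index
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # a global list of shared arrays - workers receive only an index into it
  toShare = [Array('c', b"abcabc", lock=False),
             Array('c', b"sdfsdf", lock=False)]
  pool = Pool()
  print(pool.map(count_it, [(0, b"a"), (1, b"s")]))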

[EDIT: The above doesn't work on Windows because of not using fork (the same applies anywhere the spawn start method is the default, such as macOS on recent Python versions). However, the below does work on Windows, still using Pool, so I think this is the closest to what you want:

from multiprocessing import Pool, Array
import mymodule

def count_it(key):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # hand each worker the shared array via the initializer so it does
  # not have to go through map's argument list
  pool = Pool(initializer=initProcess, initargs=(toShare,))

  # can set data after the workers have started
  testData = b"abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError("Shared array too small to hold data")
  toShare[:len(testData)] = testData

  print(pool.map(count_it, [b"a", b"b", b"s", b"d"]))

I'm not quite sure why map can't pickle the array while Process and Pool can; I think it may be because the data is transferred at the point of subprocess initialization on Windows. Note that the data is still set after the fork.
