Python多处理中Pool.map与共享内存数组的结合

2024-04-24 05:00:40 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个非常大的(只读)数据数组,我希望由多个进程并行处理。

我喜欢Pool.map函数,并希望使用它并行计算该数据上的函数。

我看到可以使用Value或Array类在进程之间使用共享内存数据。但是,当我尝试使用这个函数时,会得到一个RuntimeError:“使用Pool.map函数时,SynchronizedString对象只能通过继承在进程之间共享:

下面是一个简单的例子,说明我要做的事情:

from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  # this works
  print count_it( toShare, "a" )

  pool = Pool()

  # RuntimeError here
  print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )

有人能告诉我我做错了什么吗?

所以我想做的是,在进程池中创建了一个新创建的共享内存分配数组之后,将其信息传递给进程。


Tags: 数据key函数fromimportmap进程count
3条回答

如果数据是只读的,只需将其作为模块中的变量,然后再将fork from Pool。那么所有的子进程都应该能够访问它,并且如果您不写入它,它就不会被复制。

import myglobals # anything (empty .py file)
myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )

如果确实要尝试使用Array,但可以尝试使用lock=False关键字参数(默认情况下为true)。

我看到的问题是池不支持通过其参数列表挑选共享数据。这就是错误消息“对象只应通过继承在进程之间共享”的含义。如果要使用Pool类共享数据,则需要继承共享数据,即全局数据。

如果需要显式传递它们,则可能必须使用multiprocessing.Process。下面是您重新编写的示例:

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
  count = 0
  for c in arr:
    if c == key:
      count += 1
  q.put((key, count))

if __name__ == '__main__':
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  # want to share it using shared memory
  toShare = Array('c', testData)

  q = Queue()
  keys = ['a', 'b', 's', 'd']
  workers = [Process(target=count_it, args = (q, toShare, key))
    for key in keys]

  for p in workers:
    p.start()
  for p in workers:
    p.join()
  while not q.empty():
    print q.get(),

Output: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

队列元素的顺序可能不同。

为了使此方法更通用,更类似于池,您可以创建固定数量的N个进程,将键列表拆分为N个片段,然后使用包装函数作为进程目标,它将为传递给它的列表中的每个键调用count_it,如:

def wrapper( q, arr, keys ):
  for k in keys:
    count_it(q, arr, k)

我刚看到赏金,又试了一次;)

基本上,我认为错误消息意味着它所说的-多处理共享内存数组不能作为参数(通过pickling)传递。串行化数据是没有意义的,关键是数据是共享内存。因此,必须使共享数组成为全局数组。我认为将它作为模块的属性(如我的第一个答案所示)更简洁,但是在您的示例中将它作为全局变量也可以很好地工作。考虑到您不想在fork之前设置数据的观点,下面是一个经过修改的示例。如果您希望有多个可能的共享数组(这就是为什么您希望将toShare作为参数传递),您可以类似地创建一个共享数组的全局列表,并只传递索引来计算它(它将变成for c in toShare[i]:)。

from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
  count = 0
  for c in toShare:
    if c == key:
      count += 1
  return count

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool()

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

[编辑:由于没有使用fork,上述操作在windows上不起作用。但是,下面的操作在Windows上确实有效,仍然使用Pool,因此我认为这是最接近您所需的:

from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
  count = 0
  for c in mymodule.toShare:
    if c == key:
      count += 1
  return count

def initProcess(share):
  mymodule.toShare = share

if __name__ == '__main__':
  # allocate shared array - want lock=False in this case since we 
  # aren't writing to it and want to allow multiple processes to access
  # at the same time - I think with lock=True there would be little or 
  # no speedup
  maxLength = 50
  toShare = Array('c', maxLength, lock=False)

  # fork
  pool = Pool(initializer=initProcess,initargs=(toShare,))

  # can set data after fork
  testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
  if len(testData) > maxLength:
      raise ValueError, "Shared array too small to hold data"
  toShare[:len(testData)] = testData

  print pool.map( count_it, ["a", "b", "s", "d"] )

不知道为什么map不会Pickle数组,但是Process和Pool会-我想它可能已经在windows上的子进程初始化时传输了。注意,数据仍然是在fork之后设置的。

相关问题 更多 >