分别在并行进程中改变不同的python对象

2024-04-26 10:10:59 发布

您现在位置:Python中文网/ 问答频道 /正文

简而言之

我想并发地更改复杂的python对象,这样每个对象只由一个进程处理。我怎样才能做到这一点(最有效)?实施某种酸洗支持会有帮助吗?那会有效率吗?在

完整问题

我有一个python数据结构ArrayDict,它基本上由numpy数组和字典组成,并将任意索引映射到数组中的行。在我的例子中,所有的键都是整数。在

a = ArrayDict()

a[1234] = 12.5
a[10] = 3

print(a[1234])                               #12.5
print(a[10])                                 # 3.0

print(a[1234] == a.array[a.indexDict[1234]]) #true

现在我有多个这样的ArrayDict,并希望将它们填充到myMethod(arrayDict, params)。由于myMethod很昂贵,所以我想并行运行它。请注意,myMethod可能会向arrayDict添加许多行。每个进程都会改变自己的ArrayDict。我不需要并发访问ArrayDicts

myMethod中,我更改arrayDict中的条目(也就是说,我更改了numpy数组),我向arrayDict添加条目(也就是说,我向字典中添加另一个索引,并在内部数组中写入一个新值)。最后,我希望能够在arrayDict的内部numpy数组太小时交换它。这种情况并不经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作。即使没有数组交换,我自己的尝试也没有成功。在

我花了几天时间研究共享内存和python的multiprocessing模块。由于我将最终在linux上工作,任务似乎相当简单:系统调用fork()可以有效地处理参数的副本。我的想法是在每个ArrayDict的进程中更改它,返回对象的更改版本,并覆盖原来的对象。为了节省内存和保存复制工作,我还使用了sharedmem数组将数据存储在ArrayDict中。我知道这本词典还得照抄。在

^{pr2}$

我得到的是一个分割错误。我可以通过创建ArrayDict到{}的深度副本并将它们保存到myData中,从而避免了这个错误。我真的不明白为什么这样做是必要的,而且频繁地复制(可能非常大的)数组(while循环需要很长时间),对我来说并不是什么高效的方法。不过,至少在一定程度上起到了作用。然而,由于共享内存,我的程序在第三次迭代时有一些错误的行为。因此,我认为我的方式不是最佳的。在

我读到herehere可以使用multiprocessing.Array在共享内存上保存aribtrary numpy数组。但是,我仍然需要共享整个ArrayDict,其中特别包括一个字典,而字典又是不可选择的。在

我怎样才能有效地实现我的目标?有没有可能(也很有效)使我的对象变得可选择?在

所有解决方案都必须在64位Linux上运行Python3和完全的numpy/scipy支持。在

编辑

我发现here使用多处理的“管理器”类和用户定义的代理类可以共享任意对象。这会有效率吗?我想利用这一点,我不需要并发访问对象,即使它们不是在主进程中处理的。是否可以为我要处理的每个对象创建一个管理器?(我对经理的工作方式可能还有一些误解。)


Tags: 对象numpy字典here进程错误条目数组
1条回答
网友
1楼 · 发布于 2024-04-26 10:10:59

这似乎是一个相当复杂的类,我不能完全预料到这个解决方案在您的情况下是否有效。对于这样一个复杂的类,一个简单的折衷方法是使用^{}。在

如果这不能回答你的问题,那么最好用一个最小的、有效的例子。在

from concurrent.futures import ProcessPoolExecutor

import numpy as np

class ArrayDict ():
  keys = None
  vals = None

  def __init__ (self):
    self.keys = dict ()
    self.vals = np.random.rand (1000)

  def __str__ (self):
    return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())

def myMethod (ad, args):
  print ("starting:", ad)


if __name__ == '__main__':
  l     = [ArrayDict() for _ in range (5)]
  args  = [2, 3, 4, 1, 3]

  with ProcessPoolExecutor (max_workers = 2) as ex:

    d = ex.map (myMethod, l, args)

对象是cloned当发送到子进程时,您需要返回结果(因为对对象的更改不会传播回主进程),并处理如何存储它们。在

Note that changes to class variables will propagate to other objects in the same process, e.g. if you have more tasks than processes, changes to class variables will be shared among the instances running in the same process. This is usually undesired behavior.

这是并行化的高级接口。ProcessPoolExecutor使用multiprocessing模块,并且只能与pickable objects一起使用。我怀疑ProcessPoolExecutor的性能与"sharing state between processes"相似。在引擎盖下,ProcessPoolExecutoris using ^{},并且应该表现出与Pool相似的性能(除了在map中使用very long iterables)。ProcessPoolExecutor似乎是python中用于并发任务的未来API。在

如果可以的话,通常使用^{}(它可以与ProcessPoolExecutor交换)更快。在这种情况下,对象在进程之间共享,的更新将传播回主线程。在

如前所述,最快的选择可能是重新构造ArrayDict,以便它只使用可以由^{}或{}表示的对象。在

如果ProcessPoolExecutor不起作用,并且您无法优化ArrayDict,那么您可能无法使用^{}。关于如何做到这一点,有很多很好的例子here。在

The greatest performance gain is often likely to be found in myMethod. And, as I mentioned, the overhead of using threads is less than that of processes.

相关问题 更多 >