如何用Python多进程创建同步对象?

2 投票
1 回答
6635 浏览
提问于 2025-04-16 11:57

我在弄清楚如何创建一个同步的Python对象时遇到了麻烦。我有一个叫做Observation的类和一个叫做Variable的类,它们的基本结构大概是这样的(代码简化了,只展示核心部分):

class Observation:
    def __init__(self, date, time_unit, id, meta):
        self.date = date
        self.time_unit = time_unit
        self.id = id
        self.count = 0
        self.data = 0

    def add(self, value):
        if isinstance(value, list):
            if self.count == 0:
                self.data = []
            self.data.append(value)
        else:
            self.data += value
        self.count += 1


class Variable:
    def __init__(self, name, time_unit, lock):
        self.name = name
        self.lock = lock
        self.obs = {}
        self.time_unit = time_unit

    def get_observation(self, id, date, meta):
        self.lock.acquire()
        try:
            obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
            self.obs[id] = obs
        finally:
            self.lock.release()
        return obs

    def add(self, date, value, meta={}):
        self.lock.acquire()
        try:
            obs = self.get_observation(id, date, meta)
            obs.add(value)
            self.obs[id] = obs
        finally:
            self.lock.release()

这是我设置多进程部分的方式:

plugin = 在其他地方定义的函数

tasks = JoinableQueue()

result = JoinableQueue()

mgr = Manager()

lock = mgr.RLock()

var = Variable('foobar', 'year', lock)

for person in persons:
    tasks.put(Task(plugin, var, person))

下面是代码应该如何工作的示例:

我有一个叫做var的Variable实例,我想给var添加一个观察值:

today = datetime.datetime.today()  
var.add(today, 1)  

所以,Variable的add函数会检查该日期是否已经存在观察值,如果存在就返回那个观察值,否则就创建一个新的Observation实例。在找到观察值后,通过调用obs.add(value)来添加实际的值。我最关心的是,确保不同的进程不会为同一天创建多个Observation实例,这就是我使用锁的原因。

一个Variable实例被创建,并在不同的进程之间共享,使用的是多进程库,它是多个Observation实例的容器。上面的代码不工作,我得到了这个错误:

RuntimeError: Lock对象应该只通过继承在进程之间共享

但是,如果我在启动不同进程之前实例化一个Lock对象,并把它传给Variable的构造函数,那么似乎会出现竞争条件,因为所有进程似乎都在互相等待。

最终目标是让不同的进程能够更新Variable对象中的obs变量。我需要确保线程安全,因为我不仅仅是在原地修改字典,还在添加新元素和增加现有变量。obs变量是一个字典,里面包含了一堆Observation实例。

我该如何同步,让多个进程共享同一个Variable实例呢?非常感谢你的帮助!

更新 1:
* 我正在使用多进程锁,并且我已经修改了源代码以展示这一点。
* 我已经更改了标题,以更准确地描述问题。
* 我把线程安全替换成了同步,因为我之前把这两个术语搞混了。

感谢Dmitry Dvoinikov的指正!

我还有一个问题不太确定:我应该在哪里实例化Lock?是在类里面,还是在初始化多进程之前并作为参数传入?答案是:应该在类外部进行。

更新 2:
* 我通过将Lock的初始化移到类定义外部并使用管理器,解决了“Lock对象应该只通过继承在进程之间共享”的错误。
* 最后一个问题是,现在一切正常,但当我把Variable实例放入队列时,它似乎没有更新,每次从队列中取出时都不包含我在上一次迭代中添加的观察值。这是唯一让我困惑的地方 :(

更新 3:
最终解决方案是将var.obs字典设置为mgr.dict()的实例,然后使用自定义序列化器。很高兴能和正在为此挣扎的人分享代码。

1 个回答

3

你提到的不是线程安全的问题,而是不同进程之间的同步,这完全是两回事。无论如何,先从这里开始。

不同的进程可以更新对象Variable中的obs变量。

这意味着Variable是在共享内存中,你必须明确地把对象存储在那里,单靠魔法是无法让本地实例被其他进程看到的。在这里:

数据可以通过Value或Array存储在共享内存映射中。

另外,你的代码片段缺少重要的导入部分。我们无法判断你是否正确地实例化了multiprocessing.Lock,而不是multithreading.Lock。你的代码没有显示你是如何创建进程并传递数据的。

因此,我建议你了解线程和进程之间的区别,看看你的应用是否真的需要一个包含多个进程的共享内存模型,并查看一下相关文档

撰写回答