如何用Python多进程创建同步对象?
我在弄清楚如何创建一个同步的Python对象时遇到了麻烦。我有一个叫做Observation的类和一个叫做Variable的类,它们的基本结构大概是这样的(代码简化了,只展示核心部分):
class Observation:
def __init__(self, date, time_unit, id, meta):
self.date = date
self.time_unit = time_unit
self.id = id
self.count = 0
self.data = 0
def add(self, value):
if isinstance(value, list):
if self.count == 0:
self.data = []
self.data.append(value)
else:
self.data += value
self.count += 1
class Variable:
def __init__(self, name, time_unit, lock):
self.name = name
self.lock = lock
self.obs = {}
self.time_unit = time_unit
def get_observation(self, id, date, meta):
self.lock.acquire()
try:
obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
self.obs[id] = obs
finally:
self.lock.release()
return obs
def add(self, date, value, meta={}):
self.lock.acquire()
try:
obs = self.get_observation(id, date, meta)
obs.add(value)
self.obs[id] = obs
finally:
self.lock.release()
这是我设置多进程部分的方式:
plugin = 在其他地方定义的函数
tasks = JoinableQueue()
result = JoinableQueue()
mgr = Manager()
lock = mgr.RLock()
var = Variable('foobar', 'year', lock)
for person in persons:
tasks.put(Task(plugin, var, person))
下面是代码应该如何工作的示例:
我有一个叫做var的Variable实例,我想给var添加一个观察值:
today = datetime.datetime.today()
var.add(today, 1)
所以,Variable的add函数会检查该日期是否已经存在观察值,如果存在就返回那个观察值,否则就创建一个新的Observation实例。在找到观察值后,通过调用obs.add(value)来添加实际的值。我最关心的是,确保不同的进程不会为同一天创建多个Observation实例,这就是我使用锁的原因。
一个Variable实例被创建,并在不同的进程之间共享,使用的是多进程库,它是多个Observation实例的容器。上面的代码不工作,我得到了这个错误:
RuntimeError: Lock对象应该只通过继承在进程之间共享
但是,如果我在启动不同进程之前实例化一个Lock对象,并把它传给Variable的构造函数,那么似乎会出现竞争条件,因为所有进程似乎都在互相等待。
最终目标是让不同的进程能够更新Variable对象中的obs变量。我需要确保线程安全,因为我不仅仅是在原地修改字典,还在添加新元素和增加现有变量。obs变量是一个字典,里面包含了一堆Observation实例。
我该如何同步,让多个进程共享同一个Variable实例呢?非常感谢你的帮助!
更新 1:
* 我正在使用多进程锁,并且我已经修改了源代码以展示这一点。
* 我已经更改了标题,以更准确地描述问题。
* 我把线程安全替换成了同步,因为我之前把这两个术语搞混了。
感谢Dmitry Dvoinikov的指正!
我还有一个问题不太确定:我应该在哪里实例化Lock?是在类里面,还是在初始化多进程之前并作为参数传入?答案是:应该在类外部进行。
更新 2:
* 我通过将Lock的初始化移到类定义外部并使用管理器,解决了“Lock对象应该只通过继承在进程之间共享”的错误。
* 最后一个问题是,现在一切正常,但当我把Variable实例放入队列时,它似乎没有更新,每次从队列中取出时都不包含我在上一次迭代中添加的观察值。这是唯一让我困惑的地方 :(
更新 3:
最终解决方案是将var.obs字典设置为mgr.dict()的实例,然后使用自定义序列化器。很高兴能和正在为此挣扎的人分享代码。
1 个回答
你提到的不是线程安全的问题,而是不同进程之间的同步,这完全是两回事。无论如何,先从这里开始。
不同的进程可以更新对象Variable中的obs变量。
这意味着Variable是在共享内存中,你必须明确地把对象存储在那里,单靠魔法是无法让本地实例被其他进程看到的。在这里:
数据可以通过Value或Array存储在共享内存映射中。
另外,你的代码片段缺少重要的导入部分。我们无法判断你是否正确地实例化了multiprocessing.Lock,而不是multithreading.Lock。你的代码没有显示你是如何创建进程并传递数据的。
因此,我建议你了解线程和进程之间的区别,看看你的应用是否真的需要一个包含多个进程的共享内存模型,并查看一下相关文档。