如何在多进程中同步Python字典
我正在使用Python 2.6和多进程模块来进行多线程编程。现在我想要一个可以同步的字典(我真正需要的唯一原子操作是对某个值进行 += 操作)。
我应该用 multiprocessing.sharedctypes.synchronized() 来包装这个字典吗?还是有其他更好的方法呢?
4 个回答
关于解决同时写入问题的合适方案,我做了一些快速研究,发现这篇文章提到了一种使用锁或信号量的解决方法。(http://effbot.org/zone/thread-synchronization.htm)
虽然这个例子并没有特别针对字典,但我相信你可以根据这个思路编写一个类来帮助你处理字典。
如果我需要以线程安全的方式实现类似的功能,我可能会使用Python的信号量解决方案。(假设我之前的合并方法不奏效。)我认为信号量通常会因为其阻塞特性而降低线程的效率。
来自该网站的说明:
信号量是一种更高级的锁机制。信号量有一个内部计数器,而不是一个锁标志,只有当尝试获取信号量的线程数量超过设定的上限时,它才会阻塞。根据信号量的初始化方式,这允许多个线程同时访问同一段代码。
semaphore = threading.BoundedSemaphore()
semaphore.acquire() # decrements the counter
... access the shared resource; work with dictionary, add item or whatever.
semaphore.release() # increments the counter
我会专门开一个进程来维护“共享字典”。可以使用比如 xmlrpclib 这样的工具,把这小部分代码提供给其他进程。你可以通过 xmlrpclib 暴露一个函数,这个函数接收 key, increment
来进行加法操作,另一个函数只接收 key
并返回对应的值。具体的细节(比如缺少的键是否有默认值等)可以根据你应用的需求来决定。
然后,你可以用任何你喜欢的方法来实现这个专门的共享字典进程:可以是一个简单的单线程服务器,内存中有一个简单的字典,也可以是一个简单的 sqlite 数据库等等。我建议你从“尽可能简单”的代码开始(这取决于你是否需要一个持久化的共享字典,或者持久化对你来说并不重要),然后再根据需要进行测量和优化。
简介
这里有很多人给出的建议,但没有实际的例子。这里的回答中没有一个提到使用多进程,这让我感到很失望。作为Python的爱好者,我们应该支持自己的内置库。虽然并行处理和同步从来都不是简单的事情,但我相信通过合理的设计可以让它变得简单。在现代多核架构中,这一点变得非常重要,不能被忽视!不过,我对多进程库并不满意,因为它还处于初级阶段,有很多问题和缺陷,而且它更倾向于函数式编程(我对此很反感)。目前,我还是更喜欢Pyro模块,因为多进程在服务器运行时无法共享新创建的对象,这个限制太严重了。管理器对象的“register”类方法只能在管理器(或其服务器)启动之前注册对象。说了这么多,接下来是代码:
Server.py
from multiprocessing.managers import SyncManager
class MyManager(SyncManager):
pass
syncdict = {}
def get_dict():
return syncdict
if __name__ == "__main__":
MyManager.register("syncdict", get_dict)
manager = MyManager(("127.0.0.1", 5000), authkey="password")
manager.start()
raw_input("Press any key to kill server".center(50, "-"))
manager.shutdown()
在上面的代码示例中,Server.py使用了多进程的SyncManager,它可以提供同步的共享对象。这个代码在解释器中运行时不会工作,因为多进程库对如何找到每个注册对象的“可调用”非常敏感。运行Server.py会启动一个定制的SyncManager,它共享syncdict字典,以供多个进程使用,并且可以连接到同一台机器上的客户端,或者如果在其他IP地址上运行,也可以连接到其他机器。在这个例子中,服务器在回环地址(127.0.0.1)上运行,端口是5000。使用authkey参数可以在操作syncdict时建立安全连接。当按下任何键时,管理器会关闭。
Client.py
from multiprocessing.managers import SyncManager
import sys, time
class MyManager(SyncManager):
pass
MyManager.register("syncdict")
if __name__ == "__main__":
manager = MyManager(("127.0.0.1", 5000), authkey="password")
manager.connect()
syncdict = manager.syncdict()
print "dict = %s" % (dir(syncdict))
key = raw_input("Enter key to update: ")
inc = float(raw_input("Enter increment: "))
sleep = float(raw_input("Enter sleep time (sec): "))
try:
#if the key doesn't exist create it
if not syncdict.has_key(key):
syncdict.update([(key, 0)])
#increment key value every sleep seconds
#then print syncdict
while True:
syncdict.update([(key, syncdict.get(key) + inc)])
time.sleep(sleep)
print "%s" % (syncdict)
except KeyboardInterrupt:
print "Killed client"
客户端也必须创建一个定制的SyncManager,注册“syncdict”,这次不需要传入可调用对象来获取共享字典。然后,它使用定制的SyncManager通过回环IP地址(127.0.0.1)在5000端口连接,并使用authkey建立与Server.py中启动的管理器的安全连接。它通过调用管理器上注册的可调用对象来获取共享字典syncdict。接下来,它会提示用户输入以下内容:
- 要操作的syncdict中的键
- 每次循环中要增加的值
- 每次循环中要休眠的时间(以秒为单位)
客户端会检查这个键是否存在。如果不存在,它会在syncdict中创建这个键。然后,客户端进入一个“无尽”的循环,不断更新这个键的值,增加指定的值,休眠指定的时间,并打印syncdict,直到发生键盘中断(Ctrl+C)。
烦人的问题
- 管理器的注册方法必须在管理器启动之前调用,否则即使在管理器上调用dir也会显示确实有注册的方法,但仍会出现异常。
- 对字典的所有操作必须使用方法,而不是直接赋值(例如,syncdict["blast"] = 2会失败,因为多进程共享自定义对象的方式)。
- 使用SyncManager的dict方法可以缓解烦人的问题#2,但烦人的问题#1又阻止了SyncManager.dict()返回的代理被注册和共享。(SyncManager.dict()只能在管理器启动后调用,而register只能在管理器启动前工作,因此SyncManager.dict()只有在进行函数式编程并将代理作为参数传递给进程时才有用,就像文档示例所示的那样。)
- 服务器和客户端都必须注册,尽管直觉上看起来客户端在连接到管理器后应该能自动识别(请将此添加到你的愿望清单中,多进程开发者们)。
总结
我希望你和我一样喜欢这个相当详细且稍微耗时的回答。我一直很困惑,为什么在多进程模块上这么挣扎,而Pyro却能轻松搞定,现在多亏了这个回答,我终于明白了。我希望这对Python社区有帮助,能改善多进程模块,因为我相信它有很大的潜力,但在初级阶段却未能发挥出应有的效果。尽管有这些烦人的问题,我认为这仍然是一个相当可行的替代方案,而且相对简单。你也可以使用SyncManager.dict()并像文档中展示的那样将其作为参数传递给进程,这可能会是一个更简单的解决方案,具体取决于你的需求,只是对我来说感觉不太自然。