Python的分布式锁管理器
我有一堆服务器,每个服务器上有多个实例在访问一个资源,而这个资源每秒请求的次数是有限制的。
我需要一个机制来锁定所有正在运行的服务器和实例对这个资源的访问。
我在GitHub上找到一个叫做“restful分布式锁管理器”的工具:https://github.com/thefab/restful-distributed-lock-manager
不过,这个工具似乎有一个最小锁定时间是1秒,而且它的可靠性比较差。在几次测试中,解锁一个1秒的锁竟然需要1到3秒不等。
有没有什么经过良好测试的、可以用Python接口的工具可以用来实现这个功能?
补充说明:我需要的工具能够在1秒内自动解锁。我的代码中不会释放这个锁。
5 个回答
分布式锁管理器 Taooka http://taooka.com 的时间到期精度可以达到纳秒级别。不过,它目前只提供了 Golang 的客户端库。
你的需求看起来很具体。我建议你可以先写一个简单的锁服务器,然后在客户端实现锁的功能。可以用一个类来实现,当这个类被创建的时候就获取一个锁,当这个类不再使用时就释放这个锁。
class Lock(object):
def __init__(self,resource):
print "Lock acquired for",resource
# Connect to lock server and acquire resource
def __del__(self):
print "Lock released"
# Connect to lock server and unlock resource if locked
def callWithLock(resource,call,*args,**kwargs):
lock = Lock(resource)
return call( *args, **kwargs )
def test( asdf, something="Else" ):
return asdf + " " + something
if __name__ == "__main__":
import sys
print "Calling test:",callWithLock( "resource.test", test, sys.argv[0] )
示例输出
$ python locktest.py
Calling test: Lock acquired for resource.test
Lock released
locktest.py Else
我在我的集群中使用ZooKeeper,并且用python-kazoo库来处理队列和锁。
这是从kazoo的API文档中修改过来的一个例子,供你参考: http://kazoo.readthedocs.org/en/latest/api/recipe/lock.html
zk = KazooClient()
lock = zk.Lock("/lockpath", "my-identifier")
if lock.acquire(timeout=1):
code here
lock.release()
不过我记得,使用ZooKeeper至少需要三个节点。
我最开始的想法是用Redis,但还有很多其他不错的工具,有些甚至更轻便,所以我选择了基于zmq的解决方案。这样你就不需要运行Redis,只需运行一个小的Python脚本就可以了。
需求回顾
在描述解决方案之前,让我先回顾一下你的需求。
限制对某个资源的请求数量,在固定时间段内只能有一定数量的请求。
自动解锁。
资源的自动解锁应该在1秒钟以内完成。
这个系统应该是分布式的。我假设你是指多个分布式服务器可以使用某个资源,并且只需要一个锁服务(更多内容在结论部分)。
概念
在时间段内限制请求数量
时间段可以是一秒、几秒或更短的时间。唯一的限制是Python中时间测量的精度。
如果你的资源每秒有一个硬性限制,你应该使用1.0秒的时间段。
监控每个时间段内的请求数量,直到下一个时间段开始
在第一次请求访问你的资源时,设置下一个时间段的开始时间,并初始化请求计数器。
每次请求时,增加当前时间段的请求计数器,除非你已经达到了当前时间段允许的最大请求数量。
使用zmq提供服务,采用REQ/REP模式
你的服务器可以分布在多台计算机上。为了提供对LockerServer的访问,你将使用zmq。
示例代码
zmqlocker.py:
import time
import zmq
class Locker():
def __init__(self, max_requests=1, in_seconds=1.0):
self.max_requests = max_requests
self.in_seconds = in_seconds
self.requests = 0
now = time.time()
self.next_slot = now + in_seconds
def __iter__(self):
return self
def next(self):
now = time.time()
if now > self.next_slot:
self.requests = 0
self.next_slot = now + self.in_seconds
if self.requests < self.max_requests:
self.requests += 1
return "go"
else:
return "sorry"
class LockerServer():
def __init__(self, max_requests=1, in_seconds=1.0, url="tcp://*:7777"):
locker=Locker(max_requests, in_seconds)
cnt = zmq.Context()
sck = cnt.socket(zmq.REP)
sck.bind(url)
while True:
msg = sck.recv()
sck.send(locker.next())
class LockerClient():
def __init__(self, url="tcp://localhost:7777"):
cnt = zmq.Context()
self.sck = cnt.socket(zmq.REQ)
self.sck.connect(url)
def next(self):
self.sck.send("let me go")
return self.sck.recv()
运行你的服务器:
run_server.py:
from zmqlocker import LockerServer
svr = LockerServer(max_requests=5, in_seconds=0.8)
在命令行中:
$ python run_server.py
这将会在本地主机的默认端口7777上启动锁服务。
运行你的客户端
run_client.py:
from zmqlocker import LockerClient
import time
locker_cli = LockerClient()
for i in xrange(100):
print time.time(), locker_cli.next()
time.sleep(0.1)
在命令行中:
$ python run_client.py
你应该会看到“go”,“go”,“sorry”等响应打印出来。
试着运行更多的客户端。
进行一些压力测试
你可以先启动客户端,然后再启动服务器。客户端会在服务器启动之前被阻塞,等服务器启动后就会正常运行。
结论
- 所描述的需求都得到了满足
- 请求数量得到了限制
- 不需要手动解锁,只要下一个时间段可用,就可以允许更多请求
- LockerService可以通过网络或本地套接字访问。
- 这个系统应该是可靠的,zmq是一个成熟的解决方案,Python代码相对简单
- 不需要在所有参与者之间进行时间同步
- 性能会非常好
另一方面,你可能会发现资源的限制并不像你想的那么可预测,所以要准备好调整参数,以找到合适的平衡,并随时准备应对意外情况。
在提供“锁”的过程中也有一些优化空间——例如,如果锁的请求用完了,但当前时间段已经快结束了,你可以考虑稍等一下再说“抱歉”,然后在短暂的时间后再提供“可以”。
扩展到真正的分布式锁管理器
所谓“分布式”,我们也可以理解为多个锁服务器一起运行。这会更复杂,但也是可能的。zmq允许非常简单地连接到多个网址,因此客户端可以很容易地连接到多个锁服务器。关键在于如何协调锁服务器,以避免对资源的请求过多。zmq支持服务器间的通信。一种模型是,每个锁服务器会在PUB/SUB上发布每个提供的“可以”信号。所有其他锁服务器都会订阅,并利用每个“可以”信号来增加它们本地的请求计数器(逻辑稍作修改)。