Python与Redis:管理者/工作者应用的最佳实践
我有一些关于使用Python和Redis来创建一个工作队列应用程序的问题,这个应用程序可以运行异步命令。以下是我目前写的代码:
def queueCmd(cmd):
r_server.rpush("cmds", cmd)
def printCmdQueue():
print r_server.lrange("cmds", 0 , -1)
def work():
print "command being consumed: ", r_server.lpop("cmds")
return -1
def boom(info):
print "pop goes the weasel"
if __name__ == '__main__':
r_server = redis.Redis("localhost")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
queueCmd("ls -la;sleep 10;ls")
queueCmd("mkdir test; sleep 20")
printCmdQueue()
pool = Pool(processes=2)
print "cnt:", +r_server.llen("cmds")
#while r_server.llen("cmds") > 0:
while True:
pool.apply_async(work, callback=boom)
if not r_server.lrange("cmds", 0, -1):
#if r_server.llen("cmds") == 0:
print "Terminate pool"
pool.terminate()
break
printCmdQueue()
首先,我是不是可以理解为,如果我需要和管理者进行沟通,我应该使用回调函数?我看到的一些简单例子是把异步调用的结果存储在一个结果变量中,然后通过result.get(timeout=1)来访问它。这里的沟通是指把一些东西放回到Redis的列表中。
编辑:如果命令是异步运行的,而我在主程序中对结果设置了超时,这会让工作者超时吗,还是仅仅让管理者内部的这个操作超时?如果只是管理者超时,那我能不能用这个方法来检查工作者的退出代码呢?
接下来,这段代码产生了以下输出:
['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed: ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed: mkdir test; sleep 20
Terminate pool
command being consumed: None
pop goes the weasel
pop goes the weasel
pop goes the weasel
[]
为什么工作者想要一次处理多个命令,即使我一个一个地从队列中取出?另外,这个过程有时候不会顺利结束,有时需要按ctrl+c来强制停止。为了解决这个问题,我清空了队列然后重新开始。我觉得这和apply_sync()有关,可能需要在工作者那边做更多的事情?
如果我把if语句改成注释掉的那个,我得到的结果是:
ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'
这似乎是检查是否需要退出的更好方法,但有时这个函数返回的是字符串字面量?
如果能给我一些改进的建议,我会非常感激。我只是想做一个管理者,就像Linux机器上的一个服务/守护进程。它将用于从Redis列表中获取任务(目前是命令,但可能会有更多),并将结果返回到Redis列表中。将来,我还会有一个图形界面与这个管理者互动,以获取队列状态和返回结果。
谢谢,
编辑:
我意识到我有点搞错了。我不需要从工作者访问Redis服务器,这导致了一些错误(特别是ValueError)。
为了解决这个问题,现在的循环是:
while not r_server.llen("cmds") == 0:
cmd = r_server.lpop("cmds")
pool.apply_async(work, [cmd])
在这些代码行之后,我调用了pool.close()
。我使用了os.getpid()
和os.getppid()
来检查我确实有多个子进程在运行。
我仍然希望听到这是否听起来像是一个使用Redis创建管理者/工作者应用程序的好方法。
1 个回答
你的问题在于,你试图通过一个redis连接同时运行多个命令。
你期待的是这样的结果:
Thread 1 Thread 2
LLEN test
1
LPOP test
command
LLEN test
0
但实际上你得到的是:
Thread 1 Thread 2
LLEN test
1
LPOP test
LLEN test
command
0
结果是按顺序返回的,但没有任何东西将某个线程或命令与特定的结果关联起来。单个redis连接不是线程安全的——你需要为每个工作线程准备一个连接。
如果你不当使用管道(pipelining),也会遇到类似的问题——管道主要是为了写入操作,比如一次性往列表中添加很多项目,这样可以提高性能,因为你可以假设LPUSH成功,而不必等服务器告诉你每个项目是否成功。Redis仍然会返回结果,但这些结果不一定是最后一个发送的命令的结果。
除此之外,基本的方法是合理的。不过你可以做一些改进:
- 与其检查列表的长度,不如使用非阻塞的LPOP——如果返回null,说明列表是空的。
- 添加一个计时器,这样如果列表为空,它会等待,而不是直接发出另一个命令。
- 在while循环的条件中加入取消检查。
- 处理连接错误——我使用一个外部循环来设置,如果连接失败,工作线程会尝试重新连接(基本上是重新启动main),在合理的尝试次数内,如果还不行就彻底终止工作进程。