Python与Redis:管理者/工作者应用的最佳实践

1 投票
1 回答
3098 浏览
提问于 2025-04-16 19:44

我有一些关于使用Python和Redis来创建一个工作队列应用程序的问题,这个应用程序可以运行异步命令。以下是我目前写的代码:

def queueCmd(cmd):
    r_server.rpush("cmds", cmd)

def printCmdQueue():
    print r_server.lrange("cmds", 0 , -1)

def work():
    print "command being consumed: ", r_server.lpop("cmds")
    return -1

def boom(info):
    print "pop goes the weasel"

if __name__ == '__main__':

    r_server = redis.Redis("localhost")

    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")
    queueCmd("ls -la;sleep 10;ls")
    queueCmd("mkdir test; sleep 20")

    printCmdQueue()

    pool = Pool(processes=2)

    print "cnt:", +r_server.llen("cmds")
    #while r_server.llen("cmds") > 0:
    while True:
        pool.apply_async(work, callback=boom)
        if not r_server.lrange("cmds", 0, -1):
        #if r_server.llen("cmds") == 0:
            print "Terminate pool"
            pool.terminate()
            break

    printCmdQueue()

首先,我是不是可以理解为,如果我需要和管理者进行沟通,我应该使用回调函数?我看到的一些简单例子是把异步调用的结果存储在一个结果变量中,然后通过result.get(timeout=1)来访问它。这里的沟通是指把一些东西放回到Redis的列表中。

编辑:如果命令是异步运行的,而我在主程序中对结果设置了超时,这会让工作者超时吗,还是仅仅让管理者内部的这个操作超时?如果只是管理者超时,那我能不能用这个方法来检查工作者的退出代码呢?

接下来,这段代码产生了以下输出:

['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  ['mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20', 'ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
pop goes the weasel
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
pop goes the weasel
command being consumed:  ['ls -la;sleep 10;ls', 'mkdir test; sleep 20']
command being consumed:  mkdir test; sleep 20
Terminate pool
command being consumed:  None
 pop goes the weasel
pop goes the weasel
pop goes the weasel
[]

为什么工作者想要一次处理多个命令,即使我一个一个地从队列中取出?另外,这个过程有时候不会顺利结束,有时需要按ctrl+c来强制停止。为了解决这个问题,我清空了队列然后重新开始。我觉得这和apply_sync()有关,可能需要在工作者那边做更多的事情?

如果我把if语句改成注释掉的那个,我得到的结果是:

ValueError: invalid literal for int() with base 10: 'ls -la;sleep 10;ls'

这似乎是检查是否需要退出的更好方法,但有时这个函数返回的是字符串字面量?

如果能给我一些改进的建议,我会非常感激。我只是想做一个管理者,就像Linux机器上的一个服务/守护进程。它将用于从Redis列表中获取任务(目前是命令,但可能会有更多),并将结果返回到Redis列表中。将来,我还会有一个图形界面与这个管理者互动,以获取队列状态和返回结果。

谢谢,

编辑:

我意识到我有点搞错了。我不需要从工作者访问Redis服务器,这导致了一些错误(特别是ValueError)。

为了解决这个问题,现在的循环是:

while not r_server.llen("cmds") == 0:
    cmd = r_server.lpop("cmds")
    pool.apply_async(work, [cmd])

在这些代码行之后,我调用了pool.close()。我使用了os.getpid()os.getppid()来检查我确实有多个子进程在运行。

我仍然希望听到这是否听起来像是一个使用Redis创建管理者/工作者应用程序的好方法。

1 个回答

2

你的问题在于,你试图通过一个redis连接同时运行多个命令。

你期待的是这样的结果:

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
command      
             LLEN test
             0

但实际上你得到的是:

Thread 1     Thread 2
LLEN test    
1                            
LPOP test   
             LLEN test
             command
0

结果是按顺序返回的,但没有任何东西将某个线程或命令与特定的结果关联起来。单个redis连接不是线程安全的——你需要为每个工作线程准备一个连接。

如果你不当使用管道(pipelining),也会遇到类似的问题——管道主要是为了写入操作,比如一次性往列表中添加很多项目,这样可以提高性能,因为你可以假设LPUSH成功,而不必等服务器告诉你每个项目是否成功。Redis仍然会返回结果,但这些结果不一定是最后一个发送的命令的结果。

除此之外,基本的方法是合理的。不过你可以做一些改进:

  • 与其检查列表的长度,不如使用非阻塞的LPOP——如果返回null,说明列表是空的。
  • 添加一个计时器,这样如果列表为空,它会等待,而不是直接发出另一个命令。
  • 在while循环的条件中加入取消检查。
  • 处理连接错误——我使用一个外部循环来设置,如果连接失败,工作线程会尝试重新连接(基本上是重新启动main),在合理的尝试次数内,如果还不行就彻底终止工作进程。

撰写回答