Redis Queue:如何防止链式作业异步运行

2024-04-29 02:15:07 发布

您现在位置:Python中文网/ 问答频道 /正文

我正试着让3个作业按顺序互相跟踪:

Job1 -> Job2 -> Job3

这3个作业在operations.py中定义:

def Job1(x):
    return x

def Job2(x):
    return x * x

def Job3(x):
    print(x)

我在script.py中调用这些作业,运行rq worker

from redis import Redis
from rq import Queue
from operation import Job1, Job2, Job3

redis_conn = Redis()
q = Queue(connection=redis_conn)

for num in [1,2,3,4,5,6,7,8]:
    j1 = q.enqueue(Job1, num)
    j2 = q.enqueue(Job2, j1.result, depends_on = j1)
    j3 = q.enqueue(Job3, depends_on = j2)

根据documentation,我希望j3等待j2,而j2又应该等待j1完成执行。但是,这可能不会发生。作业似乎是异步运行的。我这么说是因为redis工作人员将此作为一个错误:

File "./operation.py", line 5, in Job2
    return x * x
TypeError: unsupported operand type(s) for *: 'NoneType' and 'NoneType'

j2没有等待j1的结果,而是异步启动,因为此时j1的结果还没有准备好,所以j1.result是None,传递给j2。我的方法有什么问题?为什么作业不按顺序运行?你知道吗


Tags: frompyimportredisreturn顺序def作业