Python RQ: 回调模式
我现在有很多文档需要处理,正在使用Python RQ来并行化这个任务。
我想要一个工作流程,让每个文档执行不同的操作。比如说:A
-> B
-> C
,这意味着先把文档传给函数A
,等A
完成后,再进行B
,最后是C
。
不过,Python RQ似乎对这种工作流程的支持不是很好。
这里有一个简单但有点麻烦的方法来实现这个。简单来说,每个函数在工作流程中会以嵌套的方式调用下一个函数。
比如说,对于一个工作流程A
->B
->C
。
在最上层,代码是这样写的:
q.enqueue(A, the_doc)
这里的q是Queue
的实例,而在函数A
中,有这样的代码:
q.enqueue(B, the_doc)
在B
中,也有类似的代码:
q.enqueue(C, the_doc)
有没有比这个更优雅的方法呢?比如在一个函数中写一些代码:
q.enqueue(A, the_doc)
q.enqueue(B, the_doc, after = A)
q.enqueue(C, the_doc, after= B)
depends_on参数是最接近我需求的,不过,像这样运行:
A_job = q.enqueue(A, the_doc)
q.enqueue(B, depends_on=A_job )
是行不通的。因为q.enqueue(B, depends_on=A_job )
是在A_job = q.enqueue(A, the_doc)
执行后立即执行的。等到B
被加入队列时,A
的结果可能还没有准备好,因为处理需要时间。
附注:
如果Python RQ在这方面不太好用,还有什么其他工具可以在Python中实现同样的目的:
- 轮询并行化
- 支持工作流程处理
2 个回答
这里提到的depends_on参数是最接近我需求的,不过,像这样运行:
A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )
是行不通的。因为q.enqueue(B, depends_on=A_job )会在A_job = q.enqueue(A, the_doc)执行后立刻执行。等到B被加入队列时,A的结果可能还没准备好,因为处理是需要时间的。
在这种情况下,q.enqueue(B, depends_on=A_job)会在A_job完成后再运行。如果结果还没准备好,q.enqueue(B, depends_on=A_job)会等到准备好为止。
它默认不支持这个功能,但使用其他技术是可以实现的。
在我的案例中,我使用了缓存来跟踪链中的上一个任务,这样当我们想要加入一个新函数(紧接着运行)时,就可以在调用enqueue()时正确设置它的'depends_on'参数。
注意enqueue时使用的额外参数:'timeout, result_ttl, ttl'。这些参数是因为我在rq上运行了较长的任务。你可以在python rq文档中查看它们的用法。
我使用了django_rq.enqueue(),它是基于python rq的。
# main.py
def process_job():
...
# Create a cache key for every chain of methods you want to call.
# NOTE: I used this for web development, in your case you may want
# to use a variable or a database, not caching
# Number of time to cache and keep the results in rq
TWO_HRS = 60 * 60 * 2
cache_key = 'update-data-key-%s' % obj.id
previous_job_id = cache.get(cache_key)
job = django_rq.enqueue(update_metadata,
campaign=campaign,
list=chosen_list,
depends_on=previous_job_id,
timeout=TWO_HRS,
result_ttl=TWO_HRS,
ttl=TWO_HRS)
# Set the value for the most recent finished job, so the next function
# in the chain can set the proper value for 'depends_on'
cache.set(token_key, job.id, TWO_HRS)
# utils.py
def update_metadata(campaign, list):
# Your code goes here to update the campaign object with the list object
pass
'depends_on' - 来自rq文档:
depends_on - 指定另一个任务(或任务ID),该任务必须完成后,这个任务才能被加入队列。
当 B 被放入队列时,A 的结果可能还没准备好,因为处理需要时间。
我不确定你最开始提问时这是否是真的,但现在来看,这个说法是不对的。实际上,depends_on
功能正是为了你描述的工作流程而设计的。
确实,这两个函数会立即一个接一个地执行。
A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )
但是,工作者不会在 A 完成之前执行 B
。在 A_job
成功执行之前,B.status == 'deferred'
。一旦 A.status == 'finished'
,那么 B
就会开始运行。
这意味着 B
和 C
可以像这样访问和操作它们依赖的结果:
import time
from rq import Queue, get_current_job
from redis import StrictRedis
conn = StrictRedis()
q = Queue('high', connection=conn)
def A():
time.sleep(100)
return 'result A'
def B():
time.sleep(100)
current_job = get_current_job(conn)
a_job_id = current_job.dependencies[0].id
a_job_result = q.fetch_job(a_job_id).result
assert a_job_result == 'result A'
return a_job_result + ' result B'
def C():
time.sleep(100)
current_job = get_current_job(conn)
b_job_id = current_job.dependencies[0].id
b_job_result = q.fetch_job(b_job_id).result
assert b_job_result == 'result A result B'
return b_job_result + ' result C'
最终,工作者会打印出 'result A result B result C'
。
另外,如果你队列里有很多任务,而 B
可能需要等一段时间才能执行,你可能想要大幅增加 result_ttl
,或者设置为无限期 result_ttl=-1
。否则,A 的结果会在设置的 result_ttl
秒数后被清除,这样 B
就无法访问到它,无法返回想要的结果。
不过,设置 result_ttl=-1
会对内存有重要影响。这意味着你的任务结果将永远不会被自动清除,内存会随着时间的推移而不断增加,直到你手动从 redis 中删除这些结果。