Python RQ: 回调模式

6 投票
2 回答
4846 浏览
提问于 2025-04-18 10:17

我现在有很多文档需要处理,正在使用Python RQ来并行化这个任务。

我想要一个工作流程,让每个文档执行不同的操作。比如说:A -> B -> C,这意味着先把文档传给函数A,等A完成后,再进行B,最后是C

不过,Python RQ似乎对这种工作流程的支持不是很好。

这里有一个简单但有点麻烦的方法来实现这个。简单来说,每个函数在工作流程中会以嵌套的方式调用下一个函数。

比如说,对于一个工作流程A->B->C

在最上层,代码是这样写的:

q.enqueue(A, the_doc)

这里的q是Queue的实例,而在函数A中,有这样的代码:

q.enqueue(B, the_doc)

B中,也有类似的代码:

q.enqueue(C, the_doc)

有没有比这个更优雅的方法呢?比如在一个函数中写一些代码:

q.enqueue(A, the_doc) q.enqueue(B, the_doc, after = A) q.enqueue(C, the_doc, after= B)

depends_on参数是最接近我需求的,不过,像这样运行:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )

是行不通的。因为q.enqueue(B, depends_on=A_job )是在A_job = q.enqueue(A, the_doc)执行后立即执行的。等到B被加入队列时,A的结果可能还没有准备好,因为处理需要时间。

附注:

如果Python RQ在这方面不太好用,还有什么其他工具可以在Python中实现同样的目的:

  1. 轮询并行化
  2. 支持工作流程处理

2 个回答

0

这里提到的depends_on参数是最接近我需求的,不过,像这样运行:

A_job = q.enqueue(A, the_doc) q.enqueue(B, depends_on=A_job )

是行不通的。因为q.enqueue(B, depends_on=A_job )会在A_job = q.enqueue(A, the_doc)执行后立刻执行。等到B被加入队列时,A的结果可能还没准备好,因为处理是需要时间的。

在这种情况下,q.enqueue(B, depends_on=A_job)会在A_job完成后再运行。如果结果还没准备好,q.enqueue(B, depends_on=A_job)会等到准备好为止。


它默认不支持这个功能,但使用其他技术是可以实现的。

在我的案例中,我使用了缓存来跟踪链中的上一个任务,这样当我们想要加入一个新函数(紧接着运行)时,就可以在调用enqueue()时正确设置它的'depends_on'参数。

注意enqueue时使用的额外参数:'timeout, result_ttl, ttl'。这些参数是因为我在rq上运行了较长的任务。你可以在python rq文档中查看它们的用法。

我使用了django_rq.enqueue(),它是基于python rq的。

    # main.py
    def process_job():
        ...

        # Create a cache key for every chain of methods you want to call.
        # NOTE: I used this for web development, in your case you may want
        # to use a variable or a database, not caching

        # Number of time to cache and keep the results in rq
        TWO_HRS = 60 * 60 * 2

        cache_key = 'update-data-key-%s' % obj.id
        previous_job_id = cache.get(cache_key)
        job = django_rq.enqueue(update_metadata,
                                campaign=campaign,
                                list=chosen_list,
                                depends_on=previous_job_id,
                                timeout=TWO_HRS,
                                result_ttl=TWO_HRS,
                                ttl=TWO_HRS)

        # Set the value for the most recent finished job, so the next function
        # in the chain can set the proper value for 'depends_on'
        cache.set(token_key, job.id, TWO_HRS)

    # utils.py
    def update_metadata(campaign, list):
        # Your code goes here to update the campaign object with the list object
        pass

'depends_on' - 来自rq文档

depends_on - 指定另一个任务(或任务ID),该任务必须完成后,这个任务才能被加入队列。

4

当 B 被放入队列时,A 的结果可能还没准备好,因为处理需要时间。

我不确定你最开始提问时这是否是真的,但现在来看,这个说法是不对的。实际上,depends_on 功能正是为了你描述的工作流程而设计的。

确实,这两个函数会立即一个接一个地执行。

A_job = q.enqueue(A, the_doc)
B_job = q.enqueue(B, depends_on=A_job )

但是,工作者不会在 A 完成之前执行 B。在 A_job 成功执行之前,B.status == 'deferred'。一旦 A.status == 'finished',那么 B 就会开始运行。

这意味着 BC 可以像这样访问和操作它们依赖的结果:

import time
from rq import Queue, get_current_job
from redis import StrictRedis

conn = StrictRedis()
q = Queue('high', connection=conn)

def A():
    time.sleep(100)
    return 'result A'

def B():
    time.sleep(100)
    current_job = get_current_job(conn)
    a_job_id = current_job.dependencies[0].id
    a_job_result = q.fetch_job(a_job_id).result
    assert a_job_result == 'result A'
    return a_job_result + ' result B'


def C():
    time.sleep(100)
    current_job = get_current_job(conn)
    b_job_id = current_job.dependencies[0].id
    b_job_result = q.fetch_job(b_job_id).result
    assert b_job_result == 'result A result B'
    return b_job_result + ' result C'

最终,工作者会打印出 'result A result B result C'

另外,如果你队列里有很多任务,而 B 可能需要等一段时间才能执行,你可能想要大幅增加 result_ttl,或者设置为无限期 result_ttl=-1。否则,A 的结果会在设置的 result_ttl 秒数后被清除,这样 B 就无法访问到它,无法返回想要的结果。

不过,设置 result_ttl=-1 会对内存有重要影响。这意味着你的任务结果将永远不会被自动清除,内存会随着时间的推移而不断增加,直到你手动从 redis 中删除这些结果。

撰写回答