Django、Celery 递归与 Twitter API

3 投票
1 回答
1151 浏览
提问于 2025-04-17 16:48

我正在使用Django 1.4和Celery 3.0(rabbitmq)来构建一个任务集合,用于从Twitter API 1.1获取和缓存查询。我想实现的一件事是任务链,其中最后一个任务会根据到目前为止的响应和最近获取的响应数据,递归调用前面两个任务。这具体来说,可以让应用程序遍历用户的时间线(最多3200条推文),因为每次请求最多只能返回200条推文(这是Twitter API的限制)。

我的tasks.py的关键部分可以在这里看到,但在粘贴之前,我会展示我在Python命令行中调用的任务链(最终会通过用户输入在最终的网页应用中启动)。给定:

>>request(twitter_user_id='#1010101010101#, 
  total_requested=1000, 
  max_id = random.getrandbits(128) #e.g. arbitrarily large number)

我调用:

>> res = (twitter_getter.s(request) | 
        pre_get_tweets_for_user_id.s() | 
        get_tweets_for_user_id.s() | 
        timeline_recursor.s()).apply_async()

关键点是,timeline_recursor可以启动一个可变数量的get_tweets_for_user_id子任务。当timeline_recursor处于基本情况时,它应该返回一个响应字典,如这里定义的:

@task(rate_limit=None)
def timeline_recursor(request):
    previous_tweets=request.get('previous_tweets', None) #If it's the first time through, this will be None
    if not previous_tweets:
        previous_tweets = [] #so we initiate to empty array
    tweets = request.get('tweets', None) 

    twitter_user_id=request['twitter_user_id']
    previous_max_id=request['previous_max_id']
    total_requested=request['total_requested']
    pulled_in=request['pulled_in']

    remaining_requested = total_requested - pulled_in
    if previous_max_id:
        remaining_requested += 1 #this is because cursored results will always have one overlapping id

    else:
        previous_max_id = random.getrandbits(128) # for first time through loop

    new_max_id = min([tweet['id'] for tweet in tweets])
    test = lambda x, y: x<y

    if remaining_requested < 0:  #because we overshoot by requesting batches of 200
        remaining_requested = 0

    if tweets:
        previous_tweets.extend(tweets)

    if tweets and remaining_requested and (pulled_in > 1) and test(new_max_id, previous_max_id):

        request = dict(user_pk=user_pk,
                    twitter_user_id=twitter_user_id,
                    max_id = new_max_id,
                    total_requested = remaining_requested,
                    tweets=previous_tweets)

        #problem happens in this part of the logic???

        response = (twitter_getter_config.s(request) | get_tweets_for_user_id.s() | timeline_recursor.s()).apply_async()

    else: #if in base case, combine all tweets pulled in thus far and send back as "tweets" -- to be 
          #saved in db or otherwise consumed
        response = dict(
                    twitter_user_id=twitter_user_id,
                    total_requested = total_requested,
                    tweets=previous_tweets)
    return response

因此,我期望的res.result的响应是一个字典,包含Twitter用户ID、请求的推文数量和通过连续调用获取的推文集合。

不过,在递归任务的世界里并不太顺利。当我运行上面提到的任务链时,如果我在启动链后立即输入res.status,它会显示“成功”,尽管在我的celery工作者的日志视图中,我可以看到递归调用Twitter API的链式调用正在按预期进行,并且参数也是正确的。我还可以在链式任务执行时立即运行result.result。res.result返回的是一个AsyncResponse实例ID。即使递归链式任务已经完成,res.result仍然保持为AsyncResult ID。

另一方面,我可以通过访问res.result.result.result.result['tweets']来获取完整的推文集合。我可以推断出每个链式子任务确实在执行,只是我不明白为什么res.result没有预期的结果。当timeline_recursor达到基本情况时,应该发生的递归返回似乎没有按预期传播。

有什么想法可以解决这个问题吗?在Celery中使用递归可以非常强大,但至少对我来说,如何理解使用Celery的递归和递归函数,以及这如何影响链式任务中的返回逻辑,并不是完全清楚。

如有需要,我很乐意进一步解释,感谢任何建议。

1 个回答

1

什么是 apply_async 返回的内容(对象类型)?

我对 Celery 不太了解,但在 Twisted 和其他很多异步框架中,像这样的调用通常会立即返回(通常是 True 或者一个可以跟踪状态的对象),因为任务会被放入队列中等待处理。

再次强调,我对 Celery 不太熟悉,我猜测发生的情况是:

你是:立刻将 response 定义为异步的延迟 task,但却试图在结果还没出来的时候就对它进行操作。

你应该:定义一个 callback 函数,在任务完成后对结果进行处理并返回一个值。

查看 Celery 的文档,apply_async 通过 link 接受回调,但我没有找到任何人尝试从中捕获返回值的例子。

撰写回答