Django、Celery 递归与 Twitter API
我正在使用Django 1.4和Celery 3.0(rabbitmq)来构建一个任务集合,用于从Twitter API 1.1获取和缓存查询。我想实现的一件事是任务链,其中最后一个任务会根据到目前为止的响应和最近获取的响应数据,递归调用前面两个任务。这具体来说,可以让应用程序遍历用户的时间线(最多3200条推文),因为每次请求最多只能返回200条推文(这是Twitter API的限制)。
我的tasks.py的关键部分可以在这里看到,但在粘贴之前,我会展示我在Python命令行中调用的任务链(最终会通过用户输入在最终的网页应用中启动)。给定:
>>request(twitter_user_id='#1010101010101#,
total_requested=1000,
max_id = random.getrandbits(128) #e.g. arbitrarily large number)
我调用:
>> res = (twitter_getter.s(request) |
pre_get_tweets_for_user_id.s() |
get_tweets_for_user_id.s() |
timeline_recursor.s()).apply_async()
关键点是,timeline_recursor可以启动一个可变数量的get_tweets_for_user_id子任务。当timeline_recursor处于基本情况时,它应该返回一个响应字典,如这里定义的:
@task(rate_limit=None)
def timeline_recursor(request):
previous_tweets=request.get('previous_tweets', None) #If it's the first time through, this will be None
if not previous_tweets:
previous_tweets = [] #so we initiate to empty array
tweets = request.get('tweets', None)
twitter_user_id=request['twitter_user_id']
previous_max_id=request['previous_max_id']
total_requested=request['total_requested']
pulled_in=request['pulled_in']
remaining_requested = total_requested - pulled_in
if previous_max_id:
remaining_requested += 1 #this is because cursored results will always have one overlapping id
else:
previous_max_id = random.getrandbits(128) # for first time through loop
new_max_id = min([tweet['id'] for tweet in tweets])
test = lambda x, y: x<y
if remaining_requested < 0: #because we overshoot by requesting batches of 200
remaining_requested = 0
if tweets:
previous_tweets.extend(tweets)
if tweets and remaining_requested and (pulled_in > 1) and test(new_max_id, previous_max_id):
request = dict(user_pk=user_pk,
twitter_user_id=twitter_user_id,
max_id = new_max_id,
total_requested = remaining_requested,
tweets=previous_tweets)
#problem happens in this part of the logic???
response = (twitter_getter_config.s(request) | get_tweets_for_user_id.s() | timeline_recursor.s()).apply_async()
else: #if in base case, combine all tweets pulled in thus far and send back as "tweets" -- to be
#saved in db or otherwise consumed
response = dict(
twitter_user_id=twitter_user_id,
total_requested = total_requested,
tweets=previous_tweets)
return response
因此,我期望的res.result的响应是一个字典,包含Twitter用户ID、请求的推文数量和通过连续调用获取的推文集合。
不过,在递归任务的世界里并不太顺利。当我运行上面提到的任务链时,如果我在启动链后立即输入res.status,它会显示“成功”,尽管在我的celery工作者的日志视图中,我可以看到递归调用Twitter API的链式调用正在按预期进行,并且参数也是正确的。我还可以在链式任务执行时立即运行result.result。res.result返回的是一个AsyncResponse实例ID。即使递归链式任务已经完成,res.result仍然保持为AsyncResult ID。
另一方面,我可以通过访问res.result.result.result.result['tweets']来获取完整的推文集合。我可以推断出每个链式子任务确实在执行,只是我不明白为什么res.result没有预期的结果。当timeline_recursor达到基本情况时,应该发生的递归返回似乎没有按预期传播。
有什么想法可以解决这个问题吗?在Celery中使用递归可以非常强大,但至少对我来说,如何理解使用Celery的递归和递归函数,以及这如何影响链式任务中的返回逻辑,并不是完全清楚。
如有需要,我很乐意进一步解释,感谢任何建议。
1 个回答
什么是 apply_async
返回的内容(对象类型)?
我对 Celery 不太了解,但在 Twisted 和其他很多异步框架中,像这样的调用通常会立即返回(通常是 True
或者一个可以跟踪状态的对象),因为任务会被放入队列中等待处理。
再次强调,我对 Celery 不太熟悉,我猜测发生的情况是:
你是:立刻将 response
定义为异步的延迟 task
,但却试图在结果还没出来的时候就对它进行操作。
你应该:定义一个 callback
函数,在任务完成后对结果进行处理并返回一个值。
查看 Celery 的文档,apply_async
通过 link
接受回调,但我没有找到任何人尝试从中捕获返回值的例子。