Celery 创建多个任务实例

2 投票
1 回答
1837 浏览
提问于 2025-04-17 04:57

我正在创建一个任务(通过继承celery.task.Task),这个任务会连接到Twitter的流媒体API。为了调用Twitter的API,我使用了tweepy。根据我从celery文档中了解到的,‘一个任务并不是为每个请求都实例化,而是作为全局实例注册在任务注册表中。’我原本以为每次调用apply_async(或delay)这个任务时,都是在访问最初实例化的那个任务,但实际上并不是。相反,每次都会创建一个新的自定义任务类的实例。我需要能够访问原来的自定义任务,因为这是我终止tweepy API调用所创建的原始连接的唯一方法。

这里有一段代码,如果有帮助的话:

from celery import registry
from celery.task import Task

class FollowAllTwitterIDs(Task):
    def __init__(self):
        # requirements for creation of the customstream
        # goes here. The CustomStream class is a subclass
        # of tweepy.streaming.Stream class

        self._customstream = CustomStream(*args, **kwargs)

    @property
    def customstream(self):
        if self._customstream:
            # terminate existing connection to Twitter
            self._customstream.running = False
        self._customstream = CustomStream(*args, **kwargs)

    def run(self):
        self._to_follow_ids = function_that_gets_list_of_ids_to_be_followed()

        self.customstream.filter(follow=self._to_follow_ids, async=False)
follow_all_twitterids = registry.tasks[FollowAllTwitterIDs.name]

还有Django视图的部分:

def connect_to_twitter(request):
    if request.method == 'POST':
        do_stuff_here()
        .
        .
        .

        follow_all_twitterids.apply_async(args=[], kwargs={})

     return

任何帮助都非常感谢。:D

编辑:

为了给这个问题提供更多背景,CustomStream对象在每次调用filter()方法时都会创建一个httplib.HTTPSConnection实例。每当尝试创建新的连接时,都需要关闭之前的连接。通过将customstream.running设置为False来关闭连接。

1 个回答

0

这个任务应该只被创建一次。如果你觉得它不是这样,

我建议你在

print("INSTANTIATE")

后面加上

import traceback

traceback.print_stack()

放到Task.__init__方法里,这样你就能知道这个问题发生在哪里了。

我觉得你的任务可以这样更好地表达:

from celery.task import Task, task

class TwitterTask(Task):
    _stream = None
    abstract = True

    def __call__(self, *args, **kwargs):
        try:
            return super(TwitterTask, self).__call__(stream, *args, **kwargs)
        finally:
            if self._stream:
                self._stream.running = False

    @property
    def stream(self):
        if self._stream is None:
            self._stream = CustomStream()
        return self._stream

@task(base=TwitterTask)
def follow_all_ids():
    ids = get_list_of_ids_to_follow()
    follow_all_ids.stream.filter(follow=ids, async=false)

撰写回答