Python - 从另一个队列重试失败的Celery任务

5 投票
1 回答
4693 浏览
提问于 2025-04-18 18:25

我在使用Celery向一个网络服务发送数据。有时候,因为网络断了,数据没法发送,这个任务会无限次重试,直到成功发送为止。但其实这种重试是没必要的,因为网络就是不通,不需要再试了。

我想到了一个更好的解决办法:如果一个任务失败了三次(至少重试三次),就把它移到另一个队列。这个队列里存放着所有失败的任务。等网络恢复正常,数据成功发送后,正常队列里的任务就完成了,这时再开始处理那个失败任务的队列。这样就不会浪费CPU的内存去一次又一次地重试任务。

这是我的代码:现在我只是一直重试这个任务,但我怀疑这样做是否正确。

@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):

    try : 
        client = SoapClient(
                            location = url,
                            action = 'http://tempuri.org/IService_1_0/',
                            namespace = "http://tempuri.org/", 
                            soap_ns='soap', ns = False
                            )

        response= client.UpdateShipment(
                                        Weight = Decimal(data['Weight']), 
                                        Length = Decimal(data['Length']), 
                                        Height = Decimal(data['Height']), 
                                        Width =  Decimal(data['Width']) , 
                                        )

    except Exception, exc:
        raise post_data_to_web_service.retry(exc=exc) 

我该如何同时维护两个队列,并尝试从这两个队列中执行任务呢?

Settings.py

BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

1 个回答

6

默认情况下,celery会把所有的任务放到一个叫做 celery 的队列里。所以你可以在这里运行你的任务,当出现错误时,它会自动重试。如果重试次数达到上限,你可以把这些任务转移到一个新的队列,比如叫 foo

from celery.exceptions import MaxRetriesExceededError

@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):
    try:
        #do something with given args

 except MaxRetriesExceededError:
        post_data_to_web_service([data, url], queue='foo')

 except Exception, exc:
        raise post_data_to_web_service.retry(exc=exc) 

当你启动你的工作进程时,这个任务会尝试处理给定的数据。如果处理失败,它会在60秒后重试10次。然后,当它遇到 MaxRetriesExceededError 这个错误时,它会把同样的任务放到新的队列 foo 中。

要处理这些任务,你需要启动一个新的工作进程。

celery worker -l info -A my_app -Q foo

或者,如果你启动默认的工作进程时使用特定的设置,也可以从中处理这个任务。

 celery worker -l info -A my_app -Q celery,foo

撰写回答