Python - 从另一个队列重试失败的Celery任务
我在使用Celery向一个网络服务发送数据。有时候,因为网络断了,数据没法发送,这个任务会无限次重试,直到成功发送为止。但其实这种重试是没必要的,因为网络就是不通,不需要再试了。
我想到了一个更好的解决办法:如果一个任务失败了三次(至少重试三次),就把它移到另一个队列。这个队列里存放着所有失败的任务。等网络恢复正常,数据成功发送后,正常队列里的任务就完成了,这时再开始处理那个失败任务的队列。这样就不会浪费CPU的内存去一次又一次地重试任务。
这是我的代码:现在我只是一直重试这个任务,但我怀疑这样做是否正确。
@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):
try :
client = SoapClient(
location = url,
action = 'http://tempuri.org/IService_1_0/',
namespace = "http://tempuri.org/",
soap_ns='soap', ns = False
)
response= client.UpdateShipment(
Weight = Decimal(data['Weight']),
Length = Decimal(data['Length']),
Height = Decimal(data['Height']),
Width = Decimal(data['Width']) ,
)
except Exception, exc:
raise post_data_to_web_service.retry(exc=exc)
我该如何同时维护两个队列,并尝试从这两个队列中执行任务呢?
Settings.py
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
1 个回答
6
默认情况下,celery会把所有的任务放到一个叫做 celery
的队列里。所以你可以在这里运行你的任务,当出现错误时,它会自动重试。如果重试次数达到上限,你可以把这些任务转移到一个新的队列,比如叫 foo
。
from celery.exceptions import MaxRetriesExceededError
@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):
try:
#do something with given args
except MaxRetriesExceededError:
post_data_to_web_service([data, url], queue='foo')
except Exception, exc:
raise post_data_to_web_service.retry(exc=exc)
当你启动你的工作进程时,这个任务会尝试处理给定的数据。如果处理失败,它会在60秒后重试10次。然后,当它遇到 MaxRetriesExceededError
这个错误时,它会把同样的任务放到新的队列 foo
中。
要处理这些任务,你需要启动一个新的工作进程。
celery worker -l info -A my_app -Q foo
或者,如果你启动默认的工作进程时使用特定的设置,也可以从中处理这个任务。
celery worker -l info -A my_app -Q celery,foo