<p>我发现有一个表是为维护数据库中的任务而创建的。所以我只是简单地创建了一个脚本,它将每小时检查表中过去一小时记录中的失败任务,如果找到任何记录,它将发送电子邮件。在</p>
<p><strong>脚本.py</strong></p>
<pre><code>#!venv/bin/python2
import os
from django.conf import settings
if __name__ == '__main__' and __package__ is None:
os.sys.path.append(
os.path.dirname(
os.path.dirname(
os.path.abspath(__file__))))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "rest_apis.settings")
import django
django.setup()
from django.core.mail import EmailMessage
from djcelery.models import TaskMeta
from datetime import datetime, timedelta, time
USERS_TO_NOTIFY = ['ops@yopmail.com']
TIME_THRESHOLD_INTERVAL = 60
def send_email(email_subject_line, email_body):
email = EmailMessage(email_subject_line,
email_body,
settings.EMAIL_HOST_USER,
USERS_TO_NOTIFY
)
email.send()
def main():
current_time = datetime.now() # Get Current TimeStamp
time_threshold = current_time - timedelta(minutes=TIME_THRESHOLD_INTERVAL) # Get 60 minutes past current time stamp
celery_taskmeta_objects = TaskMeta.objects.filter(status="FAILURE", date_done__gte=time_threshold)
email_body = "Below are the tasks which failed : "
if celery_taskmeta_objects.exists():
for celery_taskmeta in celery_taskmeta_objects:
print celery_taskmeta.task_id
email_body += "\n\ntask_id : %s" % celery_taskmeta.task_id
email_body += "\nstatus : %s" % celery_taskmeta.status
email_body += "\ndate : %s" % celery_taskmeta.date_done
email_body += "\ntraceback :"
email_body += "\n%s\n\n" % celery_taskmeta.traceback
email_subject_line = '[URGENT] Celery task failure in last %s minutes' % (TIME_THRESHOLD_INTERVAL)
send_email(email_subject_line, email_body)
main()
</code></pre>
<p>现在在一封电子邮件中,我也得到了完整的堆栈跟踪和任务的id。现在我的要求是每小时检查一次,所以我把脚本放在crontab中。现在您可以根据您的基本需要更改时间阈值并相应地工作。在</p>