如何捕获在执行一系列Celery任务时生成的所有Python日志记录?

5 投票
3 回答
1659 浏览
提问于 2025-04-16 06:56

我想把自己做的任务队列系统改成基于Celery的任务队列,但有一个功能让我很困扰。

现在,我的任务队列运作得比较粗糙;我运行一个工作(这个工作会生成数据并上传到另一个服务器),然后用一种变体的Nose日志捕获库来收集日志,最后把这些日志作为详细的结果记录存储在应用数据库里。

我想把这个过程分成三个任务:

  1. 收集数据
  2. 上传数据
  3. 报告结果(包括前两个任务的所有日志)

这里最大的难点在于日志的收集。目前,我使用日志捕获功能,记录了每次日志调用的记录,这些记录在数据生成和上传过程中是必须的,用于诊断。由于这些任务甚至不一定在同一个进程中运行,所以在Celery任务队列中我不太清楚该怎么实现这个功能。

我理想的解决方案是一个简单且尽量不干扰其他任务的方法,能够在前两个任务(1和2)执行时捕获所有日志,并让报告任务(3)可以使用这些日志。

我是否应该保持任务定义的粗糙性,把所有工作放在一个任务里?还是说有办法把已有的日志传递到最后收集?

3 个回答

0

Django Sentry 是一个用于 Python(和 Django)的日志工具,它也支持 Celery。

0

听起来你需要一种叫做“监视器”的东西会比较合适。如果你能像看电影一样实时查看和处理日志,那你就可以在结果出来的时候直接获取它们。因为这个监视器是单独运行的,所以它不会依赖于它所监视的内容,我觉得这样可以满足你对一种不干扰的解决方案的要求。

1

我猜你是在使用 logging 模块。你可以为每个任务设置一个单独的命名记录器来完成工作。它们会继承上级的所有配置。

task.py 文件中:

import logging

@task
step1(*args, **kwargs):
    # `key` is some unique identifier common for a piece of data in all steps of processing
    logger = logging.getLogger("myapp.tasks.processing.%s"%key)
    # ...
    logger.info(...) # log something

@task
step2(*args, **kwargs):
    logger = logging.getLogger("myapp.tasks.processing.%s"%key)
    # ...
    logger.info(...) # log something

在这里,所有的记录都发送到了同一个命名记录器。现在,你可以用两种方法来获取这些记录:

  1. 配置一个文件监听器,名字要和记录器的名字相关联。完成最后一步后,只需从那个文件中读取所有信息。确保这个监听器的输出缓冲是关闭的,否则你可能会丢失记录。

  2. 创建一个自定义监听器,把记录保存在内存中,然后在需要的时候一次性返回所有记录。我建议在这里使用 memcached 来存储,这比自己创建一个跨进程的存储要简单。

撰写回答