似乎无法使python芹菜信号工作

2024-05-13 05:31:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我对芹菜的开发还比较陌生,我在实现信号方面遇到了一个问题。 我有一个由许多不同工人组成的应用程序。 目前,它使用rabbitmq作为代理,使用redis作为后端

每个工人都有自己的队列。这是我们目前的配置方式:

celery = Celery(queueDict['test'], broker=config.REDIS_SERVER, backend=config.REDIS_SERVER)
default_exchange = Exchange('default', type='direct')
test_queue = Queue(queueDict['test'], default_exchange, routing_key=queueDict['test'])


logger = get_task_logger(__name__)

celery.conf.task_queues = (test_queue, )


@celery.task(name='signal2', bind=True)
def signal2(self, param):
    print("dog" + param)

我希望使用信号,以便能够捕获应用程序中任何工作者的失败任务。当我在同一个worker中使用它时,它会在一个task_failure事件中工作。 但我想让另一个工作人员捕捉这些事件(甚至是我的flask应用程序) 但我似乎错过了一些东西。。。 这是我目前的努力,让它发挥作用

celery = Celery('consumer', broker=config.REDIS_SERVER, backend=config.REDIS_SERVER)
default_exchange = Exchange('default', type='direct')
default_queue = Queue(queueDict['default'], default_exchange, routing_key=queueDict['default'])

logger = get_task_logger(__name__)

celery.conf.task_queues = (default_queue, )


@task_failure.connect
def process_failure_signal(sender=None, task_id=None, exception=None,
                           args=None, kwargs=None, traceback=None, einfo=None, **akwargs):

    msg = 'Signal exception: %s (%s)' % (
        exception.__class__.__name__, exception)
    exc_info = (type(exception), exception, traceback)
    extra = {
        'data': {
            'task_id': str(task_id),
            'sender': str(sender),
            'args': str(args),
            'kwargs': str(kwargs),
        }
    }

    logger.error(msg, exc_info=exc_info, extra=extra)

但它从未收到任何信号。。。 谢谢你的帮助


Tags: nametestredisnoneconfigdefaulttaskexchange
2条回答

实时处理

要实时处理事件,您需要以下几点

  • 事件使用者(这是Receiver

  • 事件进入时调用的一组处理程序

    You can have different handlers for each event type, or a catch-all handler can be used ('*')

  • 州(可选)

    app.events.State是集群中任务和工作者的一种方便的内存表示形式,随着事件的到来而更新

    它封装了许多常见问题的解决方案,比如检查工作人员是否仍然活着(通过验证心跳),在事件发生时将事件字段合并在一起,确保时间戳同步,等等

结合这些功能,您可以轻松地实时处理事件:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
                '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

注意capturewakeup参数向所有工作人员发送信号,强制他们发送心跳。这样,当监视器启动时,您可以立即看到工人

您可以通过指定处理程序来侦听特定事件:

from celery import Celery

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])

        print('TASK FAILED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
                'task-failed': announce_failed_tasks,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)  

if __name__ == '__main__':
    app = Celery(broker='amqp://guest@localhost//')
    my_monitor(app)

Monitoring and Management Guide — Celery 4.4.2 documention

DejanLekic是正确的,他分享的页面正是我想要的

有兴趣的人士: https://docs.celeryproject.org/en/stable/userguide/monitoring.html#real-time-processing

这可以很容易地用于捕获事件和监视任务

相关问题 更多 >