使用不同代码源的两个celery实例

1 投票
1 回答
673 浏览
提问于 2025-04-18 12:42

我有两种工人:

  1. 第一种工人是和django连接的,负责数据库轮询。
  2. 第二种工人则不和django有任何互动。

我使用rabbitMQ作为消息中介,最开始在同一台服务器上启动这两种工人。

我想把第一种和第二种工人的代码分开,以便将来能在不同的服务器上启动这些工人。我希望第二种工人所在的服务器上不要有django的代码。

在django工人那边,我是这样定义它们的:

@shared_task(name='task_polling')
def task_polling():
    send_task('do_work', [], {'_serialized_task': serialized_task})

@shared_task(name='save_shell_task')
def save_shell_task(_result):
    pass

还有以下的celery配置:

from __future__ import absolute_import

import os
from datetime import timedelta
from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ussd_auto.settings.local')


app = Celery(include=[
    'app_task_management.tasks',
])

app.conf.update(
    CELERY_IGNORE_RESULT=True,
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERYBEAT_SCHEDULE={
        'periodic_task': {
            'task': 'task_polling',
            'schedule': timedelta(seconds=1),
        },
    },
)

我用以下命令启动这些工人:

celery -A my_app worker -B -l info

在启动屏幕上,任务都被正确注册了:

 -------------- celery@mbp-de-julio.home v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.1.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x102341390
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . save_shell_task
  . task_polling

而在另一边,我是这样定义任务和celery配置的:

from celery import Celery
from celery import shared_task
from celery.execute import send_task

from task_workers import task_workers

app = Celery('tasks', broker='amqp://guest@localhost//')

@shared_task(name='do_work')
def do_work(_serialized_task):
    result = task_workers.Worker().do(_task=_serialized_task)
    send_task('save_shell_task', [], {'_result':result})

我用这个命令启动这个工人:

    celery -A tasks worker --loglevel=info

启动屏幕显示任务也被正确注册了:

 -------------- celery@mbp-de-julio.home v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.1.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x101833450
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . do_work

但是看起来这两个celery实例之间不知道怎么沟通:

[2014-07-09 22:21:52,184: ERROR/MainProcess] Received unregistered task of type u'task_polling'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.

The full contents of the message body was:
[...]
KeyError: u'task_polling'

我该如何配置celery,让两边能够解决这个沟通问题呢?

更新 10/07

当我单独启动轮询任务时,轮询是正常工作的,但当我和其他工人一起启动celery时,这个celery接收到轮询的消息却说它不知道这个任务。也许我需要为这些任务分配不同的队列?

我已经定义了队列。

在第二种工人的celeryconfig.py文件中:

CELERY_ROUTES = {
    'do_work': {
        'queue': 'task_workers_queue'
    },
    'task_polling': {
        'queue': 'db_workers_queue'
    },
    'save_shell_task': {
        'queue': 'db_workers_queue'
    },
}

在第一种工人的数据库那边:

app.conf.update(
    CELERY_IGNORE_RESULT=True,
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TASK_SERIALIZER='json',
    CELERYBEAT_SCHEDULE={
        'periodic_task': {
            'task': 'task_polling',
            'schedule': timedelta(seconds=1),
        },
    },
    CELERY_ROUTES={
        'task_polling': {
            'queue': 'db_workers_queue'
        },
        'save_shell_task': {
            'queue': 'db_workers_queue'
        },
        'do_work': {
            'queue': 'task_workers_queue'
        }
    },
)

我这样启动celery:

$ celery -A tasks worker --loglevel=info -Q task_workers_queue
$ celery -A ussd_auto worker -B -l info -Q db_workers_queue

现在,定时任务db_polling被beat正确调用。 这个任务成功地将工作发送给do_work任务。 但是do_work任务却没能将结果发送给save_shell_task任务。 没有错误,但什么也没发生。

1 个回答

0

在你第一次尝试的时候,你有两个工作者(worker)在同一个代理(broker)上配置,但注册的任务不同。在第一个场景中,这两个工作者会从代理那里轮流接收任务,这就是为什么会出现第一个错误:

Received unregistered task of type u'task_polling'.

很明显,第二个工作者没有注册这个任务,他根本不知道这个任务的源代码。

在你的更新中,你逻辑上考虑为每个工作者创建一个专用的队列,这确实是一个合适的解决方案,可以避免一个工作者接到不该接的任务。你只犯了一个错误,celery的send_task方法不会考虑路由,这个方法只是一个发送消息到代理的接口,路由是在通过任务类调度任务时才会被考虑。

要让它正常工作,你需要在send_task方法中指定队列,方法如下:

send_task('save_shell_task', [], kwargs={'_result':result, queue=queue})

更多相关内容:

我可以通过名称调用任务吗?

send_task 源码

希望这能帮到你

撰写回答