如何从脚本/module __main__ 启动 Celery worker?

33 投票
6 回答
45433 浏览
提问于 2025-04-18 04:53

我在一个模块里定义了一个 Celery 应用,现在我想从这个模块的 __main__ 启动工作进程,也就是说,我想用 python -m 来运行这个模块,而不是从命令行用 celery。我试过这样做:

app = Celery('project', include=['project.tasks'])

# do all kind of project-specific configuration
# that should occur whenever this module is imported

if __name__ == '__main__':
    # log stuff about the configuration
    app.start(['worker', '-A', 'project.tasks'])

但是现在 Celery 认为我是在没有参数的情况下运行工作进程:

Usage: worker <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
[snip]

它给出的使用说明就像你输入 celery --help 一样,感觉好像没有接收到命令。我还试过

app.worker_main(['-A', 'project.tasks'])

但是那样会提示 -A 这个参数不被识别。

那么我该怎么做呢?或者说,我怎么能给工作进程传递一个回调,让它记录关于配置的信息呢?

6 个回答

4

我觉得你只是没有把参数包裹起来,这样celery才能读取它们,比如:

queue = Celery('blah', include=['blah'])
queue.start(argv=['celery', 'worker', '-l', 'info'])
10

worker_main 在 celery 5.0.3 版本中被重新加入,具体可以查看这里:

https://github.com/celery/celery/pull/6481

在 5.0.4 版本中,这对我来说是有效的:

self.app.worker_main(argv = ['worker', '--loglevel=info', '--concurrency={}'.format(os.environ['CELERY_CONCURRENCY']), '--without-gossip'])
20

自从Celery 5发布以来,有了一些变化

现在,worker_main的结果是:

AttributeError: 'Celery' object has no attribute 'worker_main'

对于Celery 5,请按照以下步骤操作:

app = celery.Celery(
    'project',
    include=['project.tasks']
)

if __name__ == '__main__':
    worker = app.Worker(
        include=['project.tasks']
    )
    worker.start()

详细信息请查看celery.apps.workercelery.worker.WorkController.setup_defaults(希望将来能有更好的文档)。

24

使用 app.worker_main 方法(版本 3.1.12):

± cat start_celery.py
#!/usr/bin/python

from myapp import app


if __name__ == "__main__":
    argv = [
        'worker',
        '--loglevel=DEBUG',
    ]
    app.worker_main(argv)
14

根据Django-Celery模块的代码,你可以尝试这样的做法:

from __future__ import absolute_import, unicode_literals

from celery import current_app
from celery.bin import worker


if __name__ == '__main__':
    app = current_app._get_current_object()

    worker = worker.worker(app=app)

    options = {
        'broker': 'amqp://guest:guest@localhost:5672//',
        'loglevel': 'INFO',
        'traceback': True,
    }

    worker.run(**options)

撰写回答