使用Celery任务创建新进程

2 投票
2 回答
2536 浏览
提问于 2025-04-17 13:26

在Celery中,想要彻底重新加载模块,唯一的方法就是重启所有的Celery进程。我知道怎么远程关闭工作进程(broadcast('shutdown', destination = [<workers>])),但不知道怎么把它们重新启动。

我有一段Python代码可以创建一个新的守护进程和一个新的工作进程,但当我尝试把它作为Celery任务运行时,出现了AssertionError: daemonic processes are not allowed to have children的错误,我猜这和工作池的设置有关。

  1. 有没有办法在Celery中覆盖这个错误?
  2. 如果没有,还有没有其他方法让Celery启动另一个工作进程来替代自己?也许可以把整个过程放在一个bash脚本里(不过这样做的初衷是为了避免用Python去调用bash再调用Python)。
  3. 如果没有,还有没有其他方法让Celery重新加载最新版本的代码?--autoreload标志和broadcast('pool_restart')都没有效果。

示例行为:

  1. 创建:

    @task def add(x,y): return x+y

  2. 启动Celery,运行add.delay(4,4),得到8。

  3. 把add改成:

    @task def add(x,y): return x*y

  4. 做点魔法(目前是“重启Celery”)

  5. 再次运行add.delay(4,4)

你应该得到16。但我总是得到8,除非我关闭Celery并重新加载它,而我不能远程做到这一点,除非有办法从远程机器启动工作进程,最好是用脚本来实现。

2 个回答

0

如果你在使用Flask框架,你可以通过在另一个进程中打开celery,使用subprocess命令行来启动,并利用Werkzeug的重载功能来重新加载文件:

if __name__ == "__main__":
    celery.control.broadcast("shutdown") # Kill the previous workers
    import subprocess
    subprocess.Popen("celery worker", shell=True) # Or celeryd
    app.run(use_reloader=True) # From Flask/Werkzeug

你也可以直接使用 debug=True 来启用Flask/Werkzeug的重载功能(不过在生产环境中可不建议这样做)。另外,你还可以使用 watchdog 这个包来监控文件的变化。这种方法有点粗暴……但确实有效。

2

在一个管理工具下运行Celery的后台进程,比如叫做supervisord的工具。当celeryd这个进程意外停止时,管理工具会自动把它重新启动。

撰写回答