使用Celery任务创建新进程
在Celery中,想要彻底重新加载模块,唯一的方法就是重启所有的Celery进程。我知道怎么远程关闭工作进程(broadcast('shutdown', destination = [<workers>])
),但不知道怎么把它们重新启动。
我有一段Python代码可以创建一个新的守护进程和一个新的工作进程,但当我尝试把它作为Celery任务运行时,出现了AssertionError: daemonic processes are not allowed to have children
的错误,我猜这和工作池的设置有关。
- 有没有办法在Celery中覆盖这个错误?
- 如果没有,还有没有其他方法让Celery启动另一个工作进程来替代自己?也许可以把整个过程放在一个bash脚本里(不过这样做的初衷是为了避免用Python去调用bash再调用Python)。
- 如果没有,还有没有其他方法让Celery重新加载最新版本的代码?
--autoreload
标志和broadcast('pool_restart')
都没有效果。
示例行为:
创建:
@task def add(x,y): return x+y
启动Celery,运行add.delay(4,4),得到8。
把add改成:
@task def add(x,y): return x*y
做点魔法(目前是“重启Celery”)
再次运行add.delay(4,4)
你应该得到16。但我总是得到8,除非我关闭Celery并重新加载它,而我不能远程做到这一点,除非有办法从远程机器启动工作进程,最好是用脚本来实现。
2 个回答
0
如果你在使用Flask框架,你可以通过在另一个进程中打开celery,使用subprocess命令行来启动,并利用Werkzeug的重载功能来重新加载文件:
if __name__ == "__main__":
celery.control.broadcast("shutdown") # Kill the previous workers
import subprocess
subprocess.Popen("celery worker", shell=True) # Or celeryd
app.run(use_reloader=True) # From Flask/Werkzeug
你也可以直接使用 debug=True
来启用Flask/Werkzeug的重载功能(不过在生产环境中可不建议这样做)。另外,你还可以使用 watchdog 这个包来监控文件的变化。这种方法有点粗暴……但确实有效。
2
在一个管理工具下运行Celery的后台进程,比如叫做supervisord的工具。当celeryd这个进程意外停止时,管理工具会自动把它重新启动。