如何动态地正确关闭Python RQ工作进程?
我们正在使用Python RQ来动态管理工作进程。我们有一个自定义的工作脚本,简单来说就是这样:
from rq import Connection, Worker
queues_to_listen_on = get_queues_to_listen_on()
with Connection(connection = get_worker_connection()):
w = Worker(queues_to_listen_on)
w.work()
我们特别关注的是工作进程的关闭。我们最关心的是如何优雅地关闭一个工作进程,也就是说在关闭之前要确保当前的工作能够完成。适当的Worker
对象上的request_stop(...)
信号处理器似乎可以满足我们的需求,但据我所知,似乎没有其他方法可以发出这个信号,除了在终端中按CTRL+C
来结束工作进程。
我认为有两种可能的解决方案(当然可能还有更多),按优先级排序:
- 通过编程方式,使用
rq
库发送信号给request_stop
,从而触发优雅关闭。 - 以某种方式获取正确进程的pid(不确定是工作马进程还是工作监听进程),然后使用其他方法向该进程发送适当的信号。我们有一些方法可以做到这一点,但这可能需要更多的工作,并引入其他变量,这些我更希望能省去(例如,使用
Fabric
来运行远程命令或类似的方式)。
如果有更好的方法来解决这个问题,或者有其他替代方案可以达到同样的目标,我非常欢迎你的建议。
1 个回答
5
选项1在设计上绝对更好。
不过,为了解决你提到的必须使用CTRL + C
来退出进程的问题(我也很讨厌这个),你可以对你的工作程序采用以下策略:
# WORKER_NAME.py
import os
PID = os.getpid()
@atexit.register
def clean_shut():
print "Clean shut performed"
try:
os.unlink("WORKER_NAME.%d" % PID)
except:
pass
# Worker main
def main():
f = open("WORKER_NAME.%d" % PID, "w")
f.write("Delete this to end WORKER_NAME gracefully")
f.close()
while os.path.exists("WORKER_NAME.%d" % PID):
# Worker working
然后在你的主脚本中,按照@Borys的建议获取工作程序的PID,发送温和的停止请求,并使用os.unlink("path/to/WORKER_NAME.%d" % worker_PID)
来确保优雅地关闭程序 :)
不过,这种方法只适用于那些在无限循环中运行的工作程序。如果工作程序执行的任务会阻塞,即使是简单的顺序一次性任务,你就需要进一步追踪可能导致阻塞的代码,来解决这个问题,比如应用某种超时策略。