如何动态地正确关闭Python RQ工作进程?

10 投票
1 回答
5029 浏览
提问于 2025-04-17 17:20

我们正在使用Python RQ来动态管理工作进程。我们有一个自定义的工作脚本,简单来说就是这样:

from rq import Connection, Worker

queues_to_listen_on = get_queues_to_listen_on()

with Connection(connection = get_worker_connection()):
    w = Worker(queues_to_listen_on)
    w.work()

我们特别关注的是工作进程的关闭。我们最关心的是如何优雅地关闭一个工作进程,也就是说在关闭之前要确保当前的工作能够完成。适当的Worker对象上的request_stop(...)信号处理器似乎可以满足我们的需求,但据我所知,似乎没有其他方法可以发出这个信号,除了在终端中按CTRL+C来结束工作进程。

我认为有两种可能的解决方案(当然可能还有更多),按优先级排序:

  1. 通过编程方式,使用rq库发送信号给request_stop,从而触发优雅关闭。
  2. 以某种方式获取正确进程的pid(不确定是工作马进程还是工作监听进程),然后使用其他方法向该进程发送适当的信号。我们有一些方法可以做到这一点,但这可能需要更多的工作,并引入其他变量,这些我更希望能省去(例如,使用Fabric来运行远程命令或类似的方式)。

如果有更好的方法来解决这个问题,或者有其他替代方案可以达到同样的目标,我非常欢迎你的建议。

1 个回答

5

选项1在设计上绝对更好。

不过,为了解决你提到的必须使用CTRL + C来退出进程的问题(我也很讨厌这个),你可以对你的工作程序采用以下策略:

# WORKER_NAME.py
import os

PID = os.getpid()

@atexit.register
def clean_shut():
    print "Clean shut performed"

    try:
        os.unlink("WORKER_NAME.%d" % PID)
    except:
        pass

# Worker main
def main():
    f = open("WORKER_NAME.%d" % PID, "w")
    f.write("Delete this to end WORKER_NAME gracefully")
    f.close()

    while os.path.exists("WORKER_NAME.%d" % PID):
        # Worker working

然后在你的主脚本中,按照@Borys的建议获取工作程序的PID,发送温和的停止请求,并使用os.unlink("path/to/WORKER_NAME.%d" % worker_PID)来确保优雅地关闭程序 :)

不过,这种方法只适用于那些在无限循环中运行的工作程序。如果工作程序执行的任务会阻塞,即使是简单的顺序一次性任务,你就需要进一步追踪可能导致阻塞的代码,来解决这个问题,比如应用某种超时策略。

撰写回答