为rq(redis队列)提供作业调度功能

rq-scheduler的Python项目详细描述


RQ调度程序

RQ Scheduler是一个 将作业调度功能添加到RQ, 基于Redis的python队列库。

https://travis-ci.org/rq/rq-scheduler.svg?branch=master

要求

安装

您可以通过pip安装RQ Scheduler

pip install rq-scheduler

或者您可以从PyPI下载最新的稳定包。

用法

安排一项工作需要做两件不同的事情:

  1. 将作业放入调度程序
  2. 运行计划程序,该计划程序将在时间到来时将计划的作业移动到队列中

安排作业

有两种方法可以安排工作。第一种是使用rq调度器的enqueue_at

fromredisimportRedisfromrqimportQueuefromrq_schedulerimportSchedulerfromdatetimeimportdatetimescheduler=Scheduler(connection=Redis())# Get a scheduler for the "default" queue# You can also instantiate a Scheduler using an RQ Queuequeue=Queue('foo',connection=Redis())scheduler=Scheduler(queue=queue)# Puts a job into the scheduler. The API is similar to RQ except that it# takes a datetime object as first argument. So for example to schedule a# job to run on Jan 1st 2020 we do:scheduler.enqueue_at(datetime(2020,1,1),func)# Date time should be in UTC# Here's another example scheduling a job to run at a specific date and time (in UTC),# complete with args and kwargs.scheduler.enqueue_at(datetime(2020,1,1,3,4),func,foo,bar=baz)

第二种方法是使用enqueue_in。而不是接受一个datetime对象, 此方法需要一个timedelta,并计划在其上运行作业 x秒/分/小时/天/周后。例如,如果我们想监控 流行的tweet是一天中的几次,我们可以做类似的事情

fromdatetimeimporttimedelta# Schedule a job to run 10 minutes, 1 hour and 1 day laterscheduler.enqueue_in(timedelta(minutes=10),count_retweets,tweet_id)scheduler.enqueue_in(timedelta(hours=1),count_retweets,tweet_id)scheduler.enqueue_in(timedelta(days=1),count_retweets,tweet_id)

important:使用RQ Scheduler时,应始终使用UTC日期时间。

周期性和重复性工作

从0.3版起,RQ Scheduler还支持创建周期性和重复的作业。 您可以通过schedule方法执行此操作。请注意,此功能需要 RQ>;=0.3.1。

您就是这样做的

scheduler.schedule(scheduled_time=datetime.utcnow(),# Time for first execution, in UTC timezonefunc=func,# Function to be queuedargs=[arg1,arg2],# Arguments passed into function when executedkwargs={'foo':'bar'},# Keyword arguments passed into function when executedinterval=60,# Time before the function is called again, in secondsrepeat=10,# Repeat this number of times (None means repeat forever)meta={'foo':'bar'}# Arbitrary pickleable data on the job itself)

重要提示:如果设置了重复作业,则必须确保 请不要设置result\u ttl值,或者设置的值大于间隔。 否则,包含作业详细信息的条目将过期,并且不会重新安排作业。

cron作业

从0.6.0版起,RQ Scheduler还支持创建cron作业,可以用于 重复作业,以固定的时间、日期或间隔定期运行,以查看更多信息 https://en.wikipedia.org/wiki/Cron。您可以通过cron方法执行此操作。

您就是这样做的

scheduler.cron(cron_string,# A cron string (e.g. "0 0 * * 0")func=func,# Function to be queuedargs=[arg1,arg2],# Arguments passed into function when executedkwargs={'foo':'bar'},# Keyword arguments passed into function when executedrepeat=10,# Repeat this number of times (None means repeat forever)queue_name=queue_name,# In which queue the job should be put inmeta={'foo':'bar'}# Arbitrary pickleable data on the job itself)

检索计划作业

有时你需要知道哪些工作已经安排好了。你可以得到一个 使用get_jobs方法排队的作业列表

list_of_job_instances=scheduler.get_jobs()

这个方法以最简单的形式(如上面的例子所示)返回一个列表 当前计划执行的所有作业实例。

此外,该方法还接受两个可选的关键字参数untilwith_times。第一个选项指定计划作业的时间点 应该归还。它可以作为datetime/timedelta实例 或表示自纪元(1970-01-01 00:00:00)以来秒数的整数。 第二个参数是一个布尔值,它决定是否执行计划 时间应该与作业实例一起返回。

示例

# get all jobs until 2012-11-30 10:00:00list_of_job_instances=scheduler.get_jobs(until=datetime(2012,10,30,10))# get all jobs for the next hourlist_of_job_instances=scheduler.get_jobs(until=timedelta(hours=1))# get all jobs with execution timesjobs_and_times=scheduler.get_jobs(with_times=True)# returns a list of tuples:# [(<rq.job.Job object at 0x123456789>, datetime.datetime(2012, 11, 25, 12, 30)), ...]

检查作业是否已安排

您可以检查特定的作业实例或作业ID是否为 使用熟悉的pythonin运算符执行

ifjob_instanceinscheduler:# Do something# orifjob_idinscheduler:# Do something

取消作业

要取消作业,只需将Job或作业ID传递给scheduler.cancel

scheduler.cancel(job)

注意,无论是否找到指定的作业,此方法都返回None

运行调度程序

RQ Scheduler附带运行调度程序的脚本rqscheduler。 每分钟轮询redis一次并将计划的作业移动到 需要执行的相关队列

# This runs a scheduler process using the default Redis connection
rqscheduler

如果您想使用不同的redis服务器,也可以这样做

rqscheduler --host localhost --port 6379 --db 0

脚本接受这些参数:

  • -H--host:要连接到的redis服务器
  • -p--port:要连接到的端口
  • -d--db:要使用的redis db
  • -P--password:连接到redis的密码
  • -b--burst:以突发模式运行(将执行时间为在过去并退出)
  • -i INTERVAL--interval INTERVAL:调度程序检查要添加到队列中的新作业的频率(以秒为单位,可以是浮点以获得更高的精度)。
  • -j--job-class:指定RQ要使用的自定义作业类(python module.class)
  • -q--queue-class:指定RQ要使用的自定义队列类(python module.class)

参数使用 名称相同,但前缀为RQ_REDIS_

在ubuntu上作为服务运行调度程序

sudo/etc/systemd/system/rqscheduler.service

[Unit]Description=RQScheduler
After=network.target

[Service]ExecStart=/home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/bin/python \
    /home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/lib/<<YourPythonVersion>>/site-packages/rq_scheduler/scripts/rqscheduler.py

[Install]WantedBy=multi-user.target

如果您的配置不是localhost或未在environmnt variabes中设置,则还需要添加任何命令行参数。

启动,检查状态并启用服务

sudo systemctl start rqscheduler.service
sudo systemctl status rqscheduler.service
sudo systemctl enable rqscheduler.service

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java数据未插入SQLite数据库   Java中内存有效的对象创建   java在方法内部使用“this”(不用于调用方法、构造函数或变量)   java为什么这里会出现NullPointerException?   在REST中使用HATEOAS导致的java循环依赖   java如何定制spring boot横幅?   Java数字基数计算器(即基数10到基数5)   如果在Kotlin vs Java中声明,用作全局上下文的安卓 MainApplication类将崩溃   用于过滤对象的Java lambda函数   java从字符串数组中获取整数列表   java为什么Maven找不到org。json JPMS自动模块?   java将字符串数组转换为int   仅当与阈值字节匹配时,java才会在映射中填充字符串值