为rq(redis队列)提供作业调度功能
rq-scheduler的Python项目详细描述
要求
用法
安排一项工作需要做两件不同的事情:
- 将作业放入调度程序
- 运行计划程序,该计划程序将在时间到来时将计划的作业移动到队列中
安排作业
有两种方法可以安排工作。第一种是使用rq调度器的enqueue_at
fromredisimportRedisfromrqimportQueuefromrq_schedulerimportSchedulerfromdatetimeimportdatetimescheduler=Scheduler(connection=Redis())# Get a scheduler for the "default" queue# You can also instantiate a Scheduler using an RQ Queuequeue=Queue('foo',connection=Redis())scheduler=Scheduler(queue=queue)# Puts a job into the scheduler. The API is similar to RQ except that it# takes a datetime object as first argument. So for example to schedule a# job to run on Jan 1st 2020 we do:scheduler.enqueue_at(datetime(2020,1,1),func)# Date time should be in UTC# Here's another example scheduling a job to run at a specific date and time (in UTC),# complete with args and kwargs.scheduler.enqueue_at(datetime(2020,1,1,3,4),func,foo,bar=baz)
第二种方法是使用enqueue_in。而不是接受一个datetime对象, 此方法需要一个timedelta,并计划在其上运行作业 x秒/分/小时/天/周后。例如,如果我们想监控 流行的tweet是一天中的几次,我们可以做类似的事情
fromdatetimeimporttimedelta# Schedule a job to run 10 minutes, 1 hour and 1 day laterscheduler.enqueue_in(timedelta(minutes=10),count_retweets,tweet_id)scheduler.enqueue_in(timedelta(hours=1),count_retweets,tweet_id)scheduler.enqueue_in(timedelta(days=1),count_retweets,tweet_id)
important:使用RQ Scheduler时,应始终使用UTC日期时间。
周期性和重复性工作
从0.3版起,RQ Scheduler还支持创建周期性和重复的作业。 您可以通过schedule方法执行此操作。请注意,此功能需要 RQ>;=0.3.1。
您就是这样做的
scheduler.schedule(scheduled_time=datetime.utcnow(),# Time for first execution, in UTC timezonefunc=func,# Function to be queuedargs=[arg1,arg2],# Arguments passed into function when executedkwargs={'foo':'bar'},# Keyword arguments passed into function when executedinterval=60,# Time before the function is called again, in secondsrepeat=10,# Repeat this number of times (None means repeat forever)meta={'foo':'bar'}# Arbitrary pickleable data on the job itself)
重要提示:如果设置了重复作业,则必须确保 请不要设置result\u ttl值,或者设置的值大于间隔。 否则,包含作业详细信息的条目将过期,并且不会重新安排作业。
cron作业
从0.6.0版起,RQ Scheduler还支持创建cron作业,可以用于 重复作业,以固定的时间、日期或间隔定期运行,以查看更多信息 https://en.wikipedia.org/wiki/Cron。您可以通过cron方法执行此操作。
您就是这样做的
scheduler.cron(cron_string,# A cron string (e.g. "0 0 * * 0")func=func,# Function to be queuedargs=[arg1,arg2],# Arguments passed into function when executedkwargs={'foo':'bar'},# Keyword arguments passed into function when executedrepeat=10,# Repeat this number of times (None means repeat forever)queue_name=queue_name,# In which queue the job should be put inmeta={'foo':'bar'}# Arbitrary pickleable data on the job itself)
检索计划作业
有时你需要知道哪些工作已经安排好了。你可以得到一个 使用get_jobs方法排队的作业列表
list_of_job_instances=scheduler.get_jobs()
这个方法以最简单的形式(如上面的例子所示)返回一个列表 当前计划执行的所有作业实例。
此外,该方法还接受两个可选的关键字参数until和 with_times。第一个选项指定计划作业的时间点 应该归还。它可以作为datetime/timedelta实例 或表示自纪元(1970-01-01 00:00:00)以来秒数的整数。 第二个参数是一个布尔值,它决定是否执行计划 时间应该与作业实例一起返回。
示例
# get all jobs until 2012-11-30 10:00:00list_of_job_instances=scheduler.get_jobs(until=datetime(2012,10,30,10))# get all jobs for the next hourlist_of_job_instances=scheduler.get_jobs(until=timedelta(hours=1))# get all jobs with execution timesjobs_and_times=scheduler.get_jobs(with_times=True)# returns a list of tuples:# [(<rq.job.Job object at 0x123456789>, datetime.datetime(2012, 11, 25, 12, 30)), ...]
检查作业是否已安排
您可以检查特定的作业实例或作业ID是否为 使用熟悉的pythonin运算符执行
ifjob_instanceinscheduler:# Do something# orifjob_idinscheduler:# Do something
取消作业
要取消作业,只需将Job或作业ID传递给scheduler.cancel
scheduler.cancel(job)
注意,无论是否找到指定的作业,此方法都返回None。
运行调度程序
RQ Scheduler附带运行调度程序的脚本rqscheduler。 每分钟轮询redis一次并将计划的作业移动到 需要执行的相关队列
# This runs a scheduler process using the default Redis connection
rqscheduler
如果您想使用不同的redis服务器,也可以这样做
rqscheduler --host localhost --port 6379 --db 0
脚本接受这些参数:
- -H或--host:要连接到的redis服务器
- -p或--port:要连接到的端口
- -d或--db:要使用的redis db
- -P或--password:连接到redis的密码
- -b或--burst:以突发模式运行(将执行时间为在过去并退出)
- -i INTERVAL或--interval INTERVAL:调度程序检查要添加到队列中的新作业的频率(以秒为单位,可以是浮点以获得更高的精度)。
- -j或--job-class:指定RQ要使用的自定义作业类(python module.class)
- -q或--queue-class:指定RQ要使用的自定义队列类(python module.class)
参数使用 名称相同,但前缀为RQ_REDIS_。
在ubuntu上作为服务运行调度程序
sudo/etc/systemd/system/rqscheduler.service
[Unit]Description=RQScheduler After=network.target [Service]ExecStart=/home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/bin/python \ /home/<<User>>/.virtualenvs/<<YourVirtualEnv>>/lib/<<YourPythonVersion>>/site-packages/rq_scheduler/scripts/rqscheduler.py [Install]WantedBy=multi-user.target
如果您的配置不是localhost或未在environmnt variabes中设置,则还需要添加任何命令行参数。
启动,检查状态并启用服务
sudo systemctl start rqscheduler.service
sudo systemctl status rqscheduler.service
sudo systemctl enable rqscheduler.service