将redis用作启用优先级和基于时间的任务队列。
rpqueue的Python项目详细描述
说明
此包旨在提供基于优先级的远程任务队列解决方案 使用redis作为传输和持久层,使用json作为 交换格式。
从语义上讲,该模块实现了一个0/1或1+队列,具有可选的重试次数。 即,默认情况下,它会尝试执行每个任务一次,或手动执行>;1,或 >;1自动显示“可见性超时”。
如果“手动”重试任务引发异常,则不会自动重试, 但是您可以手动重试该任务并指定最大尝试。同样地, 对于具有可见性超时的任务,如果该任务引发异常或没有 完成后,将在提供的重试次数限制内重试。
请参阅下面的Retries部分。
开始
为了执行任务,必须确保rpqueue知道 可以执行的任务,必须将rpqueue配置为连接到 redis服务器,则必须启动任务执行后台程序:
from mytasks import usertasks1, usertasks2, ... import rpqueue rpqueue.set_redis_connection_settings(host, port, db) rpqueue.execute_tasks()
或者,rpqueue提供了一个命令行接口来执行同样的操作,尽管 必须提供导入所有模块或包的模块或包的名称 定义要运行的任务的包。例如:
# tasks.py from tasks import accounting, cleanup, ... # any other imports or configuration necessary, put them here # run from the command-line python -m rpqueue.run --module=tasks --host=... --port=... --db=...
示例使用
假设您有一个模块usertasks1,其中有一个要执行的任务 echo_to_stdout。您的模块可能如下所示:
from rpqueue import task @task def echo_to_stdout(message): print(message)
要调用上述任务,您可以使用:
echo_to_stdout.execute(...) echo_to_stdout.execute(..., delay=delay_in_seconds)
您还可以使用 periodic_taskdecorator:
@periodic_task(25, queue="low") def function1(): # Will be executed every 25 seconds from within the 'low' queue. pass
重试次数
任务可以提供一个可选的attempts参数,该参数指定 失败前任务尝试执行的总次数。由 默认情况下,所有任务都将attempts设置为1,除非另有指定:
@task(attempts=3) def fail_until_zero(value, **kwargs): try: if value != 0: value -= 1 raise Exception except: fail_until_zero.retry(value, **kwargs) else: print "succeeded"
如果传递值3,则永远不会打印“succeeded”。为什么?第一次 try的值为3,尝试次数为3,但失败。第二次传递的值为2, 尝试次数=2,但失败。第三次传递的值为1,尝试次数为1,失败,并且 重试返回而不重试。attempts值是 尝试,包括第一次和所有重试。
vis_超时的自动重试
包含在rpqueue 0.30.0或更高版本中,您可以提供任务(现在是数据队列) 可见性超时,即(按照amazon sqs样式的语义)一个时间,用于 在自动重新输入之前,任务必须正确执行 进入队列。:
@task(attempts=20, vis_timeout=5, use_dead=False) def usually_eventually_succeed(**kwargs): # (4/5)**20 is ~ 0.0115, so call chain fails about 1% of the time if not random.randrange(5): return "done!" time.sleep(6) # fail silently
死信任务队列
如果你想知道哪些任务失败了,失败的呼叫可以自动 进入死信队列。:
@rpqueue.task(attempts=5, vis_timeout=5, use_dead=True) def fails_to_dead(**kwargs): # (4/5)**5 is 0.32768, so call chain fails about 33% of the time if not random.randrange(5): return "done!" time.sleep(6) # fail silently task_deadletter = rpqueue.Data(rpqueue.DEADLETTER_QUEUE, is_tasks=True) dead_tasks = task_deadletter.get_data(items=5)
有关详细信息,请参见help(rpqueue.Data)。
等待任务执行
从版本19开始,rpqueue提供了在任务完成之前等待它的能力 开始执行:
@task def my_task(args): # do something executing_task = my_task.execute() if executing_task.wait(5): # task is either being executed, or it is done else: # task has not started execution yet
有了等待任务完成的能力,你可以 通过在else中插入对executing_task.cancel()的调用来添加截止日期 上面的方块。
自动存储任务结果
从版本19开始,rpqueue提供了存储 任务完成时:
@task(save_results=30) def task_with_results(): return 5 etask = task_with_results.execute() if etask.wait(5): print etask.result # should print 5
参数save_results可以传递给任务、周期性任务,甚至 cron任务(如下所述)。传递的值将是结果的长度 存储在redis中,以秒为单位。所有结果都必须是json可编码的。
附加功能
crontab
使用类似crontab的语法支持cron_任务需要python crontab 模块:http://pypi.python.org/pypi/crontab/,允许:
@cron_task('0 5 tue * *') def function2(): # Will be executed every Tuesday at 5AM. pass
数据队列
将数据放入队列,而不是任务。我是说,应该是从 开始,但它现在在这里。
- 便捷功能:
- 1-100个每次读取0个数据项,由您自行决定
- vis_timeout
- attempts
- use_dead
- 如果您想继续处理数据,请刷新数据(我们不识别读卡器,因此如果您希望保证独占性,则应使用显式锁)
几个例子:
# 0/1 queue dq = rpqueue.Data('best_effort') dq.put_data([item1, item2, item3, ...]) items = dq.get_data(2) # {<uuid>: <item>, ...} # Up to 5 deliveries, with 5 second delay before re-insertion dq5 = rpqueue.Data('retry_processing', attempts=5, vis_timeout=5) dq5.put_data([item1, item2, item3, ...]) items = dq5.get_data(2) # {<uuid>: <item>, ...} items2 = dq5.get_data(2, vis_timeout=20) # override timeout on read refreshed = set(dq5.refresh_data(items, vis_timeout=7)) # refresh our lock items = {k:v for k,v in items if k in refreshed} dq5.done_data(items) dq5.done_data(items2) # Up to 1 try with a 5 second delay before insertion into deadletter queue dqd = rpqueue.Data('retry_processing', attempts=1, vis_timeout=5, use_dead=True) dqd.put_data([item1, item2, item3, ...]) items = dqd.get_data(2) # {<uuid>: <item>, ...} items2 = dqd.get_data(2, vis_timeout=20) # override timeout on read refreshed = set(dqd.refresh_data(items, vis_timeout=7)) # refresh our lock items = {k:v for k,v in items if k in refreshed} dqd.done_data(items) time.sleep(20) # items2 are now "dead" dead = rpqueue.Data(rpqueue.DEADLETTER_QUEUE) dead_items = dead.get_data(2) # these have a different format, see docs!
更接近实际情况的较长示例:
aggregate_queue = rpqueue.Data("aggregate_stats", vis_timeout=30, use_dead=False) @rpqueue.periodic_task(60) def aggregate(): # If vis_timeout is not provided, will use the queue default. # If vis_timeout is <= 0, will act as a 0/1 queue, and later "done data" # calling is unnecessary. data = aggregate_queue.get_data(items=100, vis_timeout=5) # data is a dictionary: {<uuid>: <item>, <uuid>: <item>, ...} # do something with data done_with = [] for id, value in data.items(): # do something with value done_with.append(id) aggregate_queue.refresh_data(data) # still working! # You can pass any iterator that naturally iterates over the uuids you # want to be "done" with. aggregate_queue.done_data(done_with) # also okay: # aggregate_queue.done_data(data) # aggregate_queue.done_data(tuple(data)) # aggregate_queue.done_data(list(data))
赞助商
不喜欢lgpl?赞助这个项目,得到你想要的任何许可。
该项目部分由structd.com和hcaptcha.com赞助,两者都是 他们获得了符合他们需要的许可证。