将redis用作启用优先级和基于时间的任务队列。

rpqueue的Python项目详细描述


说明

此包旨在提供基于优先级的远程任务队列解决方案 使用redis作为传输和持久层,使用json作为 交换格式。

从语义上讲,该模块实现了一个0/1或1+队列,具有可选的重试次数。 即,默认情况下,它会尝试执行每个任务一次,或手动执行>;1,或 >;1自动显示“可见性超时”。

如果“手动”重试任务引发异常,则不会自动重试, 但是您可以手动重试该任务并指定最大尝试。同样地, 对于具有可见性超时的任务,如果该任务引发异常或没有 完成后,将在提供的重试次数限制内重试。

请参阅下面的Retries部分。

提供完整的文档:https://josiahcarlson.github.io/rpqueue/

开始

为了执行任务,必须确保rpqueue知道 可以执行的任务,必须将rpqueue配置为连接到 redis服务器,则必须启动任务执行后台程序:

from mytasks import usertasks1, usertasks2, ...
import rpqueue

rpqueue.set_redis_connection_settings(host, port, db)
rpqueue.execute_tasks()

或者,rpqueue提供了一个命令行接口来执行同样的操作,尽管 必须提供导入所有模块或包的模块或包的名称 定义要运行的任务的包。例如:

# tasks.py
from tasks import accounting, cleanup, ...
# any other imports or configuration necessary, put them here

# run from the command-line
python -m rpqueue.run --module=tasks --host=... --port=... --db=...

示例使用

假设您有一个模块usertasks1,其中有一个要执行的任务 echo_to_stdout。您的模块可能如下所示:

from rpqueue import task

@task
def echo_to_stdout(message):
    print(message)

要调用上述任务,您可以使用:

echo_to_stdout.execute(...)
echo_to_stdout.execute(..., delay=delay_in_seconds)

您还可以使用 periodic_taskdecorator:

@periodic_task(25, queue="low")
def function1():
    # Will be executed every 25 seconds from within the 'low' queue.
    pass

重试次数

任务可以提供一个可选的attempts参数,该参数指定 失败前任务尝试执行的总次数。由 默认情况下,所有任务都将attempts设置为1,除非另有指定:

@task(attempts=3)
def fail_until_zero(value, **kwargs):
    try:
        if value != 0:
            value -= 1
            raise Exception
    except:
        fail_until_zero.retry(value, **kwargs)
    else:
        print "succeeded"

如果传递值3,则永远不会打印“succeeded”。为什么?第一次 try的值为3,尝试次数为3,但失败。第二次传递的值为2, 尝试次数=2,但失败。第三次传递的值为1,尝试次数为1,失败,并且 重试返回而不重试。attempts值是 尝试,包括第一次和所有重试。

vis_超时的自动重试

包含在rpqueue 0.30.0或更高版本中,您可以提供任务(现在是数据队列) 可见性超时,即(按照amazon sqs样式的语义)一个时间,用于 在自动重新输入之前,任务必须正确执行 进入队列。:

@task(attempts=20, vis_timeout=5, use_dead=False)
def usually_eventually_succeed(**kwargs):
    # (4/5)**20  is ~ 0.0115, so call chain fails about 1% of the time
    if not random.randrange(5):
        return "done!"

    time.sleep(6) # fail silently

死信任务队列

如果你想知道哪些任务失败了,失败的呼叫可以自动 进入死信队列。:

@rpqueue.task(attempts=5, vis_timeout=5, use_dead=True)
def fails_to_dead(**kwargs):
    # (4/5)**5  is 0.32768, so call chain fails about 33% of the time
    if not random.randrange(5):
        return "done!"

    time.sleep(6) # fail silently

task_deadletter = rpqueue.Data(rpqueue.DEADLETTER_QUEUE, is_tasks=True)
dead_tasks = task_deadletter.get_data(items=5)

有关详细信息,请参见help(rpqueue.Data)

等待任务执行

从版本19开始,rpqueue提供了在任务完成之前等待它的能力 开始执行:

@task
def my_task(args):
    # do something

executing_task = my_task.execute()
if executing_task.wait(5):
    # task is either being executed, or it is done
else:
    # task has not started execution yet

有了等待任务完成的能力,你可以 通过在else中插入对executing_task.cancel()的调用来添加截止日期 上面的方块。

自动存储任务结果

从版本19开始,rpqueue提供了存储 任务完成时:

@task(save_results=30)
def task_with_results():
    return 5

etask = task_with_results.execute()
if etask.wait(5):
    print etask.result # should print 5

参数save_results可以传递给任务、周期性任务,甚至 cron任务(如下所述)。传递的值将是结果的长度 存储在redis中,以秒为单位。所有结果都必须是json可编码的。

附加功能

crontab

使用类似crontab的语法支持cron_任务需要python crontab 模块:http://pypi.python.org/pypi/crontab/,允许:

@cron_task('0 5 tue * *')
def function2():
    # Will be executed every Tuesday at 5AM.
    pass

数据队列

将数据放入队列,而不是任务。我是说,应该是从 开始,但它现在在这里。

便捷功能:
  • 1-100个每次读取0个数据项,由您自行决定
  • vis_timeout
  • attempts
  • use_dead
  • 如果您想继续处理数据,请刷新数据(我们不识别读卡器,因此如果您希望保证独占性,则应使用显式锁)

几个例子:

# 0/1 queue
dq = rpqueue.Data('best_effort')
dq.put_data([item1, item2, item3, ...])
items = dq.get_data(2) # {<uuid>: <item>, ...}

# Up to 5 deliveries, with 5 second delay before re-insertion
dq5 = rpqueue.Data('retry_processing', attempts=5, vis_timeout=5)
dq5.put_data([item1, item2, item3, ...])
items = dq5.get_data(2) # {<uuid>: <item>, ...}
items2 = dq5.get_data(2, vis_timeout=20) # override timeout on read
refreshed = set(dq5.refresh_data(items, vis_timeout=7)) # refresh our lock
items = {k:v for k,v in items if k in refreshed}
dq5.done_data(items)
dq5.done_data(items2)

# Up to 1 try with a 5 second delay before insertion into deadletter queue
dqd = rpqueue.Data('retry_processing', attempts=1, vis_timeout=5, use_dead=True)
dqd.put_data([item1, item2, item3, ...])
items = dqd.get_data(2) # {<uuid>: <item>, ...}
items2 = dqd.get_data(2, vis_timeout=20) # override timeout on read
refreshed = set(dqd.refresh_data(items, vis_timeout=7)) # refresh our lock
items = {k:v for k,v in items if k in refreshed}
dqd.done_data(items)
time.sleep(20)
# items2 are now "dead"
dead = rpqueue.Data(rpqueue.DEADLETTER_QUEUE)
dead_items = dead.get_data(2) # these have a different format, see docs!

更接近实际情况的较长示例:

aggregate_queue = rpqueue.Data("aggregate_stats", vis_timeout=30, use_dead=False)

@rpqueue.periodic_task(60)
def aggregate():
    # If vis_timeout is not provided, will use the queue default.
    # If vis_timeout is <= 0, will act as a 0/1 queue, and later "done data"
    # calling is unnecessary.
    data = aggregate_queue.get_data(items=100, vis_timeout=5)
    # data is a dictionary: {<uuid>: <item>, <uuid>: <item>, ...}
    # do something with data
    done_with = []
    for id, value in data.items():
        # do something with value
        done_with.append(id)

    aggregate_queue.refresh_data(data) # still working!

    # You can pass any iterator that naturally iterates over the uuids you
    # want to be "done" with.
    aggregate_queue.done_data(done_with)
    # also okay:
    # aggregate_queue.done_data(data)
    # aggregate_queue.done_data(tuple(data))
    # aggregate_queue.done_data(list(data))

赞助商

不喜欢lgpl?赞助这个项目,得到你想要的任何许可。

该项目部分由structd.com和hcaptcha.com赞助,两者都是 他们获得了符合他们需要的许可证。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
Java泛型重写抽象方法并具有子类的返回类型   Java中的字符串反转字符,同时保留一些字符   java将系统时间与我获取它的时间进行比较   java解析ODATA URL以在准备entityset之前读取ID值   java中的有界通配符下界泛型即使在传递超类时也不会编译   c#Java的JVM和Java的内部工作方式有什么不同。NET的CLR?   java如何在windows7上指定JDK的版本?   Java:列出单个目录中的所有文件(1020000+)   java使用Logback和Lombok   安卓谷歌玩java。lang.NullPointerException   使用RSA的解密结果在普通Java和Android中有所不同   具有默认连接池的java Spring引导   java我如何在一个坏的测试环境中前进?