基于redis limpyd的队列/作业系统,python中的redis orm

redis-limpyd-jobs的Python项目详细描述


redis limpyd作业

基于 redis limpyd redis orm(python中的某种形式)

在哪里可以找到它:

  • Github存储库:https://github.com/limpyd/redis-limpyd-jobs" rel="nofollow">https://github.com/limpyd/redis limpyd jobs
  • PYPI包: https://pypi.python.org/pypi/redis-limpyd-jobs
  • 文档:http://documentup.com/limpyd/redis-limpyd-jobs" rel="nofollow">http://documentup.com/limpyd/redis limpyd jobs

安装:

支持python版本2.7和3.4到3.6(cpython和pypypy)。

支持redis py版本>;=2.10.0,<;2.11。

支持redis limpyd版本>;=1.2。

支持redis limpyd扩展版本>;=1.0。

pip install redis-limpyd-jobs

注意你实际上需要 redis-limpyd扩展 (最小v1.0) 除此之外,redis limpyd(最小v1.2) (两者都通过pypi自动安装)

工作原理

redis limpyd jobs 提供了三个 limpyd 模型( 队列 作业 错误 ,以及工人类。

这些模型实现了运行作业所需的最少资源 异步:

  • 使用 作业 模型存储要执行的操作
  • 队列 模型将存储具有优先级系统的作业列表
  • 错误 模型将存储所有错误
  • worker类将用于处理队列 并运行作业

简单示例

fromlimpyd_jobsimportSTATUSES,Queue,Job,Worker# The function to run when a job is called by the workerdefdo_stuff(job,queue):# here do stuff with your jobpass# Create a first job, name 'job:1', in a queue named 'myqueue', with a# priority of 1. The higher the priority, the sooner the job will runjob1=Job.add_job(identifier='job:1',queue_name='myqueue',priority=1)# Add another job in the same queue, with a higher priority, and a different# identifier (if the same was used, no new job would be added, but the# existing job's priority would have been updated)job2=Job.add_job(identifier='job:2',queue_name='myqueue',priority=2)# Create a worker for the queue used previously, asking to call the# "do_stuff" function for each job, and to stop after 2 jobsworker=Worker(queues='myqueue',callback=do_stuff,max_loops=2)# Now really run the jobsworker.run()# Here our jobs are done, our queue is emptyqueue1=Queue.get_queue('myqueue',priority=1)queue2=Queue.get_queue('myqueue',priority=2)# nothing waitingprintqueue1.waiting.lmembers(),queue2.waiting.lmembers()>>[][]# two jobs in success (show PKs of jobs)printqueue1.success.lmembers(),queue2.success.lmembers()>>['limpyd_jobs.models.Job:1','limpyd_jobs.models.Job:2']# Check our jobs statusesprintjob1.status.hget()==STATUSES.SUCCESS>>Trueprintjob2.status.hget()==STATUSES.SUCCESS>>True

你会注意到它的工作原理:

  • 作业.添加作业 以创建作业
  • worker() 创建一个worker,并设置 为每个作业运行哪个函数
  • 工人。运行 启动工人。

请注意,您可以运行任意数量的工人,即使在同一个 队列名称。在内部,我们使用 blpop redis命令获取作业 原子的。

但是你也可以只运行一个worker,只有一个队列, 回调中的不同内容取决于 工作的属性。

工人能够捕获sigint/sigterm信号,完成执行 退出前的当前作业。如果用于,例如 主管。

如果要在作业、队列或错误中存储更多信息,或者 要在员工身上有不同的行为,很容易,因为你可以 在limpyd作业中创建所有内容的子类 模型或工人类。

型号

< H4>作业

作业存储有关要运行的任务的所有必需信息。

注意:如果您想子类化作业模型以添加自己的字段, run 方法或其他方法,注意类必须在第一个 python模块(即不在父类或函数中)的级别工作,

工作字段
标识符

用于标识作业的字符串( 实例哈希字段 ,已索引)。

当使用(推荐的)add_job方法时,不能有 等待队列中具有相同标识符的许多作业。如果您创建 具有标识符的新作业,而具有相同标识符的其他作业仍在 同样的等待队列,做什么取决于两个队列的优先级 作业:-如果新作业的优先级较低(或相等),则将其丢弃- 如果新作业具有更高的优先级,则现有作业的优先级 更新到更高版本。

在这两种情况下, add_job 类方法都返回现有作业, 丢弃新的。

使用标识符的一种常见方法是,至少,存储 确定要应用任务的对象:-您可以 一个或多个队列用于一个唯一的任务,并且只存储 对象-您可以在每个 执行许多任务时,您可能也希望将任务存储在 标识符 字段:"任务:id"

注意,通过子类化 作业 模型,您可以添加新的 用于存储任务和其他所需参数的作业字段,如 参数(调整照片大小,发送消息…)

状态

一个字符串( instancehashfield ,已索引),用于存储 工作。

这只是一个字母,但我们提供了一个类来帮助详细使用它: 状态

fromlimpyd_jobsimportSTATUSESprintSTATUSES.SUCCESS>>"s"

通过 add_job 类方法创建作业时,其状态为 设置为状态等待或状态延迟 设置 延迟到 。当工人选择执行时, 状态传递到 状态。running 。完成后,它是 状态。成功 状态。错误 。另一个可用状态是 状态。已取消 ,如果要取消不带 将其从队列中移除。

您还可以显示状态的完整字符串:

printSTATUSES.by_value(my_job.status.hget())>>"SUCCESS"
优先级

用于存储 工作的优先级。

作业的优先级决定了它将在哪个队列对象中 存储。一个工人监听具有某些名称和不同类型的所有队列 优先权,但尊重优先权(相反)顺序:越高 优先级,作业执行得越快。

我们选择用"高优先级更好"的方式做事 提供始终以高于优先级添加作业的可能性 任何其他的。

直接更新作业的优先级不会更改 它被储存起来了。但当您通过(推荐)添加作业时 add_job 类方法,如果存在具有相同标识符的作业,则 将更新优先级(仅当新的优先级更高时)和作业 将移动到更高优先级队列。

添加了

存储日期和时间的字符串( instancehashfield ) 表示作业添加时间的 datetime.utcnow() ) 排队等候。

结合 end 字段计算作业很有用 持续时间。

开始

存储日期和时间的字符串( instancehashfield ) 获取作业的时间的表示 从队列中,在调用回调之前。

结合 end 字段计算作业很有用 持续时间。

结束

存储日期和时间的字符串( instancehashfield ) 表示作业设置时的 datetime.utcnow() ) 完成或出错,就在完成之后。

它是在结合 start 字段计算作业时很有用 持续时间。

尝试

保存为字符串( InstanceHashField )的整数,用于存储数字 执行任务的次数。如果是的话可能不止一个 出错后重新排队。

延迟至

日期时间的字符串表示形式( InstanceHashField ) 对象,直到作业可能在"延迟"列表中时(redis 队列的排序集。

通过传递 delayed_until 参数,它必须是一个 datetime 参数的延迟时间,必须是秒数(int或 或者一个 timedelta 对象。 延迟的参数将是 添加到当前时间( datetime.utcnow() )以计算 延迟到

如果一个作业在执行后出错,并且该工作进程有 正的 重新排队延迟_delta 属性, 延迟_直到 字段 将相应地设置,用于在 延迟,

排队

当前由队列管理时,此字段设置为 '1' : 等待,延迟,奔跑。此标志在调用时设置 排队或延迟,并在作业完成时由工作人员删除 已取消、已成功完成或已错误而未完成 要求。是这个字段被检查来测试同一个作业 调用"添加作业"时已存在。

取消打开错误

必须将此字段设置为 true 值(不要忘记 存储字符串,因此 0 将被保存为 "0" 所以 …因此,如果您想要 false 值:如果您不希望作业为 出错时重新排队。

请注意,如果要对所有a类作业执行此操作,则可能需要 设置为 true 该类的 always_cancel_on_error属性。

工作属性
队列模型

当通过 add_job 方法添加作业时,在 属性将用于获取或创建队列。默认设置为 队列 但如果要将其更新为自己的模型,则必须 将 作业的子类也划分为 模型,并更新此属性。

队列名称

默认情况下,可以在将 作业 类重写为 避免将队列名称参数传递给作业的方法 (尤其是添加作业

请注意,如果您不将 作业 模型子类化,则可以通过 队列模型 参数到 添加作业 方法。

总是在出错时取消

如果不希望此类的所有作业 如有错误,请重新排队。如果将其设置为默认值 ,您仍然可以通过设置它们的字段来逐个工作 在出现错误时取消设置 为a 真值 值。

作业属性和方法
标识 (属性)

ident 属性是模型+ 作业的主键,保存在队列中,允许检索 乔布斯.<

发生错误时必须取消(属性)

属性返回一个布尔值,指示 如果在执行过程中出现错误,则不得重新排队。

默认情况下,它将是 false ,但有一种方法可以改变这种情况 行为:

    SE将作业类的"总是取消"错误设置为 正确
  • 将作业的"Cancel"字段设置为"true"
持续时间 (属性)

duration 属性只返回用于计算 工作。如果 开始 结束 字段被设置,或在另一种情况下

运行 (方法)

这是工作的主要方法,你必须重写的唯一方法 当工作由工人执行时有一些凝灰岩。

此方法的返回值将传递给 工人,然后,如果定义了,则转到工作的 成功 方法。

默认情况下会引发一个未实现的错误。

参数:

  • 队列 :从中提取作业的队列。
重新排队 (方法)

requeue方法允许将作业放回等待状态(或 延迟)执行失败时排队。

参数:

  • 队列名称=无 保存作业的队列名称。如果没有 定义,将使用作业的类1。如果两者都未定义,则 出现异常。
  • 优先级=无 新作业的新优先级。如果没有定义, 这项工作将保持它的实际优先级。
  • 延迟到=无 工作真正重新排队的日期。真实的 延迟到 也可以通过将 延迟到 来设置 参数。
  • 延迟时间=无 timedelta 对象)等待作业真正重新排队。 它将计算作业的"延迟"字段
  • queue_model=none 用于存储队列的模型。默认情况下, 它被设置为 queue ,定义在 工作模式。如果未设置参数,则该属性将为 使用。注意在子类中将其设置为属性,或者 将使用requeue或默认队列模型中的参数 作业将不会保存在预期的队列模型中。
排队或延迟 (方法)

add_job requeue 中调用的方法 将作业放入等待队列或延迟队列,具体取决于 延迟到 。如果定义了这个参数,在将来, 作业被延迟,否则它只是排队。

此方法还将作业的队列标志设置为 '1'

参数:

  • 队列名称=无 保存作业的队列名称。如果没有 定义,将使用作业的类1。如果两者都未定义,则 出现异常。
  • 优先级=无 新作业的新优先级。利用工作的 如果未定义,则为实际值。
  • 延迟到=无 日期(必须是 日期时间 对象 一)的字符串表示,直到作业保留 在延迟的队列中。在此日期之前不会处理。
  • prepend=false 设置为 true 以在 等待列表,将首先执行(仅在未延迟的情况下)
  • queue_model=none 用于存储队列的模型。见 添加作业 和重新排队
开启 (重影方法)

如果在工作模型中定义了此方法(默认情况下不存在,即 "鬼魂")是当工作被工人抓到并即将被 执行("等待"状态)

参数:

  • 队列 :从中提取作业的队列。
< H6> 关于成功(ghost方法)

如果在工作模型中定义了此方法(默认情况下不存在,即 当工作执行成功时,工人会调用"ghost")。 (没有引发任何异常)。

参数:

  • 队列 :从中提取作业的队列。
  • 结果 由worker的 execute 方法返回的数据, 调用并返回作业的 run 方法的结果(或 提供给工人的 回调
关于错误 (重影方法)

如果在工作模型中定义了此方法(默认情况下不存在,即 "ghost")在作业执行失败时由工作程序调用(一个 引发异常)

参数:

  • 队列 :从中提取作业的队列。
  • 异常 :执行期间引发的异常。
  • 回溯 :异常发生时的回溯,如果 工作进程的save_tracebacks属性设置为 true
关于已跳过的 (重影方法)

如果在工作模型中定义了此方法(默认情况下不存在,即 "鬼魂")是当工人刚拿到工作时,叫不出来的 因为它的状态而被执行,而不是"等待"。另一种可能 原因是作业在执行期间被取消(按设置 其状态为 状态。已取消

  • 队列 :从中提取作业的队列。
在重新排队时 (重影方法)

如果在工作模型中定义了此方法(默认情况下不存在,即 当作业失败并且 由工人重新排队。

  • 队列 :从中提取作业的队列。
打开延迟 (重影方法)

如果在工作模型中定义了此方法(默认情况下不存在,即 当作业被延迟时(通过设置),工人调用"ghost 它在执行过程中的状态为 status.delayed (注意 可能还需要将作业值的 delayed_设置为正确的 一个日期时间(UTC日期时间的字符串表示),或工作进程 将延迟60秒)。

如果作业的状态设置为 状态。在队列的 等待列表中时被延迟。

  • 队列 :从中提取作业的队列。
工作类方法
添加作业

add_job 类方法是创建 一份工作。它将检查中是否已存在具有相同标识符的作业 队列(未完成),如果找到队列,则更新其优先级(和 将其移动到正确的队列中)。如果找不到现有的作业,则为新作业 将被创建并添加到队列中。

参数:

  • 标识符 标识符 字段的值。
  • 队列名称=无 保存作业的队列名称。如果没有 已定义,将使用类1。如果两者都未定义,则出现异常 升高。
  • 优先级=0 新作业的优先级,或 已经存在的作业,如果此优先级高于现有作业。
  • 队列模型 用于存储队列的模型。默认情况下, 设置为 queue ,在 作业 模型。如果未设置参数,则该属性将为 使用。注意在子类中将其设置为属性,或者 将使用 add_job 中的参数或默认的 队列模型 作业将不会保存在预期的队列模型中。
  • prepend=false 默认情况下,所有新作业应E在末尾添加 候补名单(从一开始,这是一个先进先出的名单),但是 您可以强制在等待列表的开头添加作业 要成为第一个执行的,只需设置 prepend 参数为 true 。如果作业已存在,则将在 列表的开头。
  • 延迟到=无 将来要执行的作业。如果定义和将来, 作业将被添加到延迟列表(redis排序集),而不是 等待的那个。真正的 延迟到 为 参数传递 delayed\u。
  • 延迟时间=无 timedelta 对象)在将作业添加到等待之前等待 名单。它将计算作业的"延迟"字段

如果使用 作业 模型的子类,则可以传递 方法的参数,只需将它们作为 参数,如果创建了新作业,则它们将被保存(但如果 在等待队列中找到现有作业)

获取车辆型号

返回模型的字符串表示形式,用于计算 标识工作的属性。

从标识符获取

从先前通过的 ident 属性获取的字符串返回作业 一份工作。

参数:

  • ident 一个字符串,包括作业和 它是主键,由 ident 属性返回。

排队

队列存储具有给定优先级的等待作业列表,并保留 成功工作和出错工作的列表。

队列字段
名称

由添加作业使用的字符串( InstanceHashField ,已索引) 方法查找要在其中存储它的队列。许多队列可以有 名称相同,但优先级不同。

工作进程还使用此名称来查找需要等待的队列 对于

优先级

用于存储 队列作业的优先级。队列中的所有作业都被认为具有 这个优先权。这就是为什么,正如对 作业 模型,更改作业的属性不会更改其真实属性 财产。但是添加(通过 作业的 add_job 类方法) 模型)具有相同队列名称的相同标识符的新作业可以 通过使用正确的 优先级。

如前所述,优先级越高,队列中的作业越快 将被处决。如果一个队列的优先级为2,而另一个队列的优先级为 在具有 优先级2将在其他优先级之前执行(至少获取)。 不考虑工人的数量。

等待

一个列表( listfield ),用于将作业的主键存储在等待中 状态。这是一个fifo列表:jobs被附加到右边(via rpush ),并从左侧提取(通过 blpop

获取时,将执行此列表中的作业,然后按 成功 错误 列表,具体取决于回调是否引发 是否例外。如果此等待列表中的作业不在等待中 状态,工人将跳过它。

成功

存储从中获取的作业主键的列表( listfield ) 等待列表并成功执行。

错误

存储从中获取的作业主键的列表( listfield ) 执行失败的等待列表。

延迟

存储延迟作业的排序集( sortedsetfield 延迟到未来的日期时间。时间戳表示 将 delayed_until 字段用作此排序集的分数, 以方便检索现在已准备好的作业。

队列属性

队列模型没有特定的属性。

队列属性和方法
第一个延迟的 (属性)

返回一个元组,表示延迟的 排队。它是一个带有作业pk和时间戳表示的元组 它的 延迟到 值(这是排序集的得分)。

如果延迟队列为空,则返回"无"。

第一个延迟时间 (属性)

返回第一个延迟作业的时间戳表示形式 就绪,如果延迟队列为空,则为无。

延迟作业 (方法)

将作业放入延迟队列。

参数:

  • 作业 要延迟的作业。
  • 延迟到 a 日期时间 对象指定作业的时间 应该放回等待队列。它将转换为 时间戳用作延迟列表的分数,这是一个redis 已排序集。
排队作业 (方法)

把工作列入等候名单。

参数:

  • 作业 要排队的作业。
  • prepend=false 设置为 true 以在 等待名单,第一个执行。
重新排队延迟的作业 (方法)

此方法将检查延迟队列中的所有作业 准备好执行并将其放回等待列表。

此方法将返回失败列表,每个失败都是元组 使用作业的 ident 属性返回的值,以及 导致故障的引发异常的消息。

不是说只有当作业的状态为 状态。延迟。它允许在之前取消延迟的作业。

队列类方法
获取队列

建议使用 get_queue 类方法获取 队列 对象。给定名称和优先级,它将返回找到的队列或 如果不存在匹配的队列,则创建一个队列。

参数:

  • 名称 要获取或创建的队列的名称。
  • 优先级 要获取或创建的队列的优先级。

如果使用 queue 模型的子类,则可以传递 将参数作为 参数,如果创建了新队列(但如果 找到现有队列)

获取等待键

get-waiting键返回所有现有的(waiting) 具有给定名称的队列,按优先级排序(相反的顺序是 最高优先级优先),然后是名字。返回值是 每个等待队列的redis键列表。它的 在内部被workers用作 blpop redis的参数 命令。< /P>

参数:

  • 名称 要考虑的队列的名称(可以是 字符串(如果是单个名称或字符串列表)
计数等待的作业

count_waiting_jobs方法返回仍在等待的作业数 等待给定的队列名称,组合所有优先级。

<精>恩斯:

  • 名称 要考虑的队列的名称(可以是 字符串(如果是单个名称或字符串列表)
计数延迟的作业

count_delayed_jobs方法返回仍然存在的作业数 对于给定的队列名称,合并所有优先级,延迟。

参数:

  • 名称 要考虑的队列的名称(可以是 字符串(如果是单个名称或字符串列表)
全部获取

get all方法返回给定的队列列表 姓名,

参数:

  • 名称 要考虑的队列的名称(可以是 字符串(如果是单个名称或字符串列表)
按优先级获取所有信息

class方法返回 按优先级排序的给定名称(最高优先级优先); 然后命名。

参数:

  • 名称 要考虑的队列的名称(可以是 字符串(如果是单个名称或字符串列表)

错误

错误 模型用于存储来自非 由工作人员成功执行。

其主要目的是能够按队列名称、作业筛选错误 模型、作业标识符、日期、异常类名或代码。你可以使用 您自己的 error 模型的子类,然后存储 字段,并对其进行筛选。

错误字段
工作模式

用于存储字符串的字符串( InstanceHashField ,已索引) 工作模型的表示。

工作包

一个字符串( instancehashfield ,已索引),用于存储 生成错误的作业。

idenfitier

存储 失败的作业。

队列名称

存储队列名称的字符串( instancehashfield ,已索引) 作业失败时已进入。

日期时间

字符串( InstanceHashField ,用 SimpleDateTimeIndex 索引) 存储错误的日期和时间(到第二个)(字符串表示 日期时间.utcnow() )。此字段已编制索引,因此您可以筛选 按日期和时间(字符串模式,而不是按部分日期和时间,即 日期时间 ,用于绘制错误图表。

日期

已弃用:这将被 日期时间 替换,但暂时保留以保持兼容性

一个字符串( instancehashfield ,已索引),用于存储日期(仅 错误的日期,而不是时间(字符串表示 日期时间.utcnow().date() )。此字段已编制索引,因此您可以筛选 按日期列出的错误,有助于绘制错误图。

时间

已弃用:这将被 日期时间 替换,但暂时保留以保持兼容性

一个字符串( instancehashfield )来存储时间(仅存储时间,而不是 错误的日期(字符串表示 日期时间.utcnow().time() )。

类型

存储错误类型的字符串( instancehashfield ,已索引)。 它是最初引发的异常的类名。

代码

存储 代码 最初引发的异常的属性。什么都没有储存 如果没有这样的属性,请在此输入。

消息

一个字符串( instancehashfield ),用于存储 最初引发的异常。

回溯

一个字符串( instancehashfield ),用于存储 最初引发的异常的回溯(工作进程可能没有 已填充)

错误属性和方法
日期时间

此属性根据 错误对象的日期时间字段。

错误类方法
添加错误

add_error 类方法是添加 通过接受将 当 变成 日期 时间 错误 变成 代码 消息

参数:

  • 队列名称 作业来自的队列的名称。

  • 作业 生成错误的作业,我们将从中提取 作业pk 标识符

  • 错误 从中提取代码和 消息:

  • when=none a datetime 从中提取日期的对象 和时间。

    如果未填写,将使用 datetime.utcnow()

  • 跟踪=无 要存储的跟踪,stringyfed。

如果使用 error 模型的子类,则可以传递 add_error方法的参数 参数,它们将保存在要创建的对象中。

收集工作

作业的集合是检索已分类错误的助手 对于给定的作业,更准确地说,对于此作业的所有实例 相同的标识符。

结果是一个 limpyd 集合,您可以使用 过滤器, 实例 …在它上面。

参数:

  • 作业 要出错的作业

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java访问私有字段而不使用getter方法?   使用PowerMockito在JavaEWSAPI中模拟测试拉订阅   启动活动时未保存java首选项并清除变量   java如何在servlet中检索子域?斯普林有帮手吗   java使用Docker从命令行构建Android项目   java Android,ActionBar后退按钮(setDisplayHomeAsUpEnabled(true))重新创建父活动   java在重用FileOutputStream时应该关闭流吗?   java使用RESTAPI将文件上载到s3 bucket   Java SOAP Web服务应用程序中的mysql用户登录方法不工作   java使用多个数字计算百分比并转换为长   java Android SQLiteDatabase查询忽略空格   java如何在Javafx中比较两个字段文本   java错误:未设置java_HOME,在Eclipse安装后找不到   java在安卓中保存对象   java如何使用jaxws从返回List<Object>的服务中检索值   java Google OAuth2 JWT令牌验证异常   SpringMVC中的JavaUTF8编码问题,当从JSP表单发送POST请求中的越南语信件时   java从webview重定向到安卓应用程序   JUnit 5中多个扩展的java顺序