zope3的远程处理队列
m01.remote的Python项目详细描述
自述文件
此软件包提供远程处理器。此远程处理器实现为 使用MongoDB作为存储的简单对象。处理器可以执行pre 在另一个线程中定义的作业。也可以在特定的 使用不同计划程序项的时间。
远程处理器使用两个不同的处理器。一个处理作业和 其他从调度程序中选取项并正在添加作业。这种分离 如果您实现了一个分布式概念,那么它是有用的。这意味着一个或多个 应用程序可以基于给定的计划程序项计划作业项。和 另一个应用程序正在处理作业,不知道如何调度 下一个项目。
由于我们将这个远程调度程序用于低CPU密集型作业,因此我们提供了 处理。这是通过在主工作进程中运行多个工作进程来完成的 线程。如果使用子流程进行作业处理,将得到 不限于当前python进程的多处理处理器。
您可以在 远程处理器。请参见JobWorkerArguments/MaxThreads。默认情况下,此号码 使用计算机上安装的CPU数量。
该实现使用MongoDB作为其组件的存储。这意味着 作业、作业工厂和调度程序项使用 orm概念来自m01.mongo。
有关基于zodb的远程处理器实现,请参见p01.remote,但请注意 p01.remote实现不提供worker和scheduler 处理器分离。至少还没有。
设置
< Buff行情>>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing
现在让我们从创建两个远程处理器开始。我们可以使用远程队列 站点实现:
< Buff行情>>>> from zope.security.proxy import removeSecurityProxy >>> from m01.remote import interfaces
我们的测试远程处理器应作为应用程序根目录:
< Buff行情>>>> rp = root >>> rp <TestProcessor None>
让我们发现可用的作业:
< Buff行情>>>> dict(root._jobs) {}
作业容器最初是空的,因为我们没有添加任何作业 工厂。现在让我们定义一个简单地回显输入字符串的作业工厂:
< Buff行情>>>> echoJob = testing.EchoJob({})
现在我们可以设置作业输入:
< Buff行情>>>> echoJob.input = {'foo': u'blah'}
作业的唯一api要求是可调用。现在我们要确保 这工作管用。注意,我们用远程处理器实例调用我们的作业 是我们初始化的应用程序根目录:
< Buff行情>>>> echoJob(root) {'foo': u'blah'}
让我们将作业添加到可用作业列表:
< Buff行情>>>> rp.addJobFactory(u'echo', echoJob)
回声作业现在在远程处理器中可用:
< Buff行情>>>> dict(rp._jobFactories) {u'echo': <EchoJob u'echo'>}
由于远程处理器无法立即完成作业,因此传入的作业 由队列管理。首先,我们请求执行echo作业:
< Buff行情>>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing0
>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing1
函数调度要执行的名为"echo"的作业 用指定的参数。方法返回一个作业ID,我们可以使用它 询问工作的情况。函数将作业标记为排队。
< Buff行情>>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing2
由于作业尚未处理,因此状态设置为"排队"。进一步, 目前还没有结果:
< Buff行情>>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing3
只要不处理作业,就可以取消它:
< Buff行情>>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing4
>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing5
默认情况下,工作处理器不会启动:
< Buff行情>>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing6
要获得一个干净的日志环境,让我们清除日志堆栈:
>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing7
现在我们可以通过调用start processor启动远程处理器 < Buff行情>
>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing8
然后瞧-远程处理器正在处理:
< Buff行情>>>> import transaction >>> from pprint import pprint >>> import zope.component >>> import m01.mongo >>> from m01.mongo import UTC >>> import m01.remote.job >>> from m01.remote import testing9
签出日志将证明启动了remoTE处理器:
<阻塞率> AAAAAAA H20让我们再次停止处理器:
<阻塞率> AAAAAAAAA 21现在,让我们从已处理的作业中获取结果,但首先提交新添加的作业:
<阻塞率> 啊! AAAAAAAAA 23现在创建一个工人并通过调用我们的简单工人来处理新的工作:
<阻塞率> 啊! 大花 啊!首先检查作业是否得到处理:
<阻塞率> 是啊。 AAAAAAAAA 28错误处理
现在,让我们定义一个导致错误的新作业:
<阻塞率> AAAAAAA 29现在添加并执行:
<阻塞率> AAAAAAA 30 啊!现在让我们看看发生了什么:
<阻塞率> 啊!搜索作业错误项提供以下数据:
<阻塞率> 是啊。如您所见,存储为tb的回溯是最重要的信息:
<阻塞率> 啊!也可以尝试一个不太好的错误:
<阻塞率> 啊!现在添加并执行:
<阻塞率> 啊! AAAAAAA 37 AAAAAAAAA 38 啊!再次处理作业,但首先将retrytime设置为过时的值 将模拟上次通话后的时间流逝:
<阻塞率> 啊! AAAAAAA 37 AAAAAAA 42 啊!第三次再做一次。现在它不会重新引发异常 但错误信息会附加到错误列表中。
<阻塞率> 啊! 啊!45!现在让我们看看发生了什么:
<阻塞率> 啊! 啊! AAAAAAA 48 AAAAAAA 49 啊!出于管理目的,远程处理器还允许您检查所有 乔布斯:
<阻塞率> 啊!为了摆脱不再需要的工作,我们可以使用reomvejobs方法。
<阻塞率> AAAAAAA 52 AAAAAAA 53 啊! YYY11现在处理最后一个挂起的作业,并确保没有更多的作业:
<阻塞率> 啊!线程行为
每个远程处理器在一个单独的线程中运行,允许它们操作 独立地。作业的设计应避免冲突错误。
让我们启动在这一点上定义的远程处理器,看看 因此线程正在运行:
啊!让我们对远程处理器进行p处理,让后台线程有机会 消息:
AAAAAAA 58线程已退出:
啊!