zope3的远程处理队列

m01.remote的Python项目详细描述


自述文件

此软件包提供远程处理器。此远程处理器实现为 使用MongoDB作为存储的简单对象。处理器可以执行pre 在另一个线程中定义的作业。也可以在特定的 使用不同计划程序项的时间。

远程处理器使用两个不同的处理器。一个处理作业和 其他从调度程序中选取项并正在添加作业。这种分离 如果您实现了一个分布式概念,那么它是有用的。这意味着一个或多个 应用程序可以基于给定的计划程序项计划作业项。和 另一个应用程序正在处理作业,不知道如何调度 下一个项目。

由于我们将这个远程调度程序用于低CPU密集型作业,因此我们提供了 处理。这是通过在主工作进程中运行多个工作进程来完成的 线程。如果使用子流程进行作业处理,将得到 不限于当前python进程的多处理处理器。

您可以在 远程处理器。请参见JobWorkerArguments/MaxThreads。默认情况下,此号码 使用计算机上安装的CPU数量。

该实现使用MongoDB作为其组件的存储。这意味着 作业、作业工厂和调度程序项使用 orm概念来自m01.mongo。

有关基于zodb的远程处理器实现,请参见p01.remote,但请注意 p01.remote实现不提供worker和scheduler 处理器分离。至少还没有。

设置

< Buff行情>
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing

现在让我们从创建两个远程处理器开始。我们可以使用远程队列 站点实现:

< Buff行情>
>>> from zope.security.proxy import removeSecurityProxy
>>> from m01.remote import interfaces

我们的测试远程处理器应作为应用程序根目录:

< Buff行情>
>>> rp = root
>>> rp
<TestProcessor None>

让我们发现可用的作业:

< Buff行情>
>>> dict(root._jobs)
{}

作业容器最初是空的,因为我们没有添加任何作业 工厂。现在让我们定义一个简单地回显输入字符串的作业工厂:

< Buff行情>
>>> echoJob = testing.EchoJob({})

现在我们可以设置作业输入:

< Buff行情>
>>> echoJob.input = {'foo': u'blah'}

作业的唯一api要求是可调用。现在我们要确保 这工作管用。注意,我们用远程处理器实例调用我们的作业 是我们初始化的应用程序根目录:

< Buff行情>
>>> echoJob(root)
{'foo': u'blah'}

让我们将作业添加到可用作业列表:

< Buff行情>
>>> rp.addJobFactory(u'echo', echoJob)

回声作业现在在远程处理器中可用:

< Buff行情>
>>> dict(rp._jobFactories)
{u'echo': <EchoJob u'echo'>}

由于远程处理器无法立即完成作业,因此传入的作业 由队列管理。首先,我们请求执行echo作业:

< Buff行情>
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
0
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
1

函数调度要执行的名为"echo"的作业 用指定的参数。方法返回一个作业ID,我们可以使用它 询问工作的情况。函数将作业标记为排队。

< Buff行情>
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
2

由于作业尚未处理,因此状态设置为"排队"。进一步, 目前还没有结果:

< Buff行情>
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
3

只要不处理作业,就可以取消它:

< Buff行情>
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
4
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
5

默认情况下,工作处理器不会启动:

< Buff行情>
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
6

要获得一个干净的日志环境,让我们清除日志堆栈:

>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
7

现在我们可以通过调用start processor启动远程处理器 < Buff行情>

>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
8

然后瞧-远程处理器正在处理:

< Buff行情>
>>> import transaction
>>> from pprint import pprint
>>> import zope.component
>>> import m01.mongo
>>> from m01.mongo import UTC
>>> import m01.remote.job
>>> from m01.remote import testing
9

签出日志将证明启动了remoTE处理器:

<阻塞率> AAAAAAA H20

让我们再次停止处理器:

<阻塞率> AAAAAAAAA 21

现在,让我们从已处理的作业中获取结果,但首先提交新添加的作业:

<阻塞率> 啊! AAAAAAAAA 23

现在创建一个工人并通过调用我们的简单工人来处理新的工作:

<阻塞率> 啊! 大花 啊!

首先检查作业是否得到处理:

<阻塞率> 是啊。 AAAAAAAAA 28

错误处理

现在,让我们定义一个导致错误的新作业:

<阻塞率> AAAAAAA 29

现在添加并执行:

<阻塞率> AAAAAAA 30 啊!

现在让我们看看发生了什么:

<阻塞率> 啊!

搜索作业错误项提供以下数据:

<阻塞率> 是啊。

如您所见,存储为tb的回溯是最重要的信息:

<阻塞率> 啊!

也可以尝试一个不太好的错误:

<阻塞率> 啊!

现在添加并执行:

<阻塞率> 啊! AAAAAAA 37 AAAAAAAAA 38 啊!

再次处理作业,但首先将retrytime设置为过时的值 将模拟上次通话后的时间流逝:

<阻塞率> 啊! AAAAAAA 37 AAAAAAA 42 啊!

第三次再做一次。现在它不会重新引发异常 但错误信息会附加到错误列表中。

<阻塞率> 啊! 啊!45!

现在让我们看看发生了什么:

<阻塞率> 啊! 啊! AAAAAAA 48 AAAAAAA 49 啊!

出于管理目的,远程处理器还允许您检查所有 乔布斯:

<阻塞率> 啊!

为了摆脱不再需要的工作,我们可以使用reomvejobs方法。

<阻塞率> AAAAAAA 52 AAAAAAA 53 啊! YYY11

现在处理最后一个挂起的作业,并确保没有更多的作业:

<阻塞率> 啊!

线程行为

每个远程处理器在一个单独的线程中运行,允许它们操作 独立地。作业的设计应避免冲突错误。

让我们启动在这一点上定义的远程处理器,看看 因此线程正在运行:

啊!

让我们对远程处理器进行p处理,让后台线程有机会 消息:

AAAAAAA 58

线程已退出:

啊!

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
从文本文件中读取时显示java符号“ï»”   java在有很多生产商的情况下如何改进Disruptor?   不同线程的java不同堆栈   用Java模拟oraclespool   jsp java访问自定义web中的错误信息。xml错误页   给出奇怪结果的java集成堆栈   java在jsp中显示值列表   java会话。保存更新具有错误ID的实体   在树数据结构中添加节点时的java递归   java在Spring Data Mongodb中使用$$ROOT检索整个文档   java我应该把图像放在罐子里还是不放在罐子里?(Inno设置)   java将bat文件放入jar文件中   Java:如何在节点上执行XPath查询   控制台应用程序如何在Mac上从Java输出重音字符?