多线程云队列客户端。

task-queue的Python项目详细描述


[![构建状态](https://travis-ci.org/seung-lab/python-task-queue.svg?branch=master)(https://travis-ci.org/seung-lab/python-task-queue)[![PYPI版本](https://badge.fury.io/py/task queue.svg)](https://badge.fury.io/py/task-queue)

/>##安装

``bash
pip install numpy确保首先在单独的行上执行此操作
pip install task queue
```

task queue使用位于`$home/.cloudvolume/secrets/``中的cloudvolume机密。使用aws sqs作为队列后端时,必须提供"$home/.cloudvolume/secrets/aws secret.json"。有关其他说明,请参阅[CloudVolume]repo(https://github.com/seung-lab/cloud-volume)。

用法

定义从taskqueue.registeredtask继承的类并实现"execute"方法。registeredtasks包含将其属性呈现为json负载的逻辑,并且可以重新组合为任务队列另一侧的活动类。

任务可以在本地或云中加载到队列中,稍后执行。下面是一个简单的"printtask"的示例实现。容器类的属性应该是简单的值,可以很容易地编码成json,比如int、float、string和numpy数组。让"execute"方法下载并操作较重的数据。如果您感到好奇,可以通过调用task.payload()`.

``python
from taskqueue import registeredtask

class printtask(registeredtask):
def初始化(self,txt='):
super(printtask,self)。
传递给super()的es。会自动分配给init;使用此空间执行其他处理,例如:
self.txt=str(txt)

def execute(self):
如果self.txt:
print(str(self)+":"+self.txt)
否则:
print(self)
````

##本地用法

对于小作业,您可能需要使用一个或多个进程来执行任务:
``python
从taskqueue导入localtaskqueue

使用localtaskqueue(parallel=5)作为tq:使用5个进程
对于范围(1000):
tq.insert(
printtask(i)

```
这将加载队列中的1000个打印任务,然后跨五个进程执行这些任务。

生成任务并将其插入到云队列中。

`` python
来自taskqueue import taskqueue

new and fast
tq=taskqueue('sqs-queue-name')
tq.insert all((printtask(i)for i in range(1000)))可以使用列表或生成器

旧的10x较慢的替代方案
,taskqueue('sqs-queue-name')作为tq:
for i在范围(1000)内:
tq.insert(printtask(i))
```


这会将1000个printtask json描述插入到sqs队列中。

_模块包含registeredtasks

tq=task queue('sqs-queue-name')
tq.poll(lease_seconds=int(lease_seconds))
````

poll将定期检查sqs是否有新任务。如果找到任务,它将立即执行该任务,从队列中删除该任务,并请求另一个任务。如果找不到任何任务,则内置高达60秒的随机指数退避,以防止工作人员试图拒绝拒绝服务队列。如果任务未能完成,则该任务最终将在sqs队列中循环,确保所有任务最终都将完成,前提是它们在某些方面没有基本缺陷。

. 无依赖性)分布式任务执行引擎(如[Igneous](https://github.com/seung lab/Igneous/))经常使用基于云的队列,如Amazon Simple Queue Service(SQS)。在connectomics领域,我们处理petascale图像,每次运行需要生成数十万或数百万个云任务。在一个例子中,我们正在处理一个大图像的串行块,其中每个块都依赖于前一个块的完成。每个块的运行都需要生成和上载数百万个任务,并使用数千个工人。工人们会很快地耗尽任务队列,确保它能够足够快地被喂养以防止这个巨大集群的饥饿是很重要的。

实现这一点有一些策略。一种方法可能是使用一个功能齐全的DAG支持引擎,它可能会根据需要生成下一个任务。然而,我们对sqs很有经验,并围绕它设计了我们的架构。此外,根据我们的经验,它对上千台机器的敲击是很坚固的。这并不意味着可能有更好的方法存在,只是这对我们来说很方便。


接受这个约束,填充sqs队列的两种主要方法将是生成任务,该任务可以征集成百上千个处理器,或者我们只需使我们的任务生成客户机的速度和内存效率,并使用少量的核心进行多处理。保持简单和本地允许更大的操作灵活性,添加多处理执行引擎允许在不大量修改体系结构的情况下省略小作业的云服务。重要的是,小规模性能的提高并不妨碍后续的开发。

默认情况下,python任务队列库是单线程和阻塞的,因此每秒最多可上载数十个任务。使用线程、可变进程和批处理请求可以做得更好。据观察,taskqueue在一台机器上每秒的单核任务数超过3000个,每秒多核任务数超过10000个。这足以保持一个巨大的集群馈送,并允许巨大的程序员灵活性,因为他们可以使用简单的脚本从本地计算机填充队列。


s来调整队列。默认情况下,taskqueue将使用其线程模型提供每秒数百个任务的上载速率。我们将通过渐进式示例演示如何优化上载脚本,以获得每秒数千个任务,且延迟和内存使用率接近于零。

``python
1:每秒100秒,内存使用率高,非零延迟

tasks=[printtask(i)for i in range(1000000)]
使用task queue('sqs-queue-name')作为tq:
对于任务中的任务:
tq.insert(task)
`````

赤裸裸地使用博托。但是,任务列表的初始生成会占用大量内存,并在生成列表时引入延迟。

`` python
清单2:100s/秒,通常是低内存使用率,接近零延迟

taskqueue('sqs-queue-name')作为tq:
对于范围(1000000)内的i:
tq.insert(printtask(i))
```

清单2生成任务并同时开始上载它们。最初,这会导致内存使用率低,但有时任务生成开始超过上载,并导致内存使用率增长。无论如何,上传会立即开始,因此不会引入延迟。


`` python
清单3:100-1000s/秒,高内存使用率,非零延迟cy

tasks=[printtask(i)for i in range(1000000)]
,taskqueue('sqs-queue-name')为tq:
tq.insert_all(tasks)
`````

清单3利用了sqs批上载的优势,它允许一次提交10个任务。由于提交任务的开销主要是http/1.1tcp/ip连接开销,因此对10个请求进行批处理可以使性能提高近10倍。但是,在本例中,我们再次预先创建了所有任务,以便正确地对它们进行批处理,这会导致与清单1相同的内存和延迟问题。


``python
使用生成器而不是列表。生成器本质上是按需计算下一个列表元素的惰性列表。定义一个生成器是快速的,并且需要恒定的时间,因此我们几乎可以立即开始生产新的元素。这些元素按需生产并立即消耗,从而产生一个小的恒定内存开销,通常可以用千字节到兆字节来度量。

由于生成器不支持"len"运算符,因此我们手动传入项目数以显示进度条。

``python
ue import greenttaskqueue

tasks=(printtask(i)for i in range(1000000))
使用greenttaskqueue('sqs-queue-name')作为tq:
tq.insert懔all(tasks,total=(end-start))
`````

在清单5中,我们将taskqueue替换为greenttaskqueue。在幕后,taskqueue依赖python内核线程来实现并发io。然而,在具有多核的系统上,特别是在病毒化或numa上下文中,os将倾向于在核之间相当均匀地分配线程,从而导致较高的上下文切换开销。具有讽刺意味的是,一个更强大的多核系统会导致更低的性能。为了解决这个问题,我们引入了一个使用gevent的用户空间协作线程模型(绿色线程)(这取决于您的系统是使用libev还是libuv作为事件循环)。

这可能导致某些系统的性能大幅提高。通常情况下,单核将以极低的开销得到充分利用。然而,在python中使用与网络io协作的线程需要对标准库进行monkey修补(!!!)拒绝修补标准库将导致单线程性能。因此,使用greenttaskqueue可以将问题引入许多更大的应用程序(我们已经看到了多处理和ipython的问题)。但是,任务上载脚本通常可以与系统的其他部分隔离,这允许安全地执行猴子修补。为了让用户更好地控制何时他们希望接受猴子补丁的风险,它不会自动执行,并且会出现一个警告,说明如何修改您的程序。

`` python
清单6:1000s-10000/秒,低内存使用率,接近零延迟,高效的多处理


import gevent.monkey
gevent.monkey.patch_all()
from taskqueue import greentaskqueue
from concurrent.futures import processpoolexecutor

def up加载(参数):
start,end=args
tasks=(printtask(i)for i in range(start,end))
将greenttaskqueue('sqs-queue-name')作为tq:
tq.插入所有(tasks,total=(end-start))

task\ranges=[(0,250000),(250000,500000),(500000,750000),(750000,1000000)]processpoolexecutor(max_workers=4)作为池:
pool.map(upload,task_ranges)
````

有三个这个结构的关键部分需要注意。

首先,我们不使用通常的"多处理"包,而是使用"concurrent.futures.processpoolexecutor"。如果子进程在"多处理"中死亡,父进程将简单地挂起(不幸的是,这是设计的…)。使用此替代包,将至少引发异常。

其次,我们将任务生成的参数传递给子进程,而不是任务。无法在cpython[1]中将生成器从父进程传递到子进程。直接传递任务也是低效的,因为它要求首先生成任务(如清单1所示),然后在传递给子进程时无形地对其进行pickling和unpickling。因此,我们只传递少量可拾取的小对象,这些对象用于在另一侧构造任务生成器。

第三,如清单5所述,greentaskqueue比普通多线程taskqueue具有更少的上下文切换开销。使用greenttaskqueue将使每个核心高效地独立于其他核心运行。此时,您的主要瓶颈可能与操作系统/网卡相关(如果不是,请告诉我们!)。多处理确实可以扩展任务生产,但它在进程数量上是次线性的。每个进程的任务上载速率将随着每个额外核心的添加而降低,但每个核心仍会将额外吞吐量增加到某个拐点。

``python
skqueue从concurrent.futures导入processpoolexecutor导入greentaskqueue


__(self):
对于范围内的i(self.start,self.end):
生成printtask(i)

def upload(tsk):
tq=greenttaskqueue('sqs-queue-name')
tq.insert_all(tsk)

tasks=[打印任务迭代器(0,100),打印任务迭代器(100,200)]
带有processpoolexecutor(max_workers=2)as execute:
execute.map(upload,tasks)
````

上面的结构允许我们在前面编写生成器调用,在酸洗过程中只传递少数灵长类动物,并在另一边透明地调用生成器。我们甚至可以支持生成器无法使用的"len()"函数。

``python
觕清单8:简单的多处理


import gevent.monkey
gevent.monkey.patch_all(thread=false)
import copy
from taskqueue import greentaskqueue

(对象):
def初始(self,start,end):
self.start=start
self.end=end
def getitem(self,slc):
itr=copy.deepcopy(self)
itr.start=self.start+slc.start
itr.end=self.start+slc.stop
return itr
def len/>返回self.end-self.start
def\uuiter(self):
对于范围内的i(self.start,self.end):
生成打印任务(i)


tq=greenttaskqueue('sqs-queue-name')
tq.insert\all(printtaskterator(0200),parallel=2)
``````

lice运算符起作用,taskqueue可以自动删除迭代器,从而将迭代器馈送给多个进程。值得注意的是,我们不返回'printtaskterator(self.start+slc.start,self.start+slc.stop)',因为它在pickling期间会触发无休止的递归。但是,上面的运行时复制实现回避了这个问题。在内部,"printtaskterator(0200)"将变成"[printtaskterator(0100),printtaskterator(100200)"。我们还对队列中的子进程引发的异常进行跟踪。`格文·蒙克y.patch_all(thread=false)`是避免多进程挂起所必需的。

[1]不能在cpython中传递生成器,但[可以传递迭代器](https://stackoverflow.com/questions/1939015/singleton-python-generator-or-pickle-a-python-generator/1939493)。如果使用pypy或stackless python,则可以传递生成器。

--
使用<;3制作。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
从批处理文件运行Java Selenium webdriver程序(不用于测试)   java I无法在intelli J Idea中添加导航抽屉   matlab是另一个java。lang.ClassCastException   gwt Java ProcessBuilder如何获取实时输出   列出如何根据Java中ArrayList的大小为变量分配字符?   逐字符比较java中的两个字符串   导入项目时,如何使用正确版本的Java和Gradle运行IntelliJ?   使用2d数组时出现java运行时异常   音频Java:使用按钮停止声音播放   安卓 Rfc2898DeriveBytes for java?   获取对java中包含类的引用   java在每次重定向到错误页面时登录到我的spring启动项目   java打印输出用于阵列式课堂考勤系统   java如何在计算后检索值?   java将列表项插入为属性?   java如何使用JPA的JPQL连接两个表