为数据分析而设计的流水线框架,但对于其他应用程序是有用的
pyPiper的Python项目详细描述
python的并行流水线框架。开发人员可以创建节点 把它们连在一起形成管道。
扩展Node的类必须实现将 每当有新数据可用时调用。
安装
pip install pypiper
示例用法
frompyPiperimportNode,PipelineclassGenerate(Node):defsetup(self,size):self.size=sizeself.pos=0defrun(self,data):ifself.pos<self.size:self.emit(self.pos)self.pos=self.pos+1else:self.close()classSquare(Node):defrun(self,data):self.emit(data**2)pipeline=Pipeline(Generate("gen",size=10)|Square("square"))print(pipeline)pipeline.run()
节点还可以指定批处理大小,该大小指示应该 被推到节点。例如,在上一个示例的基础上构建。在 这种情况batch_size是在nodessetup方法中指定的。 或者,可以在创建节点时设置(例如 Printer("print", batch_size=5))
classPrinter(Node):defsetup(self):self.batch_size=Node.BATCH_SIZE_ALLdefrun(self,data):print(data)pipeline=Pipeline(Generate("gen",size=10)|Square("square")|Printer("print"))print(pipeline)pipeline.run()
并行执行
要并行处理管道,请在创建时传递n_threads>;1 管道。并行执行是使用multiprocessing和 非常适合CPU密集型任务,如音频处理和 特征提取。例如:
classGenerate(Node):defsetup(self,size):self.pos=0defrun(self,data):ifself.pos<self.size:self.emit(self.pos)self.pos=self.pos+1else:self.close()pipeline=Pipeline(Generate("gen",size=10)|Square("square")|Printer("print"),n_threads=2)print(pipeline)pipeline.run()
流名称
您还可以命名输入和输出流。例如:
gen=EvenOddGenerate("gen",size=20,out_streams=["even","odd"])double=Double("double",out_streams="num",in_streams="even")square=Square("square",out_streams="num",in_streams="odd")printer1=Printer("p1",in_streams="num",batch_size=Node.BATCH_SIZE_ALL)printer2=Printer("p2",in_streams="num",batch_size=Node.BATCH_SIZE_ALL)p=Pipeline(gen|[double|printer1,square|printer2],quiet=False)p.run()
evenoddgenerate生成一对数字。使用out_streams 参数,我们将第一个数命名为偶数,第二个数命名为奇数。什么时候? 初始化double和square节点时,我们告诉double获取 偶数和平方来取奇数。
如果将多个输出流传递到一个节点,默认情况下,它们将 作为列表进入节点。例如,
gen=EvenOddGenerate("gen",size=10,out_streams=["even","odd"])printer=Printer("p1",batch_size=1)p=Pipeline(gen|printer,quiet=False)p.run()
将输出
[0,1],[2,3],...
但是,如果可以通过在 输入流in_streams参数。所以,``` python gen= evenoddgenerate(“gen”,size=20,out_streams=[“偶数”,“奇数”])
printer=打印机(“p1”,in_streams=[“偶数”,“奇数”],batch_size=1)
p=管道(gen printer,quiet=false)p.run()````
将生成:
0,1,2,3,...
进度更新
调用pipeline.run()时,可以为 进度更新。每当管道进展时,它就称之为 使用到目前为止已处理的项目数和 需要处理的项目总数。例如,如果你 如果使用的是TQM进度条,则可以使用以下代码:
fromtqdmimporttqdmclassTqdmUpdate(tqdm):defupdate(self,done,total_size=None):iftotal_sizeisnotNone:self.total=total_sizeself.n=donesuper().refresh()if__name__=='__main__':gen=Generate("gen",size=10)double=Double("double")sleeper=Sleep("sleep")p=Pipeline(gen|[double,sleeper],n_threads=4,quiet=True)withTqdmUpdate(desc="Progress")aspbar:p.run(update_callback=pbar.update)
使用pypiper的项目
- COVFEFE:一个特性 注重词汇、句法和语用特征的提取工具 从文本和音频功能从音频。