为数据分析而设计的流水线框架,但对于其他应用程序是有用的

pyPiper的Python项目详细描述


python的并行流水线框架。开发人员可以创建节点 把它们连在一起形成管道。

扩展Node的类必须实现将 每当有新数据可用时调用。

安装

pip install pypiper

示例用法

frompyPiperimportNode,PipelineclassGenerate(Node):defsetup(self,size):self.size=sizeself.pos=0defrun(self,data):ifself.pos<self.size:self.emit(self.pos)self.pos=self.pos+1else:self.close()classSquare(Node):defrun(self,data):self.emit(data**2)pipeline=Pipeline(Generate("gen",size=10)|Square("square"))print(pipeline)pipeline.run()

节点还可以指定批处理大小,该大小指示应该 被推到节点。例如,在上一个示例的基础上构建。在 这种情况batch_size是在nodessetup方法中指定的。 或者,可以在创建节点时设置(例如 Printer("print", batch_size=5)

classPrinter(Node):defsetup(self):self.batch_size=Node.BATCH_SIZE_ALLdefrun(self,data):print(data)pipeline=Pipeline(Generate("gen",size=10)|Square("square")|Printer("print"))print(pipeline)pipeline.run()

并行执行

要并行处理管道,请在创建时传递n_threads>;1 管道。并行执行是使用multiprocessing和 非常适合CPU密集型任务,如音频处理和 特征提取。例如:

classGenerate(Node):defsetup(self,size):self.pos=0defrun(self,data):ifself.pos<self.size:self.emit(self.pos)self.pos=self.pos+1else:self.close()pipeline=Pipeline(Generate("gen",size=10)|Square("square")|Printer("print"),n_threads=2)print(pipeline)pipeline.run()

流名称

您还可以命名输入和输出流。例如:

gen=EvenOddGenerate("gen",size=20,out_streams=["even","odd"])double=Double("double",out_streams="num",in_streams="even")square=Square("square",out_streams="num",in_streams="odd")printer1=Printer("p1",in_streams="num",batch_size=Node.BATCH_SIZE_ALL)printer2=Printer("p2",in_streams="num",batch_size=Node.BATCH_SIZE_ALL)p=Pipeline(gen|[double|printer1,square|printer2],quiet=False)p.run()

evenoddgenerate生成一对数字。使用out_streams 参数,我们将第一个数命名为偶数,第二个数命名为奇数。什么时候? 初始化double和square节点时,我们告诉double获取 偶数和平方来取奇数。

如果将多个输出流传递到一个节点,默认情况下,它们将 作为列表进入节点。例如,

gen=EvenOddGenerate("gen",size=10,out_streams=["even","odd"])printer=Printer("p1",batch_size=1)p=Pipeline(gen|printer,quiet=False)p.run()

将输出

[0,1],[2,3],...

但是,如果可以通过在 输入流in_streams参数。所以,``` python gen= evenoddgenerate(“gen”,size=20,out_streams=[“偶数”,“奇数”])

printer=打印机(“p1”,in_streams=[“偶数”,“奇数”],batch_size=1)

p=管道(gen printer,quiet=false)p.run()````

将生成:

0,1,2,3,...

进度更新

调用pipeline.run()时,可以为 进度更新。每当管道进展时,它就称之为 使用到目前为止已处理的项目数和 需要处理的项目总数。例如,如果你 如果使用的是TQM进度条,则可以使用以下代码:

fromtqdmimporttqdmclassTqdmUpdate(tqdm):defupdate(self,done,total_size=None):iftotal_sizeisnotNone:self.total=total_sizeself.n=donesuper().refresh()if__name__=='__main__':gen=Generate("gen",size=10)double=Double("double")sleeper=Sleep("sleep")p=Pipeline(gen|[double,sleeper],n_threads=4,quiet=True)withTqdmUpdate(desc="Progress")aspbar:p.run(update_callback=pbar.update)

使用pypiper的项目

  • COVFEFE:一个特性 注重词汇、句法和语用特征的提取工具 从文本和音频功能从音频。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
爪哇太阳报。安全验证器。ValidatorException:PKIX路径生成失败   java理解为什么在onDispatchTouchEvent()返回True后仍调用onClick()   java如何在资源包中使用JSF标记/如何在资源包中重写URL?   java什么是流控制异常的替代方案?   java使用Spring数据JPA/MongoDB交叉存储,一个查询可以跨越两个数据库吗?   Apache Sling/felix中的java OSGi slf4j日志记录   为什么当我们想要水平/垂直旋转矩阵时,我们要把这个项除以2?   尝试从外部网页接收JSON字符串时出现安卓 Java NullPointerException   java执行异常:从Callable调用方法时   java在jetty上以调试模式运行webapp,使用maven jetty插件在intellij中构建成功(应用程序应在调试模式下启动)   带有内存数据库的linux Java应用程序的构建时间太长   基于java的随机数单元测试算法   java类型javax。摆动JComponent无法解析   Google日历API和UI小部件Java   java tomcat没有突然和任意地响应   java无法使用jsoup在html中获取图像src   我无法让Java接受键盘输入字符串   java如何开始使用Bambol而不让部署永远继续?   java如何使用另一个类的actionPerformed方法删除一个类/组件中的按钮?   java JSON反序列化brakets{}中的一系列对象