为数据分析而设计的流水线框架,但对于其他应用程序是有用的
pyPipelineStream的Python项目详细描述
python的流水线框架。开发人员可以创建节点并将它们链接在一起以创建管道。
扩展```node``的类必须实现``run``方法,只要有新数据可用,就会调用该方法。
一个简单的例子
``python
来自pypipelining导入节点,pipeline
class generate(节点):
def setup(self):
self.pos=0
如果self.pos<;self.size:
self.emit(self.pos)
self.pos=self.pos+1
否则:
self.close()
class square(node):
def run(self,data):
self.emit(data**2)
pipeline=pipeline(generate(“gen”,size=10)square(“square”))
print(pipeline)
pipeline.run()
`````
nodes还可以指定一个批大小,该批大小指示应向节点推送多少数据。
例如,在前面的示例的基础上。在这种情况下,在nodes`` setup``方法中指定了``批处理大小``'。或者,也可以在创建节点时设置(例如`` printer(“print”,batch-size=5)```)
`` python
类打印机(节点):
def setup(self):
self.batch-size=node.batch-size=all
def run(self,data):
print(data)
pipeline=pipeline(generate(“gen”,大小=10)正方形(“正方形”)打印机(“打印”))
打印(管道)
管道.run()
```
扩展```node``的类必须实现``run``方法,只要有新数据可用,就会调用该方法。
一个简单的例子
``python
来自pypipelining导入节点,pipeline
class generate(节点):
def setup(self):
self.pos=0
self.emit(self.pos)
self.pos=self.pos+1
否则:
self.close()
class square(node):
def run(self,data):
self.emit(data**2)
pipeline=pipeline(generate(“gen”,size=10)square(“square”))
print(pipeline)
pipeline.run()
`````
nodes还可以指定一个批大小,该批大小指示应向节点推送多少数据。
例如,在前面的示例的基础上。在这种情况下,在nodes`` setup``方法中指定了``批处理大小``'。或者,也可以在创建节点时设置(例如`` printer(“print”,batch-size=5)```)
`` python
类打印机(节点):
def setup(self):
self.batch-size=node.batch-size=all
def run(self,data):
print(data)
pipeline=pipeline(generate(“gen”,大小=10)正方形(“正方形”)打印机(“打印”))
打印(管道)
管道.run()
```