PipeFrame用python编写多处理数据管道的简单模块。

pipeframe的Python项目详细描述


管架

什么是管道?在

In computing, a pipeline, also known as a data pipeline, is a set of data processing elements connected in series, where the output of one element is the input of the next one. The elements of a pipeline are often executed in parallel or in time-sliced fashion. Some amount of buffer storage is often inserted between elements.

来源:维基百科

PipeFrame是一个小的pipeframe帮助您利用python多处理库处理数据(流或批处理)。在

安装

pip上提供的软件包,要在您的环境中安装它,只需执行以下操作:

pip install pipeframe

入门

创建管道

首先应该创建管道,它应该继承自pipeframe.core.PipelineEngine 并包含steps类属性:

frompipeframe.coreimportPipelineEngineclassYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]

管道将针对steps函数执行每个条目。你可以 定义要对数据执行的任何数量的函数,即执行顺序 将遵循您在步骤列表中定义的相同顺序。在

您的函数应该将要处理的记录作为参数接收并返回 修改后的记录和用于绕过后续步骤执行的布尔值 在数据上(False)或继续管道流(True)。在

^{pr2}$

您还必须提供一个名为feed的函数,该函数将为您的进程提供一些数据:

classYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]deffeed(self,bucket):req=requests.get('https://www.reddit.com/r/all/top.json',headers={'User-agent':'pipeframe'})ifreq.status_code==200:data=req.json()['data']['children']forentryindata:bucket.put(entry['data'])

运行管道

要执行新创建的管道,必须使用PipeFrame executor调用它:

frompipeframe.coreimportPipeFramepipe_frame=PipeFrame(cpu_count=16,stream_buffer_size=50000)pipe_frame.run(YourCustomPipeline)

cpu_count和{}是可选参数:

  • cpu计数:一个整数,默认为机器中的核心数减去1
  • 缓冲区大小:默认为10000的整数

流还是批处理?在

默认情况下,管道将以批处理模式运行,这意味着提要函数将在该步骤之前运行并完成 函数开始。您必须知道有多少数据条目将进入队列,并调整buffer_size 根据这一点。在

如果使source='stream',则提要函数将在step函数、feed和processing之后启动 将并行发生。在这种情况下,您应该将timeout属性调整为一个足够高的值,以防止 由于队列中暂时缺少数据而导致管道终止(对于数据摄取速度低于 你的处理能力)。在

示例:

classYourCustomPipeline(PipelineEngine):steps=[func1,func2,...]source='stream'timeout=10deffeed(self,bucket):forentryininfinite_stream_of_data():bucket.put(entry)

在上面的示例中,您的工人将等待10秒,等待无限长的\u stream_of_data()函数生成 要处理的新数据,如果10秒内没有新数据到达,则工作进程将终止,因为流已干涸。在

完整的例子

frompipeframe.coreimportPipelineEngine,PipeFrameimportfcntlimportjsondefclear_entry(entry):entry['new_number']=0returnentry,Truedefpower(entry):entry['new_number']=entry['number']**entry['number']returnentry,Truedefwrite_to_disk(entry):"""    Lock the file, write entry, release the file.    """withopen("log","a")asfh:fcntl.flock(fh,fcntl.LOCK_EX)fh.write(json.dumps(entry['number'])+'\n')fcntl.flock(fh,fcntl.LOCK_UN)returnentry,TrueclassPowerDataPipeline(PipelineEngine):steps=[clear_entry,power,write_to_disk]source='batch'@staticmethoddeffeed(bucket):x=1000000foriinrange(10):x+=1000entry={'number':x}bucket.put(entry)# With all cpu  - 1pipe_frame=PipeFrame()pipe_frame.run(PowerDataPipeline)# With 2 cpuspipe_frame=PipeFrame(cpu_count=2)pipe_frame.run(PowerDataPipeline)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Spinner选定值未上载到firebase数据库   java如何通过bukkit中的配置添加消息?   java在SharedReference中保存列表的泛型类型   javascript Java小程序未定义   swt在Java中构建控制台应用程序   java OAuth同意屏幕没有突然显示,没有错误?   java webview选择文件安卓不工作   java Spring boot JPA如何从同一连接添加多个数据库?   java JDBC DB2驱动程序计时器线程处于阻塞状态   java我在vscode中运行flatter时看到一个错误在phone中运行应用程序somone能否帮助我plz   Java:生成JSON:如何避免生成不完整的JSON   java Date compareTo()方法始终返回1   当使用Junit runner运行测试时,java Spring自动连线失败   java Android/Sockets如何将信息从主UI线程发送到socket线程?   java Android内存声明全局变量   java如何在JBoss中拥有多个具有相同JNDI名称的数据源?   python在Java中嵌入CPython时,为什么会挂起?   java如何提供深度模拟对象?   java“find:smallint,expected:integer”Hibernate对informix数据库的验证在短时间内失败