Python I/O piping utilities
Detailed description of the tubing Python project
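Tubing is a Python I/O library. What makes tubing so cool is its gross abuse of the bitwise OR operator (|). Have you ever been writing Python code and thought to yourself, "Man, this is great, but I really wish it was a little more like bash"? Well, we've made Python a little more like bash. If you're a super lame nerd, you can use the tube() function instead and pray we don't overload any other operators in future versions. Here's how to install tubing: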
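For readers wondering how `|` can chain stages at all: Python calls a class's `__or__` method to evaluate `a | b`. The sketch below is a toy illustration of that mechanism only, not tubing's implementation; `Pipeline`, `stage`, and `run` are hypothetical names made up for this example.

```python
class Pipeline(object):
    """Toy pipeline: an ordered list of functions applied to each item."""

    def __init__(self, stages):
        self.stages = list(stages)

    def __or__(self, other):
        # `left | right` returns a new pipeline running left's stages,
        # then right's stages.
        return Pipeline(self.stages + other.stages)

    def run(self, items):
        # Feed every item through each stage in order.
        results = []
        for item in items:
            for func in self.stages:
                item = func(item)
            results.append(item)
        return results


def stage(func):
    """Wrap a plain function as a one-stage pipeline (hypothetical helper)."""
    return Pipeline([func])


pipeline = stage(str.strip) | stage(str.upper) | stage(lambda s: s + "!")
shouted = pipeline.run(["  hello ", "world  "])
print(shouted)
```

Returning a new object from `__or__` keeps each stage reusable in more than one pipeline, which is one plausible reason to design it this way.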
$ pip install tubing
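Tubing is still pretty bare-bones. We've tried to make it easy to add your own functionality; hopefully you'll find it not all that unpleasant. There are three sections below on adding sources, tubes, and sinks. If you do make some additions, consider contributing them back upstream. We'd love to have a full suite of tools.
Now, witness the power of this fully operational I/O library.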
    from tubing import sources, tubes, sinks

    objs = [
        dict(
            name="Bob Corsaro",
            birthdate="08/03/1977",
            alignment="evil",
        ),
        dict(
            name="Tom Brady",
            birthdate="08/03/1977",
            alignment="good",
        ),
    ]

    sources.Objects(objs) \
        | tubes.JSONDumps() \
        | tubes.Joined(by=b"\n") \
        | tubes.Gzip() \
        | sinks.File("output.gz", "wb")

Then, in our old friend bash:
    $ zcat output.gz
    {"alignment": "evil", "birthdate": "08/03/1977", "name": "Bob Corsaro"}
    {"alignment": "good", "birthdate": "08/03/1977", "name": "Tom Brady"}
    $
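If `zcat` is not at hand, the same check can be done from Python. This sketch uses only the standard library and assumes the file holds one JSON object per line, as in the output above. So that it runs without tubing installed, it first writes its own stand-in file to a temporary directory.

```python
import gzip
import json
import os
import tempfile

# Write a small gzip-compressed, newline-delimited JSON file to stand in
# for the output.gz produced by the pipeline above.
records = [
    {"name": "Bob Corsaro", "birthdate": "08/03/1977", "alignment": "evil"},
    {"name": "Tom Brady", "birthdate": "08/03/1977", "alignment": "good"},
]
path = os.path.join(tempfile.mkdtemp(), "output.gz")
payload = "\n".join(json.dumps(r, sort_keys=True) for r in records)
with gzip.open(path, "wb") as handle:
    handle.write(payload.encode("utf-8"))

# Read it back: decompress, split on newlines, parse each line.
with gzip.open(path, "rb") as handle:
    lines = handle.read().decode("utf-8").split("\n")
loaded = [json.loads(line) for line in lines if line]
print(loaded)
```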
You can find more documentation on readthedocs.
Catalog
Sources
Objects | Takes a list of python objects. |
File | Creates a stream from a file. |
Bytes | Takes a byte string. |
IO | Takes an object with a read function. |
Socket | Takes an addr, port and socket() args. |
HTTP | Takes a method, url and any args that can be passed to the requests library. |
Tubes
Gunzip | Unzips a binary stream. |
Gzip | Zips a binary stream. |
JSONLoads | Parses a byte string stream of raw JSON objects. Will try to use ujson, then built-in json. |
JSONDumps | Serializes an object stream using json.dumps. Will try to use ujson, then built-in json. |
Split | Splits a stream that supports the split method. |
Joined | Joins a stream of the same type as the by argument. |
Tee | Takes a sink and passes chunks to it while also forwarding them along the apparatus. |
Map | Takes a transformer function for single items in stream. |
Filter | Takes a filter test callback and only forwards items that pass. |
ChunkMap | Takes a transformer function for a batch of stream items. |
Sinks
Objects | A list that stores all passed items to self. |
Bytes | Saves each chunk to self.results. |
File | Writes each chunk to a file. |
HTTPPost | Writes data via HTTP POST. |
Hash | Takes algorithm name, updates hash with contents. |
Debugger | Writes each chunk to the tubing.tubes debugger with level DEBUG. |
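As an illustration of what "updates hash with contents" means for the Hash sink, here is a sketch using only the standard library's `hashlib`. It is not tubing's code; `hash_chunks` is a hypothetical helper. The point is that feeding a hash chunk by chunk gives the same digest as hashing the whole payload at once, which is what makes hashing a stream possible.

```python
import hashlib


def hash_chunks(algorithm, chunks):
    """Hypothetical helper: feed byte chunks into a named hash one at a time."""
    hasher = hashlib.new(algorithm)
    for chunk in chunks:
        hasher.update(chunk)
    return hasher.hexdigest()


chunked = hash_chunks("sha256", [b"hello ", b"world"])
whole = hashlib.sha256(b"hello world").hexdigest()
print(chunked == whole)
```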
Extensions
s3.S3Source | Create stream from an S3 object. |
s3.MultipartUploader | Stream data to S3 object. |
elasticsearch.BulkSink | Stream elasticsearch.DocUpdate objects to the elasticsearch _bulk endpoint. |
Sources
To make your own source, create a reader class with the following interface.
    class MyReader(object):
        """
        MyReader returns count instances of data.
        """
        def __init__(self, data="hello world\n", count=10):
            self.data = data
            self.count = count

        def read(self, amt):
            """
            read(amt) returns $amt of data and a boolean indicating EOF.
            """
            if not amt:
                amt = self.count
            r = self.data * min(amt, self.count)
            self.count -= amt
            return r, self.count <= 0

The important thing to remember is that your read function should return an iterable of units of data, not a single piece of data. Then wrap your reader in the loving embrace of MakeSourceFactory.

    from tubing import sources

    MySource = sources.MakeSourceFactory(MyReader)

Now it can be used in an apparatus!
    from __future__ import print_function

    from tubing import tubes, sinks

    sink = MySource(data="goodbye cruel world!", count=1) \
        | tubes.Joined(by=b"\n") \
        | sinks.Bytes()

    print(sink.result)
    # Output: goodbye cruel world!
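To see the read contract in isolation, the sketch below repeats the reader and drains it by hand without tubing. `drain` is a hypothetical helper written for this illustration; how tubing itself calls read is not shown in this document, so treat the loop as an assumption about the contract rather than the library's code.

```python
class MyReader(object):
    """Same reader as above: returns count instances of data."""

    def __init__(self, data="hello world\n", count=10):
        self.data = data
        self.count = count

    def read(self, amt):
        if not amt:
            amt = self.count
        r = self.data * min(amt, self.count)
        self.count -= amt
        return r, self.count <= 0


def drain(reader, amt):
    """Hypothetical helper: call read(amt) until the reader signals EOF."""
    pieces = []
    while True:
        data, eof = reader.read(amt)
        pieces.append(data)
        if eof:
            break
    return pieces


pieces = drain(MyReader(data="ab", count=3), amt=2)
print(pieces)
```

With `count=3` and `amt=2`, the first call returns two copies and the second returns the remaining one along with the EOF flag.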
Tubes
Making your own tube is a lot more fun, trust me. First, make a transformer.
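    class OptimusPrime(object):
        def transform(self, chunk):
            return list(reversed(chunk))

chunk is an iterable that supports len(), holding whatever type of data the stream is working with. In a transformer you don't need to worry about buffer sizes, closing, or exceptions; just transform one iterable into another. There are lots of examples in tubes.py.

Next, give Optimus Prime a hug.

    from tubing import tubes

    AllMixedUp = tubes.MakeTranformerTubeFactory(OptimusPrime)

Ready to mix up some data?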
    from __future__ import print_function

    import json
    from tubing import sources, sinks

    objs = [{"number": i} for i in range(0, 10)]

    sink = sources.Objects(objs) \
        | AllMixedUp(chunk_size=2) \
        | sinks.Objects()

    print(json.dumps(sink))
    # Output: [{"number": 1}, {"number": 0}, {"number": 3}, {"number": 2}, {"number": 5}, {"number": 4}, {"number": 7}, {"number": 6}, {"number": 9}, {"number": 8}]
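The pairwise swapping in that output follows from reversing each two-item chunk separately. Here is a sketch of the same effect without tubing; `transform_in_chunks` is a hypothetical helper, and the assumption is that `chunk_size=2` means the transformer sees two items at a time.

```python
class OptimusPrime(object):
    def transform(self, chunk):
        return list(reversed(chunk))


def transform_in_chunks(items, transformer, chunk_size):
    """Hypothetical helper: split items into chunks and transform each one."""
    out = []
    for start in range(0, len(items), chunk_size):
        chunk = items[start:start + chunk_size]
        out.extend(transformer.transform(chunk))
    return out


numbers = list(range(10))
mixed = transform_in_chunks(numbers, OptimusPrime(), chunk_size=2)
print(mixed)
```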
Sinks
Really getting tired of writing documentation... Maybe I'll finish later. I have real work to do.
Well.. I've come this far, so let's push on.
    from __future__ import print_function

    from tubing import sources, tubes, sinks

    class StdoutWriter(object):
        def write(self, chunk):
            for part in chunk:
                print(part)

        def close(self):
            # this function is optional
            print("That's all folks!")

        def abort(self):
            # this is also optional
            print("Something terrible has occurred.")

    Debugger = sinks.MakeSinkFactory(StdoutWriter)

    objs = [{"number": i} for i in range(0, 10)]

    # AllMixedUp is the tube factory defined in the Tubes section above.
    sink = sources.Objects(objs) \
        | AllMixedUp(chunk_size=2) \
        | tubes.JSONDumps() \
        | tubes.Joined(by=b"\n") \
        | Debugger()

    # Output:
    #{"number": 1}
    #{"number": 0}
    #{"number": 3}
    #{"number": 2}
    #{"number": 5}
    #{"number": 4}
    #{"number": 7}
    #{"number": 6}
    #{"number": 9}
    #{"number": 8}
    #That's all folks!
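The writer interface above has one required method (write) and two optional ones (close, abort). The sketch below shows one way a caller could honor that contract. `drive` and `CollectingWriter` are hypothetical names, and the order of calls is an assumption made for illustration, not a description of what MakeSinkFactory does internally.

```python
class CollectingWriter(object):
    """Hypothetical writer that records what happens to it."""

    def __init__(self):
        self.parts = []
        self.events = []

    def write(self, chunk):
        for part in chunk:
            self.parts.append(part)

    def close(self):
        self.events.append("close")

    def abort(self):
        self.events.append("abort")


def drive(writer, chunks):
    """Hypothetical driver: write every chunk, then close; abort on error."""
    try:
        for chunk in chunks:
            writer.write(chunk)
    except Exception:
        # abort is optional, so only call it when the writer defines it.
        if hasattr(writer, "abort"):
            writer.abort()
        raise
    # close is optional too.
    if hasattr(writer, "close"):
        writer.close()


ok = CollectingWriter()
drive(ok, [["a", "b"], ["c"]])

failed = CollectingWriter()
try:
    drive(failed, [["a"], None])  # iterating over None raises TypeError
except TypeError:
    pass

print(ok.parts, ok.events, failed.events)
```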