python i/o管道实用程序

tubing的Python项目详细描述


https://imgur.com/Q9Lv0xo.png
https://travis-ci.org/dokipen/tubing.svg?branch=masterhttps://coveralls.io/repos/github/dokipen/tubing/badge.svg?branch=masterhttps://img.shields.io/pypi/v/tubing.svghttps://img.shields.io/pypi/pyversions/tubing.svghttps://img.shields.io/pypi/dd/tubing.svghttps://img.shields.io/pypi/l/tubing.svghttps://img.shields.io/pypi/wheel/tubing.svghttps://readthedocs.org/projects/tubing/badge/?version=latestCode Health

tubing是一个python i/o库。是什么让管子这么酷 严重滥用位或运算符()。你写过python吗 暗号,心想,“伙计,这很好,但我真的希望它是 有点像bash。“小家伙,我们让python有点像bash。如果你 是个超级跛脚的书呆子,你可以用 tube()函数并祈祷以后不要重载任何其他运算符 版本。以下是安装管道的方法:

$ pip install tubing

管子目前还很简陋。我们试着简单地添加 你自己的功能。希望你没有那么不愉快。在那里 下面是添加源、管和接收器的三个部分。如果你真的 一些补充,考虑把它们放回上游。我们很乐意 全套工具。

现在,看看这个完全可操作的I/O库的威力。

fromtubingimportsources,tubes,sinksobjs=[dict(name="Bob Corsaro",birthdate="08/03/1977",alignment="evil",),dict(name="Tom Brady",birthdate="08/03/1977",alignment="good",),]sources.Objects(objs) \
     |tubes.JSONDumps() \
     |tubes.Joined(by=b"\n") \
     |tubes.Gzip() \
     |sinks.File("output.gz","wb")

然后在我们的老朋友聚会上。

$ zcat output.gz
{"alignment": "evil", "birthdate": "08/03/1977", "name": "Bob Corsaro"}{"alignment": "good", "birthdate": "08/03/1977", "name": "Tom Brady"}
$

您可以在readthedocs上找到更多文档

目录

来源

ObjectsTakes a list of python objects.
FileCreates a stream from a file.
BytesTakes a byte string.
IOTakes an object with a read function.
SocketTakes an addr, port and socket() args. .
HTTPTakes an method, url and any args that can be passed to requests library.

管子

GunzipUnzips a binary stream.
GzipZips a binary stream.
JSONLoadsParses a byte string stream of raw JSON objects. Will try to use ujson, then built-in json.
JSONDumpsSerializes an object stream using json.dumps. Will try to use ujson, then built-in json.
SplitSplits a stream that supports the split method.
JoinedJoins a stream of the same type as the by argument.
TeeTakes a sink and passes chunks along apparatus.
MapTakes a transformer function for single items in stream.
FilterTakes a filter test callback and only forwards items that pass.
ChunkMapTakes a transformer function for batch of stream items.

水槽

ObjectsA list that stores all passed items to self.
BytesSaves each chunk self.results.
FileWrites each chunk to a file.
HTTPPostWrites data via HTTPPost.
HashTakes algorithm name, updates hash with contents.
DebuggerWrites each chunk to the tubing.tubes debugger with level DEBUG.

扩展

s3.S3SourceCreate stream from an S3 object.
s3.MultipartUploaderStream data to S3 object.
elasticsearch.BulkSinkStream elasticsearch.DocUpdate objects to the elasticsearch _bulk endpoint.

来源

要创建自己的源代码,请使用以下接口创建一个reader类。

classMyReader(object):"""
    MyReader returns count instances of data.
    """def__init__(self,data="hello world\n",count=10):self.data=dataself.count=countdefread(self,amt):"""
        read(amt) returns $amt of data and a boolean indicating EOF.
        """ifnotamt:amt=self.countr=self.data*min(amt,self.count)self.count-=amtreturnr,self.count<=0

重要的是要记住,read函数应该返回 可数的数据单位,而不是单个数据。然后把你的读者包起来 MakeSourceFactory的爱的拥抱。

fromtubingimportsourcesMySource=sources.MakeSourceFactory(MyReader)

现在它可以用在仪器上了!

from__future__importprint_functionfromtubingimporttubessink=MySource(data="goodbye cruel world!",count=1) \
     |tubes.Joined(by=b"\n") \
     |sinks.Bytes()print(sinks.result)# Output: goodbye cruel world!

管子

相信我,自己做试管会更有趣。先做个变压器。

classOptimusPrime(object):deftransform(self,chunk):returnlist(reversed(chunk))

chunk是一个iterable,其中len()是流的任何类型的数据 与合作。在变形金刚中,不需要担心缓冲区大小或 关闭或异常,只需将一个iterable转换为另一个iterable。有 tubes.py中有很多示例。

接下来给擎天柱一个拥抱。

fromtubingimporttubesAllMixedUp=tubes.MakeTranformerTubeFactory(OptimusPrime)

准备好混合一些数据了吗?

from__future__importprint_functionimportjsonfromtubingimportsources,sinksobjs=[{"number":i}foriinrange(0,10)]sink=sources.Objects(objs) \
     |AllMixedUp(chunk_size=2) \
     |sinks.Objects()print(json.dumps(sink))# Output: [{"number": 1}, {"number": 0}, {"number": 3}, {"number": 2}, {"number": 5}, {"number": 4}, {"number": 7}, {"number": 6}, {"number": 9}, {"number": 8}]

水槽

真的厌倦了制作文档…也许我会晚些时候完成。我有真正的工作要做。

嗯..我走了这么远,让我们继续前进。

from__future__importprint_functionfromtubingimportsources,tubes,sinksclassStdoutWriter(object):defwrite(self,chunk):forpartinchunk:print(part)defclose(self):# this function is optionalprint("That's all folks!")defabort(self):# this is also optionalprint("Something terrible has occurred.")Debugger=sinks.MakeSinkFactory(StdoutWriter)objs=[{"number":i}foriinrange(0,10)]sink=sources.Objects(objs) \
     |AllMixedUp(chunk_size=2) \
     |tubes.JSONDumps() \
     |tubes.Joined(by=b"\n") \
     |Debugger()# Output:#{"number": 1}#{"number": 0}#{"number": 3}#{"number": 2}#{"number": 5}#{"number": 4}#{"number": 7}#{"number": 6}#{"number": 9}#{"number": 8}#That's all folks!

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
jframe为什么JAVA paint()方法不起作用?   java Guice:将ContainerRequestContext注入拦截器   java如何优雅地关闭Spring JMS MessageListenerAdapter   java如何在Spring中设置快照隔离级别   Java中的安卓平台独立信任存储路径   java无法在eclipse中运行hello world程序   java Sinch空指针问题   使用Java将JSON流式传输到BigQuery   java从“大数据”中选择什么Swing控件?   java通过对象字段过滤/排序集合?   java将数据从活动传递到另一个活动中的片段   java访问打包在jar文件中的文档   Java获取事件的大小。getDragboard()。getFiles()。流()。映射(文件::长度)。toString());   java Android libgdx:启动程序图标按下后,启动屏幕不会立即显示   java如何在Google App Engine灵活环境中配置oracle jdk   java有没有办法减少这些行?   Java:客户端socket不读取第二行,在终止符行之后保持打开状态   java以编程方式获取api 29上的所有APK文件   java ActionBar按钮不显示