A consistent IO interface for reading and writing to both local and different remote resources (e.g. http, s3)
iotoolz
`iotoolz` is an improvement over `e2fyi-utils` and is partly inspired by `toolz`. `iotoolz` is a library that helps provide a consistent dev-x for interacting with any IO resource: it provides an abstract class `iotoolz.AbcStream` which mimics python's native `open` very closely (with a few additional arguments and methods, such as `save`).

The API documentation can be found at https://iotoolz.readthedocs.io/en/latest/.

The change log is available in CHANGELOG.md.
- Python 3.6 and above
- Licensed under Apache-2.0.
Supported streams

The following streams are currently supported:

- `iotoolz.FileStream`: wrapper over the built-in `open` function (`file://`)
- `iotoolz.TempStream`: in-memory stream that rolls over to disk (`tmp://`, `temp://`)
- `iotoolz.HttpStream`: http or https stream implemented with `requests` (`http://`, `https://`)
- `iotoolz.extensions.S3Stream`: s3 stream implemented with `boto3` (`s3://`, `s3a://`, `s3n://`)
Installation

```bash
# install the default packages only (most lite-weight)
pip install iotoolz

# install dependencies for a specific extension
pip install iotoolz[boto3]

# install all the extras
pip install iotoolz[all]
```
Available extras:

- `all`: all the optional dependencies
- `boto3`: `boto3` for `iotoolz.extensions.S3Stream`
- `minio`: TODO
Quick start

The helper object `iotoolz.streams.stream_factory` is the default singleton of `iotoolz.streams.Streams` provided to support most common use cases.

`iotoolz.streams.open_stream` (alias `iotoolz.streams.Stream`) is a util method provided by the singleton helper to create a stream object. This method accepts the same arguments as python's `open` method, with the following additional parameters:

- `data`: optional str or bytes that will be written into the stream
- `fileobj`: optional file-like object which will be copied into the stream
- `content_type`: optional mime type information describing the stream (e.g. application/json)
- `inmem_size`: how much memory to allocate to the stream before rolling over to the local file system. Defaults to no limit (which may result in a memory error)
- `schema_kwargs`: optional mapping of schemas to their default kwargs
Basic setup

The default singleton helper can be configured globally, e.g. the default kwargs for a schema, or the buffer size at which streams roll over to disk.
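A sketch of such a basic setup: configure the defaults once, and every stream opened afterwards picks them up. This assumes a `set_schema_kwargs` helper exported by `iotoolz.streams`; `set_buffer_rollover_size` is the same helper used later in this README.

```python
from iotoolz.streams import set_buffer_rollover_size, set_schema_kwargs

# default to not verifying ssl certs for all https streams
set_schema_kwargs("https", verify=False)
# roll the buffer over to disk once a stream holds more than 100 MB
set_buffer_rollover_size(10**8)
```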
Opening streams

You can open any stream just like python's built-in `open` method.
```python
import pandas as pd

from iotoolz import open_stream

# print line by line some data from a https endpoint
# and do not verify the ssl cert of the https endpoint
with open_stream(
    "https://foo/bar/data.txt",
    mode="r",
    schema_kwargs={"https": {"verify": False}},
) as stream:
    for line in stream:
        print(line)

# POST some binary content to a http endpoint (default is PUT)
with open_stream("https://foo.bar/api/data", "wb", use_post=True) as stream:
    stream.write(b"hello world")

# copy a local file to s3
with open_stream("path/to/data.csv", "r") as csv_source, open_stream(
    "s3://bucket/foobar.txt?StorageClass=STANDARD", "w"
) as s3_sink:
    # pipe content in csv_source to s3_sink
    csv_source.pipe(s3_sink)

# load a pandas dataframe from a s3 fileobj
with open_stream("s3://bucket/foobar.csv", "r") as csv:
    df = pd.read_csv(csv)
```
TempStream

`TempStream` is a stream that functions like an in-memory virtual file system.
```python
import gc

from iotoolz import Stream, exists, glob, iter_dir

# this stream can be garbage collected
Stream("tmp://foo/bar/data.txt", data="foobar")
# True if not gc yet, False if already gc
exists("tmp://foo/bar/data.txt")
# force gc
gc.collect()
# will not exist
exists("tmp://foo/bar/data.txt")

# create temp streams with strong refs (hence will not be gc)
s1 = Stream("tmp://foo/bar/data.txt", data="foobar")
s2 = Stream("tmp://foo/example.txt", data="...")
# returns s1 and s2
iter_dir("tmp://foo/")
# returns s1 only
glob("tmp://foo/bar/*.txt")
```
Stream-like operations

`Stream` is an alias of `open_stream`; both methods return a concrete `AbcStream` object. You can treat the object as both a "file-like" and a "stream-like" object - i.e. you can read, write, seek, flush, and close the object.
NOTE: By default, the underlying buffer is in-memory. You can enable rollover to disk by passing the `inmem_size` arg to the method, or update the default `inmem_size` value with the `iotoolz.streams.set_buffer_rollover_size` method.
```python
from iotoolz import open_stream, Stream, set_buffer_rollover_size

# `Stream` is an alias of `open_stream`
assert open_stream == Stream

# rollover to disk if data is over 100 MB
set_buffer_rollover_size(10**8)

# you can overwrite the default kwargs here also
stream = Stream(
    "path/to/data.txt",
    mode="rw",  # you can both read and write to a stream
)
# stream is lazily evaluated, nothing will be buffered until you call some
# methods that require the data
data = stream.read()
# will attempt to provide encoding and content_type if not provided when opening the stream
print(stream.encoding)
print(stream.content_type)
# stream has the same interface as an IO object - i.e. u can seek, flush, close, etc
stream.seek(5)  # go to offset 5 from start of buffer
stream.write("replace with this text")
stream.seek(0, whence=2)  # go to end of buffer
stream.write("additional text after original eof")  # continue writing at the end of the buffer
stream.save()   # flush and save the entire buffer to the same dst location
stream.close()  # close the stream
```
Path-like operations

`exists`, `mkdir`, `iter_dir`, and `glob` are similar to the equivalent methods in `pathlib.Path`.
method | supported streams | desc |
---|---|---|
`stats` | All Streams | return the `StreamInfo` for an existing resource |
`unlink` | All Streams | delete and remove the stream (except for `HttpStream`) |
`exists` | All Streams | check if a stream points to an existing resource |
`mkdir` | `FileStream` | create a directory |
`rmdir` | `FileStream`, `TempStream`, `S3Stream` | remove recursively everything in the directory |
`iter_dir` | `FileStream`, `TempStream`, `S3Stream` | iterate thru the streams in the directory |
`glob` | `FileStream`, `TempStream`, `S3Stream` | iterate thru the streams in the directory that match the provided pattern |
```python
from iotoolz import Stream, exists, glob, iter_dir, mkdir, rmdir, stats, unlink

# similar to 'mkdir -p'
mkdir("path/to/folder", parents=True, exist_ok=True)
Stream("path/to/folder").mkdir(parents=True, exist_ok=True)

# list objects in an s3 bucket
iter_dir("s3://bucket/prefix/")
for stream in Stream("s3://bucket/prefix/").iter_dir():
    print(stream.uri)

# find s3 objects with a specific pattern
glob("s3://bucket/prefix/*.txt")
for stream in Stream("s3://bucket/prefix/").glob("*.txt"):
    print(stream.uri)

# exists
exists("s3://bucket/prefix/foo.txt")

# stats
info = stats("s3://bucket/prefix/foo.txt")
print(info.name)
print(info.content_type)
print(info.encoding)
print(info.last_modified)
print(info.etag)
print(info.extras)

# delete resource
unlink("s3://bucket/prefix/foo.txt")

# rm all keys with the prefix
rmdir("s3://bucket/prefix/")
```
Piping streams

`pipe` is the method to push the data in a stream to a sink (i.e. similar to a NodeJS stream, except that there is no watermark or buffering).
```python
from iotoolz.streams import open_stream

local_file = open_stream("path/to/google.html", content_type="text/html", mode="w")
temp_file = open_stream("tmp://google.html", content_type="text/html", mode="wb")

# when source is closed, all sinks will be closed also
with open_stream("https://google.com") as source:
    # writes to a temp file then to a local file in sequence
    source.pipe(temp_file).pipe(local_file)

local_file2 = open_stream("path/to/google1.html", content_type="text/html", mode="w")
local_file3 = open_stream("path/to/google2.html", content_type="text/html", mode="w")

# when source is closed, all sinks will be closed also
with open_stream("tmp://foo_src", mode="w") as source:
    # writes in a fan-out manner
    source.pipe(local_file2)
    source.pipe(local_file3)
    source.write("hello world")
```
TODO support transform streams so that pipe can be more useful