A consistent io interface for reading and writing between local and different remote resources (e.g. http, s3)



iotoolz


iotoolz is an improvement over e2fyi-utils and is partly inspired by {}. iotoolz is a library to help provide a consistent dev-x for interacting with any IO resources. It provides an abstract class iotoolz.AbcStream that mimics python's native open very closely (with some additional arguments and methods, such as save).
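As a quick illustration of how closely open_stream mirrors the built-in open, the minimal sketch below reads a local file both ways; the file path is a placeholder and not from the original examples.

from iotoolz import open_stream

# read a local file with python's built-in open
with open("data.txt", mode="r") as fileobj:
    print(fileobj.read())

# read the same local file (or any supported remote uri) with iotoolz
with open_stream("data.txt", mode="r") as stream:
    print(stream.read())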

The API documentation can be found at https://iotoolz.readthedocs.io/en/latest/.

The change log is available in CHANGELOG.md.

Supported streams

The following streams are currently supported (a short usage sketch follows the list):

  • iotoolz.FileStream: a wrapper over the built-in open function (file://)
  • iotoolz.TempStream: an in-memory stream that will roll over to disk (tmp://, temp://)
  • iotoolz.HttpStream: a http or https stream implemented with requests (http://, https://)
  • iotoolz.extensions.S3Stream: an s3 stream implemented with boto3 (s3://, s3a://, s3n://)
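A minimal sketch of how the uri scheme selects the stream implementation; the uris are placeholders, and the https and s3 examples assume network access and credentials respectively.

from iotoolz import open_stream

# file:// (or a plain path) -> FileStream
with open_stream("file:///tmp/example.txt", mode="w") as stream:
    stream.write("hello")

# tmp:// or temp:// -> TempStream
with open_stream("tmp://scratch/example.txt", mode="w") as stream:
    stream.write("hello")

# http:// or https:// -> HttpStream
with open_stream("https://example.com", mode="r") as stream:
    print(stream.read())

# s3://, s3a://, or s3n:// -> S3Stream (requires iotoolz[boto3])
with open_stream("s3://bucket/key.txt", mode="r") as stream:
    print(stream.read())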

Installation

# install the default packages only (most light-weight)
pip install iotoolz

# install dependencies for a specific extension
pip install iotoolz[boto3]

# install all the extras
pip install iotoolz[all]

Available extras:

  • all: all the optional dependencies
  • boto3: boto3 for iotoolz.extensions.S3Stream
  • minio: TODO

Quickstart

The helper object iotoolz.streams.stream_factory is the default singleton of iotoolz.streams.Streams and is provided to support most of the common use cases.

iotoolz.streams.open_stream (alias iotoolz.streams.Stream) is a util method provided by the singleton helper to create a stream object. This method accepts the same arguments as python's open method, with the following additional arguments (a short sketch follows the list):

  • data: optional str or bytes that will be passed into the stream
  • fileobj: optional file-like object which will be copied into the stream
  • content_type: optional mime type information to describe the stream (e.g. application/json)
  • inmem_size: how much memory to allocate to the stream before rolling over to the local file system. Defaults to no limit (which may result in a MemoryError).
  • schema_kwargs: optional mapping of schemas to their default kwargs.
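A hedged sketch of these additional arguments; the uris, data, and sizes below are placeholders rather than the library's own examples.

import io

from iotoolz import Stream

# seed a temp stream with some initial str data
notes = Stream("tmp://notes.txt", data="hello world")

# seed a temp stream by copying from an existing file-like object
copy = Stream("tmp://copy.bin", fileobj=io.BytesIO(b"raw bytes"))

# provide a mime type hint and roll over to disk once the buffer exceeds ~10 MB
sink = Stream(
    "s3://bucket/data.json",
    mode="w",
    content_type="application/json",
    inmem_size=10**7,
)
sink.write('{"foo": "bar"}')
sink.close()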

Basic setup
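A minimal setup sketch, assuming the default singleton, using set_buffer_rollover_size and per-call schema_kwargs (both demonstrated later in this document); the uri and values are placeholders.

from iotoolz import open_stream, set_buffer_rollover_size

# roll over the in-memory buffer to disk once it exceeds ~100 MB
set_buffer_rollover_size(10**8)

# pass default kwargs for a schema on a per-call basis,
# e.g. do not verify the ssl cert for https requests
with open_stream(
    "https://foo/bar/data.txt",
    mode="r",
    schema_kwargs={"https": {"verify": False}},
) as stream:
    print(stream.read())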

Opening a stream

You can open any stream just like python's built-in open method.

import pandas as pd

from iotoolz import open_stream

# print line by line some data from a https endpoint
# and do not verify the ssl cert of the https endpoint
with open_stream(
    "https://foo/bar/data.txt",
    mode="r",
    schema_kwargs={"https": {"verify": False}},
) as stream:
    for line in stream:
        print(line)

# POST some binary content to a http endpoint (default is PUT)
with open_stream("https://foo.bar/api/data", "wb", use_post=True) as stream:
    stream.write(b"hello world")

# copying a local file to s3
with open_stream("path/to/data.csv", "r") as csv_source, open_stream(
    "s3://bucket/foobar.txt?StorageClass=STANDARD", "w"
) as s3_sink:
    # pipe content in csv_source to s3_sink
    csv_source.pipe(s3_sink)

# load a pandas dataframe from a s3 fileobj
with open_stream("s3://bucket/foobar.csv", "r") as csv:
    df = pd.read_csv(csv)

TempStream

TempStream is a stream that functions like an in-memory virtual file system.

import gc

from iotoolz import Stream, exists, glob, iter_dir

# this stream can be garbage collected
Stream("tmp://foo/bar/data.txt", data="foobar")

# True if not gc yet, False if already gc
exists("tmp://foo/bar/data.txt")

# force gc
gc.collect()

# will not exist
exists("tmp://foo/bar/data.txt")

# create temp stream with strong ref (hence will not be gc)
s1 = Stream("tmp://foo/bar/data.txt", data="foobar")
s2 = Stream("tmp://foo/example.txt", data="...")

# returns s1 and s2
iter_dir("tmp://foo/")

# returns s1 only
glob("tmp://foo/bar/*.txt")

Stream operations

Streamopen_stream的别名,这两个方法都返回一个具体的AbcStream对象。 您可以将对象同时视为“类似文件”和“流式”对象-也就是说,您可以 读,写,搜索,刷新,关闭对象。在

NOTE

By default, the underlying buffer is in-memory. You can enable rollover to disk by passing the inmem_size arg to the method, or update the default inmem_size value with the iotoolz.streams.set_buffer_rollover_size method.

from iotoolz import open_stream, Stream, set_buffer_rollover_size

# `Stream` is an alias of `open_stream`
assert open_stream == Stream

# rollover to disk if data is over 100 MB
set_buffer_rollover_size(10**8)

# you can overwrite the default kwargs here also
stream = Stream(
    "path/to/data.txt",
    mode="rw",  # you can both read and write to a stream
)

# stream is lazily evaluated, nothing will be buffered until you call some methods
# that require the data
data = stream.read()

# will attempt to provide encoding and content_type if not provided when opening the stream
print(stream.encoding)
print(stream.content_type)

# stream has the same interface as an IO object - i.e. you can seek, flush, close, etc
stream.seek(5)  # go to offset 5 from start of buffer
stream.write("replace with this text")
stream.seek(0, whence=2)  # go to end of buffer
stream.write("additional text after original eof")  # continue writing to the end of the buffer
stream.save()  # flush and save the entire buffer to the same dst location
stream.close()  # close the stream

Path-like operations

exists, mkdir, iter_dir, glob, and related path-like methods are available for the stream objects and as helper functions. Where appropriate, these methods mimic their equivalents in pathlib.Path.

method                  | supported streams                     | description
stats                   | All Streams                           | return the StreamInfo for an existing resource
unlink, delete, remove  | All Streams                           | delete and remove the stream (except for TempStream, where the buffer is cleared instead)
exists                  | All Streams                           | check if a stream points to an existing resource
mkdir                   | FileStream                            | create a directory
rmdir                   | FileStream, TempStream, and S3Stream  | recursively remove everything in the directory
iter_dir                | FileStream, TempStream, and S3Stream  | iterate thru the streams in the directory
glob                    | FileStream, TempStream, and S3Stream  | iterate thru the streams in the directory that match a pattern
from iotoolz import Stream, exists, glob, iter_dir, mkdir, rmdir, stats, unlink

# similar to 'mkdir -p'
mkdir("path/to/folder", parents=True, exist_ok=True)
Stream("path/to/folder").mkdir(parents=True, exist_ok=True)

# list objects in an s3 bucket
iter_dir("s3://bucket/prefix/")
for stream in Stream("s3://bucket/prefix/").iter_dir():
    print(stream.uri)

# find s3 objects with a specific pattern
glob("s3://bucket/prefix/*txt")
for stream in Stream("s3://bucket/prefix/").glob("*.txt"):
    print(stream.uri)

# exists
exists("s3://bucket/prefix/foo.txt")

# stats
info = stats("s3://bucket/prefix/foo.txt")
print(info.name)
print(info.content_type)
print(info.encoding)
print(info.last_modified)
print(info.etag)
print(info.extras)

# delete resource
unlink("s3://bucket/prefix/foo.txt")

# rm all keys with the prefix
rmdir("s3://bucket/prefix/")

Piping streams

pipe is a method to push data to a sink (similar to NodeJS streams, except that it has no watermark or buffering).

from iotoolz.streams import open_stream

local_file = open_stream("path/to/google.html", content_type="text/html", mode="w")
temp_file = open_stream("tmp://google.html", content_type="text/html", mode="wb")

# when source is closed, all sinks will be closed also
with open_stream("https://google.com") as source:
    # writes to a temp file then to a local file in sequence
    source.pipe(temp_file).pipe(local_file)

local_file2 = open_stream("path/to/google1.html", content_type="text/html", mode="w")
local_file3 = open_stream("path/to/google2.html", content_type="text/html", mode="w")

# when source is closed, all sinks will be closed also
with open_stream("tmp://foo_src", mode="w") as source:
    # writes in a fan-out manner
    source.pipe(local_file2)
    source.pipe(local_file3)
    source.write("hello world")

TODO support transform streams so that pipe can be more useful
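Until transform streams are supported, a transform step can be approximated by reading from the source and writing the transformed output to the sink yourself; a rough sketch with placeholder paths:

from iotoolz import open_stream

# read from a source, transform line by line, and write to a sink
with open_stream("path/to/source.txt", mode="r") as source, open_stream(
    "s3://bucket/upper.txt", mode="w"
) as sink:
    for line in source:
        sink.write(line.upper())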

