面向人类的异步python http。

requests-futures的Python项目详细描述


针对人类的异步python http请求

https://travis-ci.org/ross/requests-futures.png?branch=master

pythonrequestshttp库的小插件。使用Python3.2 concurrent.futuresbackport用于先前版本的python。

额外的api和更改是最小的,并努力避免意外。

以下同步代码:

fromrequestsimportSessionsession=Session()# first requests starts and blocks until finishedresponse_one=session.get('http://httpbin.org/get')# second request starts once first is finishedresponse_two=session.get('http://httpbin.org/get?foo=bar')# both requests are completeprint('response one status: {0}'.format(response_one.status_code))print(response_one.content)print('response two status: {0}'.format(response_two.status_code))print(response_two.content)

可以转换为利用未来,因此通过创建 一个未来的探索,抓住回归的未来来代替回应。这个 可以通过在将来调用result方法来检索响应:

fromrequests_futures.sessionsimportFuturesSessionsession=FuturesSession()# first request is started in backgroundfuture_one=session.get('http://httpbin.org/get')# second requests is started immediatelyfuture_two=session.get('http://httpbin.org/get?foo=bar')# wait for the first request to complete, if it hasn't alreadyresponse_one=future_one.result()print('response one status: {0}'.format(response_one.status_code))print(response_one.content)# wait for the second request to complete, if it hasn't alreadyresponse_two=future_two.result()print('response two status: {0}'.format(response_two.status_code))print(response_two.content)

默认情况下,创建的threadpoolexecutor具有8个工作线程。如果你愿意 调整该值或跨多个可以提供的会话共享执行器 一个给未来会话构造函数。

fromconcurrent.futuresimportThreadPoolExecutorfromrequests_futures.sessionsimportFuturesSessionsession=FuturesSession(executor=ThreadPoolExecutor(max_workers=10))# ...

作为增加工人数量的捷径,你可以通过 最大工人数直接进入未来会话构造函数:

fromrequests_futures.sessionsimportFuturesSessionsession=FuturesSession(max_workers=10)

Futuression将使用现有会话对象,如果提供:

fromrequestsimportsessionfromrequests_futures.sessionsimportFuturesSessionmy_session=session()future_session=FuturesSession(session=my_session)

就这样。请求的api。会话在没有任何修改的情况下被保留 而不是回应。就像所有期货例外一样 被转移(抛出)到未来。result()调用,因此try/except块应该是 搬到那里去了。

取消排队的请求(也就是在您之后进行清理)

如果你知道你不需要任何来自未来的回应 还没有解决,最好取消那些请求。你可以这么做 将会话用作上下文管理器:

fromrequests_futures.sessionsimportFuturesSessionwithFuturesSession(max_workers=1)assession:future=session.get('https://httpbin.org/get')future2=session.get('https://httpbin.org/delay/10')future3=session.get('https://httpbin.org/delay/10')response=future.result()

在本例中,将跳过第二个或第三个请求,从而节省时间和 否则会被浪费的资源。

在后台工作

其他处理可以在后台使用请求的hooks 功能。这可能有助于将工作转移到前台,因为 一个简单的例子是json解析。

frompprintimportpprintfromrequests_futures.sessionsimportFuturesSessionsession=FuturesSession()defresponse_hook(resp,*args,**kwargs):# parse the json storing the result on the response objectresp.data=resp.json()future=session.get('http://httpbin.org/get',hooks={'response':response_hook,})# do some other stuff, send some more requests while this one worksresponse=future.result()print('response status {0}'.format(response.status_code))# data will have been attached to the response object in the backgroundpprint(response.data)

钩子也可以应用于会话。

frompprintimportpprintfromrequests_futures.sessionsimportFuturesSessiondefresponse_hook(resp,*args,**kwargs):# parse the json storing the result on the response objectresp.data=resp.json()session=FuturesSession()session.hooks['response']=response_hookfuture=session.get('http://httpbin.org/get')# do some other stuff, send some more requests while this one worksresponse=future.result()print('response status {0}'.format(response.status_code))# data will have been attached to the response object in the backgroundpprint(response.data)pprint(response.data)

一个更高级的示例,向所有请求添加elapsed属性。

frompprintimportpprintfromrequests_futures.sessionsimportFuturesSessionfromtimeimporttimeclassElapsedFuturesSession(FuturesSession):defrequest(self,method,url,hooks={},*args,**kwargs):start=time()deftiming(r,*args,**kwargs):r.elapsed=time()-starttry:ifisinstance(hooks['response'],(list,tuple)):# needs to be first so we don't time other hooks executionhooks['response'].insert(0,timing)else:hooks['response']=[timing,hooks['response']]exceptKeyError:hooks['response']=timingreturnsuper(ElapsedFuturesSession,self) \
            .request(method,url,hooks=hooks,*args,**kwargs)session=ElapsedFuturesSession()future=session.get('http://httpbin.org/get')# do some other stuff, send some more requests while this one worksresponse=future.result()print('response status {0}'.format(response.status_code))print('response elapsed {0}'.format(response.elapsed))

使用processpoolexecutor

类似于threadpoolexecutor,可以使用 进程池执行器。顾名思义,请求将被执行 在单独的进程而不是线程中并发。

fromconcurrent.futuresimportProcessPoolExecutorfromrequests_futures.sessionsimportFuturesSessionsession=FuturesSession(executor=ProcessPoolExecutor(max_workers=10))# ... use as before

提示

在内存不足的情况下,使用processpoolexecutor非常有用 每个请求的使用率非常高(大响应),并且循环解释程序 需要将内存释放回操作系统。

使用processpoolexecutor的基本要求是 未来会话都是可腌制的。

这意味着只有python 3.5是完全支持的,而python版本 3.4和以上要求现有的请求。 当初始化未来会话时。python 2.x和<;3.4目前还没有 支持。

# Using python 3.4fromconcurrent.futuresimportProcessPoolExecutorfromrequestsimportSessionfromrequests_futures.sessionsimportFuturesSessionsession=FuturesSession(executor=ProcessPoolExecutor(max_workers=10),session=Session())# ... use as before

如果pickling失败,将引发指向此文档的异常。

# Using python 2.7fromconcurrent.futuresimportProcessPoolExecutorfromrequestsimportSessionfromrequests_futures.sessionsimportFuturesSessionsession=FuturesSession(executor=ProcessPoolExecutor(max_workers=10),session=Session())Traceback(mostrecentcalllast):...RuntimeError:Cannotpicklefunction.Refertodocumentation:https://github.com/ross/requests-futures/#using-processpoolexecutor

重要

  • python>;=3.4必需
  • 使用python<;3.5时需要会话实例
  • 如果对未来分类进行子分类,则它必须是可导入的(模块全局)

安装

pip install requests-futures

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Kafka在producer上流式传输自定义头   java ExecutorService:不需要输出   java并发读取不可修改的映射   如何匹配Java计算机的给定输出猜你的数字游戏;二进制搜索   java使用数据库创建新对象   javascript如何用spring mvc上传图像我尝试了这种方法,如何解决这个问题   java如何使用Quarkus Panache仅选择某些字段?   java如何通知其他对象(如何“关闭”队列)ArrayBlockingQueue中将不再有元素,   java使用JavaFX MediaPlayer从MP3读取ID3v2标记   当我们在Android上打开应用程序时,java仅在第三次显示toast   apache spark java。RuntimeException:java。lang.Integer不是bigint:java架构的有效外部类型。伊奥。IOException   模拟插座。用于测试Java的getOutputStream()   java在git对分期间,只运行失败的测试安全吗?或者我们应该运行所有测试?   java Android MediaPlayer音量非常低(已调整音量)   java组织。冬眠服务UnknownServiceException:请求的未知服务[org.hibernate.ogm.service.impl.OgmConfigurationService]   java扫描程序没有停止   java无法在JBoss EAP 6.2上调用远程EJB