getstream.io的客户端。在几小时内而不是几周内构建可伸缩的新闻源和活动流。

stream-python的Python项目详细描述


流式python

Build StatuscodecovPyPI version

stream-pythonStream的官方python客户机,这是一个用于构建可伸缩新闻源和活动流的web服务。

注意,还有一个更高级别的Django - Stream integration库,它钩住了django orm。

您可以在https://getstream.io/get_started注册流帐户。

安装

stream python支持:

  • Python(2.6、2.7、3.4、3.5、3.6、3.7)

从pypi安装
pip install stream-python

完整文档

此python客户机的文档可在Stream websiteRead the Docs上找到。

使用量

importdatetime# Instantiate a new clientimportstreamclient=stream.connect('YOUR_API_KEY','API_KEY_SECRET')# INstantiate a new client specifying datacenter locationclient=stream.connect('YOUR_API_KEY','API_KEY_SECRET',location='us-east')# Find your API keys here https://getstream.io/dashboard/# Instantiate a feed objectuser_feed_1=client.feed('user','1')# Get activities from 5 to 10 (slow pagination)result=user_feed_1.get(limit=5,offset=5)# (Recommended & faster) Filter on an id less than the given UUIDresult=user_feed_1.get(limit=5,id_lt="e561de8f-00f1-11e4-b400-0cc47a024be0")# Create a new activityactivity_data={'actor':1,'verb':'tweet','object':1,'foreign_id':'tweet:1'}activity_response=user_feed_1.add_activity(activity_data)# Create a bit more complex activityactivity_data={'actor':1,'verb':'run','object':1,'foreign_id':'run:1','course':{'name':'Golden Gate park','distance':10},'participants':['Thierry','Tommaso'],'started_at':datetime.datetime.now()}user_feed_1.add_activity(activity_data)# Remove an activity by its iduser_feed_1.remove_activity("e561de8f-00f1-11e4-b400-0cc47a024be0")# or by foreign iduser_feed_1.remove_activity(foreign_id='tweet:1')# Follow another feeduser_feed_1.follow('flat','42')# Stop following another feeduser_feed_1.unfollow('flat','42')# List followers/followingfollowing=user_feed_1.following(offset=0,limit=2)followers=user_feed_1.followers(offset=0,limit=10)# Creates many follow relationships in one requestfollows=[{'source':'flat:1','target':'user:1'},{'source':'flat:1','target':'user:2'},{'source':'flat:1','target':'user:3'}]client.follow_many(follows)# Batch adding activitiesactivities=[{'actor':1,'verb':'tweet','object':1},{'actor':2,'verb':'watch','object':3}]user_feed_1.add_activities(activities)# Add an activity and push it to other feeds too using the `to` fieldactivity={"actor":"1","verb":"like","object":"3","to":["user:44","user:45"]}user_feed_1.add_activity(activity)# Retrieve an activity by its IDclient.get_activities(ids=[activity_id])# Retrieve an activity by the combination of foreign_id and timeclient.get_activities(foreign_id_times=[(foreign_id,activity_time),])# Update some parts of an activity with activity_partial_updateset={'product.name':'boots','colors':{'red':'0xFF0000','green':'0x00FF00'}}unset=['popularity','details.info']# ...by IDclient.activity_partial_update(id=activity_id,set=set,unset=unset)# ...or by combination of foreign_id and timeclient.activity_partial_update(foreign_id=foreign_id,time=activity_time,set=set,unset=unset)# Generating user token for client side usage (JS client)user_token=client.create_user_token("user-42")# Javascript client side feed initialization# client = stream.connect(apiKey, userToken, appId);# Generate a redirect url for the Stream Analytics platform to track# events/impressions on url clicksimpression={'content_list':['tweet:1','tweet:2','tweet:3'],'user_data':'tommaso','location':'email','feed_id':'user:global'}engagement={'content':'tweet:2','label':'click','position':1,'user_data':'tommaso','location':'email','feed_id':'user:global'}events=[impression,engagement]redirect_url=client.create_redirect_url('http://google.com/','user_id',events)

JS client

贡献

首先,确保可以运行测试套件。通过py.test运行测试

py.test
# with coverage
py.test --cov stream --cov-report html
# against a local API backendLOCAL=true py.test

安装黑色和片状8

pip install black
pip install flake8

安装git钩子以避免推送无效代码(git commit将运行black和flak8)

发布新版本

为了发布新版本,您需要成为pypi的维护人员。

  • 更新更改日志
  • 在setup.py上更新版本
  • 提交并推送到github
  • 为版本创建新标记(例如v2.9.0
  • 用python创建一个新的distpython setup.py sdist
  • 上传新的葡萄酒分销twine upload dist/stream-python-VERSION-NAME.tar.gz

如果不确定,也可以使用pypi测试服务器进行测试twine upload --repository-url https://test.pypi.org/legacy/ dist/stream-python-VERSION-NAME.tar.gz

版权和许可信息

版权所有(c)2014-2017 stream.io inc和个人贡献者。保留所有权利。

有关此软件的历史记录、使用条款和条件以及所有保证的免责声明的信息,请参阅“许可证”文件。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何在创建对话框时设置模糊背景   java支持clojure中的xml和json REST响应   java在Android中通过多个JSON对象循环   java如何创建T类型的新对象   Java应用程序的设计   java使用GridView、适配器和毕加索制作流行电影应用程序   java在映射中交换值   java在同一活动/布局中多次使用同一片段   使用FixedLengthTokenizer使用java Spring FlatFileItemReader   javajavax。xml。ws。WebServiceException:javax。xml。肥皂SOAPException:错误代码QName必须是命名空间限定的!在weblogic server 12c中部署时   当我在构造函数中调用java Autowired属性时,该属性为null   线程“main”java中的linux异常。网BindException:地址已在使用中   java检查两个日期周期是否重叠   有没有办法通过安卓应用程序自动检测java服务器应用程序是否在线?   java检查2D数组中4个连续相同的对角线元素(连接4个游戏)   向Java数组添加数据   java组织。弹性搜索。客户运输NoNodeAvailableException:配置的节点均不可用:[]