Datahub Python SDK
Detailed description of the pydatahub Python project
An elegant way to access the Datahub Python SDK API. Documentation
Installation
The quick way:
$ sudo pip install pydatahub
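The dependencies will be installed automatically.
Or from source:
$ virtualenv pydatahub_env
$ source pydatahub_env/bin/activate
$ git clone <git clone URL> pydatahub
$ cd pydatahub
$ python setup.py install
If python-dev is not installed, an error message like "Python.h: No such file or directory" will be printed. See this
If you install on Windows and see an error message like "Microsoft Visual C++ XX.0 is required", download and install the dependency here
If the network is not available, the requirements are located in the dependency folder: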
$ cd dependency
$ pip install -r first.txt
$ pip install -r second.txt
Python Version
Tested on Python 2.7, 3.3, 3.4, 3.5, 3.6 and PyPy. Python 3.6 is recommended.
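As a small illustration of the version list above, the following sketch checks whether the running interpreter matches one of the tested Python versions. The names TESTED_VERSIONS and is_tested_version are hypothetical helpers introduced here, not part of pydatahub. On PyPy, sys.version_info reports the Python language level that PyPy implements.

```python
import sys

# Versions taken from the "tested on" list above (major, minor).
TESTED_VERSIONS = {(2, 7), (3, 3), (3, 4), (3, 5), (3, 6)}


def is_tested_version(version_info=sys.version_info):
    """Return True if (major, minor) is one of the tested versions."""
    return tuple(version_info[:2]) in TESTED_VERSIONS


if __name__ == "__main__":
    if not is_tested_version():
        print("Warning: pydatahub was not tested on Python %d.%d"
              % tuple(sys.version_info[:2]))
```

An untested version is not necessarily unsupported, so the sketch only prints a warning.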
Dependencies
- setuptools (>=39.2.0)
- requests (>=2.4.0)
- simplejson (>=3.3.0)
- six (>=1.1.0)
- enum34 (>=1.1.5 for python_version < '3.4')
- crcmod (>=1.7)
- lz4 (>=2.0.0)
- cprotobuf (>=0.1.9)
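To see quickly whether the packages listed above are importable in the current environment, something like the following sketch may help. It checks importability only, not version numbers. The helper name missing_modules is hypothetical, and the import names are assumed to match the package names (with enum34 providing the enum module).

```python
import importlib

# Assumed import names for the dependencies listed above.
# enum34 installs the `enum` module (part of the standard library from 3.4 on).
REQUIRED_MODULES = [
    "setuptools", "requests", "simplejson", "six",
    "enum", "crcmod", "lz4", "cprotobuf",
]


def missing_modules(names=REQUIRED_MODULES):
    """Return the subset of `names` that cannot be imported."""
    missing = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            missing.append(name)
    return missing


if __name__ == "__main__":
    print("Missing modules:", missing_modules())
```

An empty list suggests that every dependency can at least be imported; version constraints still have to be checked with pip.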
Run Tests
- Install tox:
$ pip install -U tox
- Fill in datahub/tests/datahub.ini with your configuration
- Run from the shell:
$ tox
Usage
import os

from datahub import DataHub

dh = DataHub('**your-access-id**', '**your-secret-access-key**', endpoint='**your-end-point**')

# with security token
# dh = DataHub('**your-access-id**', '**your-secret-access-key**', endpoint='**your-end-point**', security_token='**your-security-token**')

# ============================= create project =============================

project_name = 'my_project_name'
comment = 'my project'
dh.create_project(project_name, comment)

# ============================= get project =============================

project_result = dh.get_project(project_name)
print(project_result)

# ============================= create tuple topic =============================

from datahub.models import RecordSchema, FieldType

tuple_topic_name = 'tuple_topic_test'
shard_count = 3
life_cycle = 7
comment = 'tuple topic'
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
dh.create_tuple_topic(project_name, tuple_topic_name, shard_count, life_cycle, record_schema, comment)

# ============================= create blob topic =============================

blob_topic_name = 'blob_topic_test'
shard_count = 3
life_cycle = 7
comment = 'blob topic'
dh.create_blob_topic(project_name, blob_topic_name, shard_count, life_cycle, comment)

# ============================= get topic =============================

topic_result = dh.get_topic(project_name, tuple_topic_name)
print(topic_result)
print(topic_result.record_schema)

# ============================= list shard =============================

shards_result = dh.list_shard(project_name, tuple_topic_name)
print(shards_result)

# ============================= put tuple records =============================

from datahub.models import TupleRecord

# put records by shard is recommended
records0 = []
record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
record0.put_attribute('AK', '47')
records0.append(record0)
put_result = dh.put_records_by_shard(project_name, tuple_topic_name, "0", records0)

# alternative: set the shard id on each record and use put_records
# records0 = []
# record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
# record0.shard_id = '0'
# record0.put_attribute('AK', '47')
# records0.append(record0)
# put_result = dh.put_records(project_name, tuple_topic_name, records0)

print(put_result)

# ============================= put blob records =============================

from datahub.models import BlobRecord

# put records by shard is recommended
root_path = '.'  # placeholder (not in the original): path to the repository root
with open(os.path.join(root_path, 'tests/resources/datahub.png'), 'rb') as f:
    data = f.read()
records1 = []
record1 = BlobRecord(blob_data=data)
record1.put_attribute('a', 'b')
records1.append(record1)
put_result = dh.put_records_by_shard(project_name, blob_topic_name, "0", records1)

# alternative: set the shard id on each record and use put_records
# records1 = []
# record1 = BlobRecord(blob_data=data)
# record1.shard_id = '0'
# record1.put_attribute('a', 'b')
# records1.append(record1)
# put_result = dh.put_records(project_name, blob_topic_name, records1)

print(put_result)

# ============================= get cursor =============================

from datahub.models import CursorType

cursor_result = dh.get_cursor(project_name, tuple_topic_name, '0', CursorType.OLDEST)
print(cursor_result)

# ============================= get blob records =============================

limit = 10
blob_cursor_result = dh.get_cursor(project_name, blob_topic_name, '0', CursorType.OLDEST)
get_result = dh.get_blob_records(project_name, blob_topic_name, '0', blob_cursor_result.cursor, limit)
print(get_result)
print(get_result.records)
print(get_result.records[0])

# ============================= get tuple records =============================

limit = 10
tuple_cursor_result = dh.get_cursor(project_name, tuple_topic_name, '0', CursorType.OLDEST)
get_result = dh.get_tuple_records(project_name, tuple_topic_name, '0', record_schema, tuple_cursor_result.cursor, limit)
print(get_result)
print(get_result.records)
print(get_result.records[0].values)
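The usage above fetches a single batch of at most limit records. To read everything currently in a shard, the fetch can be repeated from the cursor where the previous batch ended. The following is a minimal sketch of that loop, not an official helper: iter_tuple_records is a hypothetical name, and it assumes that the result of get_tuple_records exposes record_count and next_cursor attributes in addition to records.

```python
def iter_tuple_records(dh, project_name, topic_name, shard_id, record_schema,
                       start_cursor, limit=10):
    """Yield records batch by batch until a fetch returns no records.

    `dh` is a DataHub client as created in the usage section;
    `start_cursor` is the `cursor` value of a `get_cursor` result.
    """
    cursor = start_cursor
    while True:
        result = dh.get_tuple_records(project_name, topic_name, shard_id,
                                      record_schema, cursor, limit)
        # Assumption: `record_count` is the number of records in this batch.
        if result.record_count == 0:
            break
        for record in result.records:
            yield record
        # Assumption: `next_cursor` points just past the last record returned.
        cursor = result.next_cursor
```

With the names from the usage section, it could be called as iter_tuple_records(dh, project_name, 'tuple_topic_test', '0', record_schema, tuple_cursor_result.cursor). The loop stops at the first empty batch; a long-running consumer would more likely sleep and retry at that point.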
Examples
See more examples in examples.
Contribute
For a development install, clone the repository and then install from source:
git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git