用于aws kinisis的异步python客户端

async-kinesis-client的Python项目详细描述


异步运动客户端

使用异步的python kinisis客户端库

基于evan borgstromeborgstrom@nerdwallet.com的kinisis python项目 https://github.com/NerdWalletOSS/kinesis-python但是使用异步魔术

kinisis python的问题是所有的数据最终都在一个线程中 从那里被检查-所以尽管有许多进程,客户端 被检查点阻塞了。此外,它检查每个记录,这是 不可配置。

这个客户机基于aioboto3库,使用python 3.6+异步方法。

用法:

importasynciofromasync_kinesis_client.kinesis_consumerimportAsyncKinesisConsumerasyncdefread_stream():# This is a coroutine that reads all the records from a shardasyncdefread_records(shard_reader):asyncforrecordsinshard_reader.get_records():forrinrecords:print('Shard: {}; Record: {}'.format(shard_reader.shard_id,r))consumer=AsyncKinesisConsumer(stream_name='my-stream',checkpoint_table='my-checkpoint-table')# consumer will yield existing shards and will continue yielding# new shards if re-sharding happens             asyncforshard_readerinconsumer.get_shard_readers():print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))asyncio.ensure_future(read_records(shard_reader))asyncio.get_event_loop().run_until_complete(read_stream())

asyncSharDreaderasyncKinesisConsumer可以通过调用stop()方法从并行协程中停止, 在这种情况下,消费者将停止所有碎片读取器。 如果希望收到关闭碎片的通知,请在读取记录时捕获shardclosedexception

asyncshardreader在最新版本后面公开millis属性,这可能有助于确定应用程序性能。

asynckinesisconsumer具有以下配置方法:

set_checkpoint_interval(records)-检查点之前要跳过多少记录

set_lock_duration(time)-保留锁的秒数。消费者将在该时间之前尝试刷新锁

set_reader_sleep_time(time)-如果shard reader没有从动觉流接收到任何记录,它应该等待多长时间(以秒为单位,可能是小数)

set_checkpoint_callback(coro)-在检查下一批记录之前设置要调用的回调协程。协程参数:shardidsequenceNumber

制作人很琐碎:

fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)awaitproducer.put_record(record=b'bytes',partition_key='string',# optional, if none, default time-based key is usedexplicit_hash_key='string'# optional)

一次发送多条记录:

fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)records=[{'Data':b'bytes','PartitionKey':'string',# optional, if none, default time-based key is used'ExplicitHashKey':'string'# optional},...]response=awaitproducer.put_records(records=records)# See boto3 docs for response structure:# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records

AWS认证。对于在aws云外进行测试,特别是在使用多因素身份验证时,我发现以下代码片段非常有用:

importosimportaioboto3frombotocoreimportcredentialsfromaiobotocoreimportAioSessionworking_dir=os.path.join(os.path.expanduser('~'),'.aws/cli/cache')session=AioSession(profile=os.environ.get('AWS_PROFILE'))provider=session.get_component('credential_provider').get_provider('assume-role')provider.cache=credentials.JSONFileCache(working_dir)aioboto3.setup_default_session(botocore_session=session)

这允许在完成awsudo下的任何aws命令后重新使用缓存的会话令牌,您只需要设置aws_profile环境变量。

目前,库的测试仍然不足以应对不同的网络事件。 有人警告过你,你自己去冒险吧。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java在Spring Boot中从CrudePository访问表的几列   biginteger如何将Java long转换为*无符号*baseX字符串(以及返回)?   NetBeans“方法”颜色的java深色主题不起作用   位操作如何在位集JAVA中左右移动位?   java如何在Spring Boot中调用函数后释放所有缓存数据   java如何在IntelliJ中调试子进程?   使用Jersey API处理java文件上传和缓冲内部   java如何在安卓上显示通知?   java如何使用mybatis 3传递动态字段和值   java第三方API在多模块Maven项目中看不到类   如何使用java在SQLServer2005中存储日期   java确实需要接口指针来提供多个版本的JNI函数表吗   java我的希伯来语文本在iText中是左对齐的   java在清单的java选项中配置密钥库。yml在铸造PCF中的应用