用于aws kinisis的异步python客户端
async-kinesis-client的Python项目详细描述
异步运动客户端
使用异步的python kinisis客户端库
基于evan borgstromeborgstrom@nerdwallet.com的kinisis python项目 https://github.com/NerdWalletOSS/kinesis-python但是使用异步魔术
kinisis python的问题是所有的数据最终都在一个线程中 从那里被检查-所以尽管有许多进程,客户端 被检查点阻塞了。此外,它检查每个记录,这是 不可配置。
这个客户机基于aioboto3库,使用python 3.6+异步方法。
用法:
importasynciofromasync_kinesis_client.kinesis_consumerimportAsyncKinesisConsumerasyncdefread_stream():# This is a coroutine that reads all the records from a shardasyncdefread_records(shard_reader):asyncforrecordsinshard_reader.get_records():forrinrecords:print('Shard: {}; Record: {}'.format(shard_reader.shard_id,r))consumer=AsyncKinesisConsumer(stream_name='my-stream',checkpoint_table='my-checkpoint-table')# consumer will yield existing shards and will continue yielding# new shards if re-sharding happens asyncforshard_readerinconsumer.get_shard_readers():print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))asyncio.ensure_future(read_records(shard_reader))asyncio.get_event_loop().run_until_complete(read_stream())
asyncSharDreader和asyncKinesisConsumer可以通过调用stop()方法从并行协程中停止, 在这种情况下,消费者将停止所有碎片读取器。 如果希望收到关闭碎片的通知,请在读取记录时捕获shardclosedexception。
asyncshardreader在最新版本后面公开millis属性,这可能有助于确定应用程序性能。
asynckinesisconsumer具有以下配置方法:
set_checkpoint_interval(records)-检查点之前要跳过多少记录
set_lock_duration(time)-保留锁的秒数。消费者将在该时间之前尝试刷新锁
set_reader_sleep_time(time)-如果shard reader没有从动觉流接收到任何记录,它应该等待多长时间(以秒为单位,可能是小数)
set_checkpoint_callback(coro)-在检查下一批记录之前设置要调用的回调协程。协程参数:shardid,sequenceNumber
制作人很琐碎:
fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)awaitproducer.put_record(record=b'bytes',partition_key='string',# optional, if none, default time-based key is usedexplicit_hash_key='string'# optional)
一次发送多条记录:
fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)records=[{'Data':b'bytes','PartitionKey':'string',# optional, if none, default time-based key is used'ExplicitHashKey':'string'# optional},...]response=awaitproducer.put_records(records=records)# See boto3 docs for response structure:# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records
AWS认证。对于在aws云外进行测试,特别是在使用多因素身份验证时,我发现以下代码片段非常有用:
importosimportaioboto3frombotocoreimportcredentialsfromaiobotocoreimportAioSessionworking_dir=os.path.join(os.path.expanduser('~'),'.aws/cli/cache')session=AioSession(profile=os.environ.get('AWS_PROFILE'))provider=session.get_component('credential_provider').get_provider('assume-role')provider.cache=credentials.JSONFileCache(working_dir)aioboto3.setup_default_session(botocore_session=session)
这允许在完成awsudo下的任何aws命令后重新使用缓存的会话令牌,您只需要设置aws_profile环境变量。
目前,库的测试仍然不足以应对不同的网络事件。 有人警告过你,你自己去冒险吧。