用于aws kinisis的异步python客户端

async-kinesis-client的Python项目详细描述


异步运动客户端

使用异步的python kinisis客户端库

基于evan borgstromeborgstrom@nerdwallet.com的kinisis python项目 https://github.com/NerdWalletOSS/kinesis-python但是使用异步魔术

kinisis python的问题是所有的数据最终都在一个线程中 从那里被检查-所以尽管有许多进程,客户端 被检查点阻塞了。此外,它检查每个记录,这是 不可配置。

这个客户机基于aioboto3库,使用python 3.6+异步方法。

用法:

importasynciofromasync_kinesis_client.kinesis_consumerimportAsyncKinesisConsumerasyncdefread_stream():# This is a coroutine that reads all the records from a shardasyncdefread_records(shard_reader):asyncforrecordsinshard_reader.get_records():forrinrecords:print('Shard: {}; Record: {}'.format(shard_reader.shard_id,r))consumer=AsyncKinesisConsumer(stream_name='my-stream',checkpoint_table='my-checkpoint-table')# consumer will yield existing shards and will continue yielding# new shards if re-sharding happens             asyncforshard_readerinconsumer.get_shard_readers():print('Got shard reader for shard id: {}'.format(shard_reader.shard_id))asyncio.ensure_future(read_records(shard_reader))asyncio.get_event_loop().run_until_complete(read_stream())

asyncSharDreaderasyncKinesisConsumer可以通过调用stop()方法从并行协程中停止, 在这种情况下,消费者将停止所有碎片读取器。 如果希望收到关闭碎片的通知,请在读取记录时捕获shardclosedexception

asyncshardreader在最新版本后面公开millis属性,这可能有助于确定应用程序性能。

asynckinesisconsumer具有以下配置方法:

set_checkpoint_interval(records)-检查点之前要跳过多少记录

set_lock_duration(time)-保留锁的秒数。消费者将在该时间之前尝试刷新锁

set_reader_sleep_time(time)-如果shard reader没有从动觉流接收到任何记录,它应该等待多长时间(以秒为单位,可能是小数)

set_checkpoint_callback(coro)-在检查下一批记录之前设置要调用的回调协程。协程参数:shardidsequenceNumber

制作人很琐碎:

fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)awaitproducer.put_record(record=b'bytes',partition_key='string',# optional, if none, default time-based key is usedexplicit_hash_key='string'# optional)

一次发送多条记录:

fromasync_kinesis_client.kinesis_producerimportAsyncKinesisProducer# ...asyncdefwrite_stream():producer=AsyncKinesisProducer(stream_name='my-stream',ordered=True)records=[{'Data':b'bytes','PartitionKey':'string',# optional, if none, default time-based key is used'ExplicitHashKey':'string'# optional},...]response=awaitproducer.put_records(records=records)# See boto3 docs for response structure:# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records

AWS认证。对于在aws云外进行测试,特别是在使用多因素身份验证时,我发现以下代码片段非常有用:

importosimportaioboto3frombotocoreimportcredentialsfromaiobotocoreimportAioSessionworking_dir=os.path.join(os.path.expanduser('~'),'.aws/cli/cache')session=AioSession(profile=os.environ.get('AWS_PROFILE'))provider=session.get_component('credential_provider').get_provider('assume-role')provider.cache=credentials.JSONFileCache(working_dir)aioboto3.setup_default_session(botocore_session=session)

这允许在完成awsudo下的任何aws命令后重新使用缓存的会话令牌,您只需要设置aws_profile环境变量。

目前,库的测试仍然不足以应对不同的网络事件。 有人警告过你,你自己去冒险吧。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
带有字符串的java JNA调用与带有字节[]的java JNA调用的行为不同   java基于键列表获取子映射   重启后永久增加java堆大小?   JavaHTTPS服务器:相互SSL身份验证   java为什么接受接口的方法会拒绝该接口的实现?   片段中的java视图无法应用于()   ms access Java SQL更新命令不工作   java将web服务自动打包和部署到Oracle Application Server 10g   java有没有办法在安卓 studio中为安卓时钟设置多个警报?   位于FTP服务器上的文件上的Java校验和md5   在Java中创建类时遇到问题。有些方法不太确定   java错误:在类chrome\u驱动程序中找不到主方法   通用海图(Javascript\Java)