python模块有助于利用kinisis消息聚合格式进行聚合和解聚合。

aws-kinesis-agg的Python项目详细描述


python动态聚合和解聚合模块

python的kinisis聚合/解聚合模块提供了使用kinisis聚合r对标准kinisis用户记录进行内存聚合和解聚合的能力。ecord格式允许更有效地传输记录。

安装

python记录聚合/解聚合模块在python包索引(pypi)上可用,如aws-kinisis-agg-rel="nofollow">aws-kinisis-agg。您可以通过pip命令行工具安装它:

pip install aws_kinesis_agg

或者,您可以简单地从这个存储库中复制aws-kinisis-agg模块,并直接使用它,但需要注意的是,google protobuf模块也必须可用(如果您通过pip安装,这个依赖项将我会为您服务的。

记录聚合模块(aggregator.py)

aggregator.py模块包含python类,这些类允许您使用kinisis聚合记录格式来聚合记录。使用记录聚合可以提高吞吐量,并在编写将数据发布到Amazon Kinesis的生产应用程序时降低成本。

用法

记录聚合模块提供了一个简单的接口,用于在生产者应用程序中创建协议缓冲区编码的数据。aws-kinisis-u agg模块提供了有效地将单个记录打包成更大的聚合记录的方法。

使用聚合时,可以创建一个recordaggregator对象,然后为每个记录提供分区键、原始数据和(可选)显式哈希键。您可以选择提供一个回调函数,该函数将在完全打包的聚合记录可用时调用,也可以添加记录并检查字节大小或记录数,直到聚合记录适当满为止。您可以保证从recordaggregator对象返回的任何聚合记录都将符合对kinisis的单个putRecord请求。

要开始,请导入aws-kinisis\u agg模块:

import aws_kinesis_agg

当您在producer应用程序中生成记录时,您将使用aws\u kinisis\u agg模块中提供的聚合方法来聚合它们。aws-kinisis-u-agg模块提供了执行迭代聚合和基于回调的聚合的方法。

迭代聚合

迭代聚合方法包括一次向recordaggregator添加一个记录,并检查响应以确定何时可以使用完全聚合的记录。当现有聚合记录中有足够的空间容纳更多记录时,add_user_record方法返回none并在完整聚合记录可供传输时返回aggrecord对象。

for rec in records:
    result = kinesis_aggregator.add_user_record(rec.PartitionKey, rec.Data, rec.ExplicitHashKey)
    if result:
        #Send the result to Kinesis    

基于回调的聚合

要使用基于回调的聚合,必须通过on_record_complete方法注册回调。当您向recordaggregator对象添加单个记录时,只要有新的完全打包的聚合记录可用,您就会收到一个回调(在单独的线程上)。

def my_callback(agg_record):
    #Send the record to Kinesis

...

kinesis_aggregator.on_record_complete(my_callback)
for rec in records:
    kinesis_aggregator.add_user_record(rec.PartitionKey, rec.Data, rec.ExplicitHashKey)

示例

此存储库包含一个示例脚本,该脚本使用记录聚合模块aggregator.py来聚合记录,并使用基于回调的聚合将记录传输到amazon kinisis。您可以在文件中找到这个示例功能,该文件可以用作您自己应用程序的模板。离子可以轻松地生成和传输编码数据。

基于回调的聚合和传输示例

下面的示例假设您运行的是Python2.7.x版本,并且还需要安装和配置boto3模块。您可以通过pip install boto3或任何其他普通的python安装机制来安装boto3。要将示例配置为能够发布到您的kinisis流,请确保遵循boto3配置指南中的说明。为了简洁起见,下面的示例已被删除,但您仍然可以在kinisis_publisher.py找到完整的工作版本。简略的例子是:

import boto3
import aws_kinesis_agg.aggregator

kinesis_client = None

def send_record(agg_record):
    global kinesis_client
    pk, ehk, data = agg_record.get_contents()
    kinesis_client.put_record(StreamName='MyKinesisStreamName',
                                  Data=data,
                                  PartitionKey=pk,
                                  ExplicitHashKey=ehk)

if __name__ == '__main__':
    kinesis_client = boto3.client('kinesis', region_name='us-west-2')

    kinesis_agg = aws_kinesis_agg.aggregator.RecordAggregator()
    kinesis_agg.on_record_complete(send_record)

    for i in range(0,1024):
        pk, ehk, data = get_record(...)
        kinesis_agg.add_user_record(pk, data, ehk)

    #Clear out any remaining records that didn't trigger a callback yet
    send_record(kinesis_agg.clear_and_get()) 

记录解聚模块(deaggregator.py)

deaggregator.py模块包含python类,这些类允许您使用kinisis聚合r对传输的记录进行解聚合。ECORD格式,包括由动觉生产者库传输的格式。此库将允许您在任何python环境(包括aws lambda)中解除聚合记录。

用法

record deaggregation模块提供了一个简单的接口,用于在使用者应用程序中处理kinisi聚合的消息数据。aws-kinisis-u-agg模块为基于批量和基于生成器的处理提供方法。

使用解聚合时,您将提供一个聚合的运动记录并返回多个运动用户记录。如果提供的一个kinisis记录不是聚合的kinisis记录,那就很好了——您只需从单个记录输入中获得单个记录输出。从解聚合返回的动觉用户记录如下:

{
    'eventVersion' : String - The version number of the Kinesis event used
    'eventID' : String - The unique ID of this Kinesis event
    'kinesis' :
    {
        'partitionKey' : String - The Partition Key provided when the record was submitted
        'explicitHashKey' : String - The hash value used to explicitly determine the shard the data record is assigned to by overriding the partition key hash (or None if absent) 
        'data' : String - The original data transmitted by the producer (base64 encoded)
        'kinesisSchemaVersion' : String - The version number of the Kinesis message schema used,
        'sequenceNumber' : BigInt - The sequence number assigned to the record on submission to Kinesis
        'subSequenceNumber' : Int - The sub-sequence number for the User Record in the aggregated record, if aggregation was in use by the producer
        'aggregated' : Boolean - Always True for a user record extracted from a Kinesis aggregated record
    },
    'invokeIdentityArn' : String - The ARN of the IAM user used to invoke this Lambda function
    'eventName' : String - Always "aws:kinesis:record" for a Kinesis record
    'eventSourceARN' : String - The ARN of the source Kinesis stream
    'eventSource' : String - Always "aws:kinesis" for a Kinesis record
    'awsRegion' : String - The name of the source region for the event (e.g. "us-east-1")
}

要开始,请导入aws-kinisis\u agg模块:

导入aws-kinisis-u-agg

下一步,当您在消费者应用程序中收到动觉记录时,您将使用aws动觉agg模块中提供的解聚集方法提取用户记录。

重要信息:在aws-kinisis-agg模块中提供的解聚集方法预期输入记录的格式与通常从aws-lambda接收到的基于词典的格式相同。有关详细信息,请参阅aws文档的用python编写lambda函数的编程模型部分。

批量转换

去聚合的批量转换方法接受一个运动记录列表,提取所有聚合的用户记录并将其累积到一个列表中。传递给此方法的任何记录如果不是kinisis聚合记录,则将原封不动地返回。该方法返回一个kinisis用户记录列表,其格式与lambda的kinisis事件处理程序通常传递的格式相同。

user_records = deaggregate_records(raw_kinesis_records)

基于发电机的转换

基于生成器的解聚集转换方法使用python生成器函数以迭代方式一次从原始运动记录中提取一个用户记录。传递给此方法的任何记录如果不是kinisis聚合记录,则将原封不动地返回。例如,您可以使用此代码遍历每个非聚合记录:

for record in iter_deaggregate_records(raw_kinesis_records):        

    #Process each record
    pass 

示例

本模块在文件中包含两个示例aws lambda函数,即lambda函数.py,它使您能够轻松地构建新函数,通过aws lambda处理动态信息聚合数据。

批量转换示例

from __future__ import print_function

from aws_kinesis_agg.deaggregator import deaggregate_records
import base64

def lambda_bulk_handler(event, context):

    raw_kinesis_records = event['Records']

    #Deaggregate all records in one call
    user_records = deaggregate_records(raw_kinesis_records)

    #Iterate through deaggregated records
    for record in user_records:        

        # Kinesis data in Python Lambdas is base64 encoded
        payload = base64.b64decode(record['kinesis']['data'])

        #TODO: Process each record

    return 'Successfully processed {} records.'.format(len(user_records))

基于生成器的转换示例
pip install aws_kinesis_agg
0

构建并部署lambda函数以处理运动记录

开始处理动觉数据的一个简单方法是使用aws lambda。通过在此存储库中现有的lambda_function.py模块的基础上构建lambda_function.py,您无需编写样板代码就可以利用动态信息解聚功能。

当您准备好进行构建并上传到aws lambda时,您有两个选择:

  • 在这个python项目的根目录中,您可以找到一个名为make_lambda_build.py的示例生成文件。此文件是一个与平台无关的构建脚本,它将获取此演示中的现有python项目,并将其打包到一个名为python\u lambda\u build.zip的构建文件中,您可以直接将其上载到aws lambda。

为了使用构建脚本,请确保pythonpip工具在命令行上可用。如果您有其他的pip依赖项,请确保将它们添加到make lambda_build.py>顶部的列表中。然后运行此命令:

pip install aws_kinesis_agg
1

构建脚本将创建一个名为build的新文件夹,复制所有python源文件,通过pip下载任何必要的依赖项,并创建可以部署到aws lambda的文件python_lambda_build.zip

aws lambda用户的重要构建说明

如果选择将自己的python zip文件部署到aws lambda,请注意google的protobuf模块通常依赖于使用python的pth设置来将根节点google的protobuf模块导入。如果在aws lambda日志中看到错误,例如:

pip install aws_kinesis_agg
2

您可以进入google模块文件夹(包含protobuf文件夹的同一文件夹)并生成一个名为.py的空文件。一旦您重新压缩所有内容并重新部署,这将修复上面的错误。

注意:如果您使用了提供的脚本,则此问题已经为您解决了。


版权所有2014-2015 Amazon.com,Inc.或其附属公司。保留所有权利。

根据亚马逊软件许可证("许可证")授权。除非符合许可证,否则您不能使用此文件。许可证副本位于

pip install aws_kinesis_agg
3

或者在该文件附带的"许可证"文件中。本文件按"原样"分发,无任何明示或暗示的保证或条件。请参阅许可证,了解该许可证下管理权限和限制的特定语言。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java apache Jclouds与quarkus兼容吗?   java Switch语句和字符串到字节   java在Windows中处理unix路径   java将XML中的不同值插入数据库表   Android Room中带日期的java查询   java如何将vaadin7与googlemap连接起来   java有条件地忽略特定属性DTO   数据库中的java Spring最后一个ID作为JSP中的默认输入值。。怎样   java创建和使用匿名可运行类的最佳方法   关于布尔语句中参数的java问题   java JMH:无法创建SecurityManager:worker。组织。格拉德尔。过程内部的工人小孩BootstrapSecurityManager   JavaXMLStreamReader编码   java Hibernate空指针位于AbstractUserTypeHibernateIntegrator用户类型   安卓支持v7。小装置。在使用安卓 studio的java文件中找不到工具栏?   java从res文件夹中读取名为的文件(从变量读取)   java如何解决“图形设备初始化失败:d3d、sw”问题   java字节缓冲区可以用作“长寿”静态对象吗?   java如何获取currentlyloggedin用户的用户名   java如何优雅地检测SSL