如何为EventStream创建一个mock,它使用s3selectapi来基于s3select查询获取内容?

2024-05-29 07:36:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在为一个函数创建一个单元测试,该函数使用boto3的S3客户机函数“select_object_content”从S3存储桶中读取对象。我想嘲笑的反应是

{
    'Payload': EventStream({
        'Records': {
            'Payload': b'bytes'
        },
        'Stats': {
            'Details': {
                'BytesScanned': 123,
                'BytesProcessed': 123,
                'BytesReturned': 123
            }
        },
        'Progress': {
            'Details': {
                'BytesScanned': 123,
                'BytesProcessed': 123,
                'BytesReturned': 123
            }
        },
        'Cont': {},
        'End': {}
    })
}

有效负载是一个EventStream对象,创建为EventStream(self、raw_stream、output_shape、parser、operation_name)并接受4个参数。我有一个用'utf-8'编码的字节串形式的原始流,但是我找不到关于其他参数如何赋值的更多信息。在

我用MagicMock来模仿s3_client.select_object_内容. 在

我希望能够将athena结果(作为CSV放在S3中)作为流传递,并确保代码具有处理特定场景的单元测试。在

编辑:我可以用以下结构来模拟回答:

mock函数的返回类型是Dict[str,Any]

^{pr2}$

Tags: 对象函数参数客户机objects3单元测试details

热门问题