在AWS Lambda中使用boto3 ThreadPoolExecutor并行检索S3对象

2024-06-12 06:20:12 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图弄清楚为什么下面的代码会同时执行,无论是单线程还是使用ThreadPoolExecutor。我的Lambda函数从S3检索多个JSON文件,所有这些文件的大小大约为2k。对于我的测试,我使用了100个文件,不管我使用的是ThreadPoolExecutor还是单线程代码,这都需要2秒以上的时间。工作线程的数量也没有任何区别,因为我尝试了10个和25个,结果是一样的

这是多线程版本:

    s3_data = {}
    logger.debug('Creating boto3 S3 client')
    # Make sure s3 client has a large enough connection pool
    s3_client = boto3.client('s3', config=botocore.config.Config(max_pool_connections=max_s3_threads))
    # This assumes boto3 clients are thread safe
    logger.debug(f'Starting parallel S3 data retrieval, max threads={max_s3_threads}')
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_s3_threads) as executor:
        s3_threads = {
            executor.submit(get_s3_data, s3_client, row['s3Key']): row['id']
            for row in rows.values() 
        }
        for s3_thread in concurrent.futures.as_completed(s3_threads):
            id = s3_threads[s3_thread]
            s3_data[ad_id] = s3_thread.result()
    logger.debug(f'Finished parallel S3 data retrieval, threads completed={len(s3_threads)}')

这是单线程版本:

    logger.debug('Starting sequential S3 data retrieval')
    s3_client = boto3.client('s3')
    for row in unique_artworks.values():
        s3_data[row['artworkId']] = get_s3_data(s3_client, row['s3Key'])
    logger.debug(f'Finished sequential S3 data retrieval')

get_s3_data函数只调用s3_client.get_object,调用时使用从环境变量获得的bucket名称和传入的键,并以dict形式返回JSON。这并不复杂

这段代码应该是I/O绑定的,而不是CPU绑定的,所以根据我所读到的内容,我不认为GIL会妨碍它。所有线程都使用相同的S3客户机对象实例,但这应该是安全的(我在输出中没有看到任何不可靠的结果)。为了以防万一,我尝试在被调用的函数中创建S3客户机,但速度更慢。我希望看到使用ThreadPoolExecutor会带来一些好处,但我不明白为什么我不这么做

也许我的代码有问题,或者我遗漏了一些参数。我在谷歌上搜索了大量的帖子、博客和文章,但仍然没有找到解决方案,所以我希望这里的人能提供一些见解


Tags: 代码debugclientdatagets3loggerboto3