Difference between concurrent.futures and asyncio

Asked 2025-04-14 18:36

I am trying to optimize my Python code with asynchronous programming. To do that, I tried two libraries: asyncio and concurrent.futures.

Here is my code:

import asyncio

from botocore.exceptions import ClientError


async def get_rds_instances(session, region, engine_types):
    report_rds = []
    mandatory_tags = {'Use-Case'}

    client = session.client('rds', region_name=region)
    try:
        await asyncio.sleep(1)
        # Fetch the RDS instances for this region.
        response = client.describe_db_instances()
        report_rds.append(response)
    except (ClientError, Exception) as e:
        print(e)
    return report_rds


async def main():
    # ... some arguments definition
    session = get_rds_session(profile_name)
    regions = session.get_available_regions('rds')
    try:
        reports = await asyncio.gather(*[
            get_rds_instances(session=session, region=region, engine_types=engine_types)
            for region in regions
        ])
    except Exception as e:
        print(f"An error occurred: {str(e)}")

    # ... process report


if __name__ == "__main__":
    asyncio.run(main())

Without asyncio, this code took about 22 seconds to finish. With asyncio, that dropped to about 20 seconds.

Here is where it gets interesting. I then used concurrent.futures:

import concurrent.futures

from botocore.exceptions import ClientError


def get_rds_instances(session, region, engine_types):
    report_rds = []
    mandatory_tags = {'Use-Case'}

    client = session.client('rds', region_name=region)
    try:
        response = client.describe_db_instances()
        report_rds.append(response)
    except (ClientError, Exception) as e:
        print(e)
    return report_rds


def main():
    # ... some arguments definition
    session = get_rds_session(profile_name)
    regions = session.get_available_regions('rds')
    try:
        args = ((session, region, engine_types) for region in regions)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # list() drains the lazy iterator so any exception surfaces inside this try block.
            reports = list(executor.map(lambda p: get_rds_instances(*p), args))
    except Exception as e:
        print(f"An error occurred: {str(e)}")

    # ... process report


if __name__ == "__main__":
    main()

This code finished in only about 3 seconds.

So my question is: is this difference normal? Am I missing something, or doing something wrong, with asyncio?

Addendum:

describe_db_instances

Thanks!

1 Answer

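After discussing this with my colleagues, it turned out that describe_db_instances is a blocking function: when it is called directly inside a coroutine, the event loop cannot run anything else until it returns. As @user4815162342 mentioned, running each region's call in its own thread does help; see the linked reference for details. So I made some changes to the code: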

import asyncio

import boto3
from botocore.exceptions import ClientError


def get_session(profile):
    session = boto3.Session(profile_name=profile)
    return session


def get_clients(session):
    # One RDS client per available region.
    regions = session.get_available_regions('rds')
    clients = [session.client('rds', region_name=region) for region in regions]
    return clients


async def get_instances(client, engine_types):
    output = []
    loop = asyncio.get_running_loop()

    try:
        # Run the blocking boto3 call in the default thread pool executor.
        response = await loop.run_in_executor(None, client.describe_db_instances)
        output.append(response['DBInstances'])
    except (ClientError, Exception) as e:
        print(e)
    return output


async def main():
    # ... some arguments definition
    session = get_session(profile_name)
    clients = get_clients(session)

    reports = await asyncio.gather(*[get_instances(client, engine_types) for client in clients])

    # ... process report


if __name__ == "__main__":
    asyncio.run(main())
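
To see the effect without AWS credentials, here is a minimal sketch that compares the two approaches. It assumes a hypothetical stand-in, fake_describe, which just sleeps to imitate a blocking SDK call; the region names and the 0.2 second delay are made up for illustration, and the timings will vary by machine.

```python
import asyncio
import time


def fake_describe(region):
    """Hypothetical stand-in for a blocking call such as describe_db_instances."""
    time.sleep(0.2)  # blocks the calling thread, like a synchronous HTTP request
    return {"region": region, "DBInstances": []}


async def fetch_blocking(region):
    # The blocking call runs on the event loop thread, so no other
    # coroutine can make progress until it returns.
    return fake_describe(region)


async def fetch_in_thread(region):
    # The blocking call runs in the default thread pool; the coroutine
    # only awaits the result, so the calls overlap.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, fake_describe, region)


async def timed(worker, regions):
    # Run one worker per region with gather() and measure the wall time.
    start = time.perf_counter()
    results = await asyncio.gather(*(worker(r) for r in regions))
    return results, time.perf_counter() - start


REGIONS = ["region-a", "region-b", "region-c", "region-d"]  # made-up names

if __name__ == "__main__":
    _, serial = asyncio.run(timed(fetch_blocking, REGIONS))
    _, threaded = asyncio.run(timed(fetch_in_thread, REGIONS))
    print(f"direct call: {serial:.2f}s, run_in_executor: {threaded:.2f}s")
```

With the direct call, the total time grows with the number of regions, which matches the small gain seen in the original asyncio version. With run_in_executor, the calls overlap, much like the ThreadPoolExecutor version in the question. On Python 3.9 and later, asyncio.to_thread(fake_describe, region) is a shorter way to write the same hand-off to a thread.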
