concurrent.futures 与 asyncio 的区别
我正在尝试通过异步编程来优化我的Python代码。为此,我试用了asyncio和concurrent.futures这两个库。
以下是我的代码:
async def get_rds_instances(session, region, engine_types):
    """Describe the RDS instances of one region (coroutine version).

    NOTE: ``client.describe_db_instances()`` is an ordinary blocking call, so
    it runs on the event-loop thread. While it executes, no other coroutine
    can make progress. The only point where this coroutine yields to the
    loop is ``await asyncio.sleep(1)``. Several instances gathered together
    therefore overlap their sleeps but still perform their requests one
    after another. To overlap the requests themselves, offload the call,
    e.g. with ``loop.run_in_executor``.

    Args:
        session: object providing ``client(service, region_name=...)``
            (presumably a ``boto3.Session`` -- confirm against caller).
        region: region name passed to ``session.client``.
        engine_types: currently unused.

    Returns:
        A list containing the raw ``describe_db_instances`` response, or an
        empty list if the sleep or the request raised (the error is printed).
    """
    report_rds = []
    client = session.client('rds', region_name=region)
    try:
        await asyncio.sleep(1)
        response = client.describe_db_instances()
        report_rds.append(response)
    except Exception as e:  # Exception also covers botocore's ClientError
        print(e)
    return report_rds
async def main():
    """Collect RDS instance reports for every RDS region via asyncio.gather.

    Each ``get_rds_instances`` coroutine performs a blocking
    ``describe_db_instances`` call, so the regions are effectively queried
    one after another even though they are gathered together.
    """
    # ... some arguments definition (not shown; presumably defines
    # profile_name and engine_types -- TODO confirm)
    session = get_rds_session(profile_name)
    regions = session.get_available_regions('rds')
    try:
        reports = await asyncio.gather(
            *[
                get_rds_instances(session=session, region=region, engine_types=engine_types)
                for region in regions
            ]
        )
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        # `reports` is unbound here; stop rather than fail later with NameError.
        return
    # ... process report


if __name__ == "__main__":
    asyncio.run(main())
没有使用asyncio时,这段代码大约花了22秒完成。使用asyncio后,完成时间缩短到大约20秒。
接下来有趣的地方来了,我使用了concurrent.futures:
def get_rds_instances(session, region, engine_types):
    """Describe the RDS instances of one region (thread-pool version).

    Meant to be called from worker threads. The blocking
    ``describe_db_instances`` call then only occupies its own thread, so
    requests for different regions can overlap.

    Args:
        session: object providing ``client(service, region_name=...)``
            (presumably a ``boto3.Session`` -- confirm against caller).
        region: region name passed to ``session.client``.
        engine_types: currently unused.

    Returns:
        A list containing the raw ``describe_db_instances`` response, or an
        empty list if the request raised (the error is printed).
    """
    report_rds = []
    client = session.client('rds', region_name=region)
    try:
        response = client.describe_db_instances()
        report_rds.append(response)
    except Exception as e:  # Exception also covers botocore's ClientError
        print(e)
    return report_rds
def main():
    """Collect RDS instance reports for every RDS region using a thread pool.

    Each region's ``get_rds_instances`` call runs on a worker thread, so the
    blocking requests overlap instead of running one after another.
    """
    # ... some arguments definition (not shown; presumably defines
    # profile_name and engine_types -- TODO confirm)
    session = get_rds_session(profile_name)
    regions = session.get_available_regions('rds')
    try:
        args = ((session, region, engine_types) for region in regions)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Executor.map returns a lazy iterator that re-raises a worker's
            # exception only when that result is retrieved. Materialize it
            # here so such errors are caught by this try block.
            reports = list(executor.map(lambda p: get_rds_instances(*p), args))
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        # `reports` is unbound here; stop rather than fail later with NameError.
        return
    # ... process report


if __name__ == "__main__":
    main()
这段代码大约只花了3秒就完成了。
所以,我想问一下,这个差异正常吗?我在使用asyncio时是不是漏掉了什么,或者做错了什么?
补充:
谢谢!
1 个回答
0
在和同事们讨论后,我发现 describe_db_instances 是一个会阻塞的函数,因此单靠 asyncio 无法让这些调用并发执行。正如 @user4815162342 所指出的,把每个区域的调用放到各自的线程中执行确实有帮助,具体可以参考这里。所以我对代码做了如下修改:
def get_session(profile):
    """Create a ``boto3.Session`` for the given profile name and return it."""
    return boto3.Session(profile_name=profile)
def get_clients(session):
    """Build one RDS client for each region the session reports RDS in.

    Args:
        session: object providing ``get_available_regions(service)`` and
            ``client(service, region_name=...)``.

    Returns:
        A list of clients, in the order the regions were returned.
    """
    rds_clients = []
    for region_name in session.get_available_regions('rds'):
        rds_clients.append(session.client('rds', region_name=region_name))
    return rds_clients
async def get_instances(client, engine_types):
    """Describe the RDS instances visible to one regional client.

    ``client.describe_db_instances`` is a blocking call, so it is run in the
    event loop's default executor. This keeps the event loop free, and lets
    several gathered ``get_instances`` coroutines wait on their requests at
    the same time.

    Args:
        client: object providing ``describe_db_instances()`` that returns a
            mapping with a ``'DBInstances'`` key (presumably a boto3 RDS
            client -- confirm against caller).
        engine_types: currently unused.

    Returns:
        A one-element list holding ``response['DBInstances']``, or an empty
        list if the request failed or the key was missing (the error is
        printed).
    """
    output = []
    # Inside a coroutine, get_running_loop() is the form the asyncio docs
    # recommend over get_event_loop().
    loop = asyncio.get_running_loop()
    try:
        response = await loop.run_in_executor(None, client.describe_db_instances)
        output.append(response['DBInstances'])
    except Exception as e:  # Exception also covers botocore's ClientError
        print(e)
    return output
async def main():
    """Describe RDS instances in every RDS region concurrently.

    One ``get_instances`` coroutine is created per regional client. Each one
    offloads its blocking request to the default thread-pool executor, so the
    requests overlap.
    """
    # ... some arguments definition (not shown; presumably defines
    # profile_name and engine_types -- TODO confirm)
    session = get_session(profile_name)
    clients = get_clients(session)
    # `engine_types` is the name used by the other versions of this script
    # and by get_instances' own parameter; the previous `engine` was undefined.
    reports = await asyncio.gather(
        *[get_instances(client, engine_types) for client in clients]
    )
    # ... process report


if __name__ == "__main__":
    asyncio.run(main())