Python中的异步性和线程池

2024-04-19 07:51:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我的用例相当简单,或者至少我觉得它很简单,来自节点。。。但我就是不太明白这个问题。在这个问题的编码方面,我需要的帮助更少,更多的只是一个解释:我想要并行多次(尽可能多次)运行异步进程(来自aws-encryption-sdk的解密和加密函数)。我想我的主要问题是,最好的方法是什么?你知道吗

后续问题。。。在我看来,最好的方法是使用异步函数(asyncio格式)来处理解密和加密,由线程池而不是单个线程来处理。有时,我需要以尽可能快的速度运行这个函数2000次(显然不一定完全并行)。然而,即使这是正确的,我不知道如何做到这一点。你知道吗

非常感谢任何能帮助我的人。为了让你知道我在哪条轨道上,我非常考虑使用这个线程的答案:https://stackoverflow.com/a/29280606/5335646。然而,一些消息来源说这更多的是用于阻塞函数,而不是真正的异步函数,比如在我的例子中(稍后返回对AWS KMS的调用)。你知道吗


Tags: 方法函数答案awsasyncio编码节点进程
1条回答
网友
1楼 · 发布于 2024-04-19 07:51:00

如果您使用的是python3.5或更新版本,那么可以使用async/await语句。你知道吗

import asyncio
import multiprocessing
import random
import time
from concurrent.futures import ProcessPoolExecutor

count_received = 0
tests_count = 16


async def AWS_KMS_call(i, text, action='encrypt'):
    print(i, multiprocessing.current_process().name, action)
    await asyncio.sleep(random.random())  # emulate some work
    return text[::-1]


async def encrypt(i, text):
    encrypted = await AWS_KMS_call(i, text, action='encrypt')
    print(i, multiprocessing.current_process().name, encrypted, 'encrypted')
    return encrypted


async def decrypt(i, text):
    encrypted = await encrypt(i, text)
    decrypted = await AWS_KMS_call(i, encrypted, action='decrypt')
    print(i, multiprocessing.current_process().name, encrypted, 'decrypted')
    return decrypted


def pool_func(i, text):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    decrypted = loop.run_until_complete(decrypt(i, text))
    print(i, multiprocessing.current_process().name, decrypted, 'pool_func exit')
    return i, decrypted


def got_result(future):
    global count_received
    global tests_count
    count_received += 1
    i, text = future.result()
    print(i, multiprocessing.current_process().name, text, 'done')

    if count_received == tests_count:
        loop.stop()


if __name__ == '__main__':
    text = 'some_data_to_encrypt'
    executor = ProcessPoolExecutor(16)

    loop = asyncio.get_event_loop()

    for i in range(tests_count):
        task = asyncio.ensure_future(loop.run_in_executor(executor, pool_func, i, text))
        task.add_done_callback(got_result)

    try:
        loop.run_forever()
    finally:
        loop.close()

为了清晰起见,我添加了几张照片,输出结果如下:

0 Process-1 encrypt
1 Process-3 encrypt
2 Process-5 encrypt
4 Process-2 encrypt
5 Process-4 encrypt
6 Process-6 encrypt
7 Process-9 encrypt
8 Process-7 encrypt
9 Process-10 encrypt
10 Process-8 encrypt
3 Process-15 encrypt
11 Process-11 encrypt
12 Process-12 encrypt
13 Process-14 encrypt
15 Process-13 encrypt
14 Process-16 encrypt
1 Process-3 tpyrcne_ot_atad_emos encrypted
1 Process-3 decrypt
6 Process-6 tpyrcne_ot_atad_emos encrypted
6 Process-6 decrypt
15 Process-13 tpyrcne_ot_atad_emos encrypted
15 Process-13 decrypt
5 Process-4 tpyrcne_ot_atad_emos encrypted
5 Process-4 decrypt
3 Process-15 tpyrcne_ot_atad_emos encrypted
3 Process-15 decrypt
9 Process-10 tpyrcne_ot_atad_emos encrypted
9 Process-10 decrypt
1 Process-3 tpyrcne_ot_atad_emos decrypted
1 Process-3 some_data_to_encrypt pool_func exit
1 MainProcess some_data_to_encrypt done
4 Process-2 tpyrcne_ot_atad_emos encrypted
4 Process-2 decrypt
11 Process-11 tpyrcne_ot_atad_emos encrypted
11 Process-11 decrypt
10 Process-8 tpyrcne_ot_atad_emos encrypted
10 Process-8 decrypt
10 Process-8 tpyrcne_ot_atad_emos decrypted
10 Process-8 some_data_to_encrypt pool_func exit
10 MainProcess some_data_to_encrypt done
6 Process-6 tpyrcne_ot_atad_emos decrypted
6 Process-6 some_data_to_encrypt pool_func exit
6 MainProcess some_data_to_encrypt done
8 Process-7 tpyrcne_ot_atad_emos encrypted
8 Process-7 decrypt
12 Process-12 tpyrcne_ot_atad_emos encrypted
12 Process-12 decrypt
2 Process-5 tpyrcne_ot_atad_emos encrypted
2 Process-5 decrypt
15 Process-13 tpyrcne_ot_atad_emos decrypted
15 Process-13 some_data_to_encrypt pool_func exit
15 MainProcess some_data_to_encrypt done
11 Process-11 tpyrcne_ot_atad_emos decrypted
11 Process-11 some_data_to_encrypt pool_func exit
11 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos encrypted
13 Process-14 decrypt
4 Process-2 tpyrcne_ot_atad_emos decrypted
4 Process-2 some_data_to_encrypt pool_func exit
4 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos encrypted
7 Process-9 decrypt
0 Process-1 tpyrcne_ot_atad_emos encrypted
0 Process-1 decrypt
14 Process-16 tpyrcne_ot_atad_emos encrypted
14 Process-16 decrypt
8 Process-7 tpyrcne_ot_atad_emos decrypted
8 Process-7 some_data_to_encrypt pool_func exit
8 MainProcess some_data_to_encrypt done
5 Process-4 tpyrcne_ot_atad_emos decrypted
5 Process-4 some_data_to_encrypt pool_func exit
5 MainProcess some_data_to_encrypt done
3 Process-15 tpyrcne_ot_atad_emos decrypted
3 Process-15 some_data_to_encrypt pool_func exit
3 MainProcess some_data_to_encrypt done
9 Process-10 tpyrcne_ot_atad_emos decrypted
9 Process-10 some_data_to_encrypt pool_func exit
9 MainProcess some_data_to_encrypt done
13 Process-14 tpyrcne_ot_atad_emos decrypted
13 Process-14 some_data_to_encrypt pool_func exit
13 MainProcess some_data_to_encrypt done
2 Process-5 tpyrcne_ot_atad_emos decrypted
2 Process-5 some_data_to_encrypt pool_func exit
2 MainProcess some_data_to_encrypt done
0 Process-1 tpyrcne_ot_atad_emos decrypted
0 Process-1 some_data_to_encrypt pool_func exit
0 MainProcess some_data_to_encrypt done
12 Process-12 tpyrcne_ot_atad_emos decrypted
12 Process-12 some_data_to_encrypt pool_func exit
12 MainProcess some_data_to_encrypt done
14 Process-16 tpyrcne_ot_atad_emos decrypted
14 Process-16 some_data_to_encrypt pool_func exit
14 MainProcess some_data_to_encrypt done
7 Process-9 tpyrcne_ot_atad_emos decrypted
7 Process-9 some_data_to_encrypt pool_func exit
7 MainProcess some_data_to_encrypt done

另外,我建议您查看celery库。也许它更符合你的要求

相关问题 更多 >