将Asyncio API调用限制为n个同时会话信号量,不加载超过1个线程

2024-04-29 22:50:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试使用25-40个线程(取决于调用持续时间)异步运行约10000个API调用(下面的示例只有52个调用)。我正在努力处理异步和信号量逻辑

请注意,API本身是异步的,可以接受来自单个IP的同时调用

几个问题:

  • 尽管定义了5个异步IO线程,但只有1个线程在运行。我能够在<;40个调用,但当试图集成信号量以限制同时调用的数量时,我注意到只有1个线程正在运行
  • 调试时,我确实看到“任务”中填充了对API调用的所有响应。我可以让它们打印出来,但我不知道如何将它们加载到异步def中的csv中。谢谢你在这方面的帮助

具有以下限制的API。API库文档可以在here中找到。有关限额的信息如下,但可在here找到

  • 每个IP每10秒的最大请求数:100
  • 单个RPC每个IP每10秒的最大请求数:40
  • 每个IP的最大并发连接数:40
  • 每IP每10秒的最大连接速率:40
  • 每30秒的最大数据量:100 MB
import asyncio
import time
from solana.rpc.async_api import AsyncClient

sig_test = ['MCNpCvPMq1MU9yGXoDZVcJj3AxpdXcRj7KBqteoeNAAhoDJxHn4F1sd9zPL7sYaAY6jyJzjvxrvwhLqHn9Q54BA',
 '38vMvcVJ6VRebYM8hk3hBDiumbkxGhtU4TTbhSYfuTKw7PGgYeT9KxucA5mKRrUA6YGApbqXie5VtFZCVFA3oJKU',
 '67agnhzgtHN5t2dQa86kSLPWxNzU3qoiidG7DCYksZWt8nNCahKijQd4Cgq4S3Fc1qWiB9tvEpo19Vkimw1NGepD',
 '3ADAbcdt43cnG7iQpDFdNufSQtLz6KX7W4SSyRHfpgwZ6Dt6P4HDRrNc6gV8ZibqP8uNLz6bumzTiydjkaYhhc6M',
 '4hpuQETvckYMaBGTfaY9Sv7cPYjrsmxTttnyDSpW6jWPHfHrwL7FpuChoG2vthGvPW5NjLYz3zrcom7VGiY1AmBE',
 '5Wi3JUtxAd79xAjQP8VowZwtfZgBYgoqdRYKTWMQ988by2QcagHgJAchu9qHSk63GC6gKGL8FEMVJXFcYUQFbemJ',
 '575NbibE2PiCe3DYvpo2CsCEdszTqwpc3Ntu6T37RN3QyuMSCX9NFnHJ1Wt33sPowDanPsNPrv2FMntDcq1cUwj6',
 '2zmpafyBDrV1MVjDtjuBzAcCx6brhweZ2E4TAxAjFxw3DrDusgf2DiEP7DWLePijKopiQ5srzP6LyMomAwTCeC4U',
 'dNYAYZPKj3tK6CnzRfMuEjScKT9CG1uiZGTgZtYMWRK2626su2xsEZYkTXgpky6WGJgUozDjYR9UdqZn7StjYhz',
 'buRdnipbHbDPuPaHfFzLrganh3SGyMHajY7pTTBMmubg8JDXtPb1QEEExDX76z4nB7y7y5hhLWrtKZLXPNU7ZQN',
 '5DRF4NPawm58R186F7pXRNN13Bkt6XptNXZbmvTAXX3JqdihH3piv7Pdbg33Nn5VsLjwGGLDBoUXgSnPWdnpRxzB',
 '5eyR7BohkxWVfczW2WrQyWAUTJDq13DMEMLz8WBZuZWREiv1KpCoJ2zuH82pvxrD2p2AwsPYBX9WBWGxfndG76fH',
 '2y8GvEvox7WSa83gkZzSzvew5BdbvLBqwwA7DFxh9aJHtb1mfwoGCc25A99py8S5LhJTyPCqWfLZdLdtY8kQkfsP',
 '5DPH9stZB1TA3haYUDB2gQHQ2jwusMYj7e4USYzJy97jzhJKQnd39vQhm5FaJEop53Moi5kkv5jJVmLgwNLNufve',
 '4EHvM4bAi4aMqFjTpKRUSwmr6JhaQs3PicD1tzSYfjAiVCdWNiUPBpnVtZXFqbE2oWJ7yX5GM5UK4j2jsySWAfg8',
 'PY2nKPxUgtgSb63XgSdGyupntTofcue6aR6q1kQdvHvucK3Z41fK3emmmb9uWfxFBEgWqMLjzDZJQAkddAK5zfG',
 'QQwnLpUrgUiepd8ExwYJmrckfHSXnUcviqbqLkRiGGPDHqyqnkGxiDCEZ2JpJnv2p8cgjuKgjmK8cHRaUyD1EAe',
 '3Po2STminrxr4xe9NV8fpL8eUsMhUTk77RK3wdfwAgTt3MDDX9Far2WQ48t7Kt28RneDonsfLA4sTBH48SBs1jgi',
 '3GW4fQGMWqXZ6SnGtGG89WmhqU3ZUoRF3XqhxVXcDedKBFVZKBmA5xBr7aiiNrKs13sSYVq3o9RnVzJm6HYTkJ2x',
 '2bf9y6he2o1MFw3FeshWVesvDZfhUaRKZ2ZbcqgEUVmHvc8uNtkw4yKGL1Q4aoFhoXhuNcSgcT1mNxeYD4md2RqY',
 '5ezKVuMtHQRRmWF5kGeTgEdeXE8B7YtbYZutzqMjCX9fhTqmqWa3UPaAA6ct9LtFKk8W9wtSmRDWDooVHsGNC5M5',
 '2TznhDsRzBAemc6PpF6qgD9BpqCey3UojXEkzc9tZcoEfLPmzFUM1dMmQSB1cQV395zEhX1rJy27mfvYMCHctgxf',
 '3Bw8H5tDiPVqq9fBTEZ9XcEsy8giBbdfUpiqd1XBiwgLvb7zEc723z9QF3LNoKbqRY3ghLb2NgHCHveF73GHCqjk',
 '4MY8E7C5Wn3Rib1Cw9DmyEApbvgkVUsidY73wR7WdkwWJkB11oHmz7t8x6L8fa18hk6UocCCZFRym8TH3P38eu4S',
 '4idQrw58dEPQQYcv7aNyGRogNScxXrfN48w9E6izVxHXVCCD3NgCizW9C7rpkVeca4RiXgr98wTwKBKRSMqhqXi7',
 '3Z41paock2PnDT74XX883agVg1SHRFWive5VtF7pDT3ZLL6yquQGBidy3VWE5BNsZgkb2ndoCqmJFsgNFoNeiq9b',
 '4G1epgzCqgYDC6ywYb9nkuxk2Rs41pnKZGnzDqt1uwwPbX37nH1Y5WD2g1V9Dsxvcmi4VTgxQCEvX6rWBMx8DggB',
 '4mLizKPd5NczyGdCbL5X4pjxSjjoTybkkoptu5MF3Ge35BerB2tjrQ3JnzJZ11iLXW3jAFj1kyi6qxU4DTW8d9Yp',
 '2Wd57E7DVYRJFA9GmzuVYZtmM6wgMAsWN9fLMiwkvgYDEBXBPR2NjJ2xcnrX9agQxDh2B8SumBAeACAVmgPKYBKP',
 'rXvfJDidDUypE3MoG4pQxMB959yFJsr42WgVw8LAvxMvmLLur5ZxadkKW9haYiK3ZgqyCTFUqGFpTPw3wfMtFtG',
 '32GPaWfijxFsUniZv22Gx5o2GhJYMPhaJ1h8PVNbwbkBJMBoVrCG21mu1drmwqPMW91juCoKUFDhAz9DBXCcZzBn',
 '5puj2WhmkoUhZ6gWp3hwXqTum47cdh64iwDsYxPtkuySHRFpFoW45tEs1hHPboW8Vyns47tT3GSjLbBeja3wA1b5',
 'RcPKfGUWcd9qfyBi3jBk4VKiHDS2DnJwbRcuNBKe7gDtrmXqzXZF21agCGQHFHh3BdZ4GwXeZcKoj6pMWoeKva1',
 '5Kv4LX2KP7Rh2BVUSNe5XRT3QSj3cATK1oTSsYZFqWjCaRnHjtPNgxoqca89FDjJhzMSteikX4WU5RZpfMEBvZa2',
 '4AQdiQsx2e8GPs6uA7eRxqBUCtZiihtpen3MYahvPwjknboRFkMyg2AfbxxSQAL3znXkYG4CCHSddRj5ZAhrgAQh',
 'brtYdPqsHdAVCiXWbbGNaUrSrR54rveT9qr7suR9E3Yww2Z5hJSTt2j47rPw2aLS5Lu9bd6Xx5k3fzqWvWXetrt',
 '2tWA5dPd4CANiZJRbyPVpcS5VyQbnyYEBZ5JNew9BvtYEgEhueofkSwCqdSgG92Nqr7WvbUCpdY4dfFVLXFCq9Xn',
 '4nANJMRA7RtaCM1Tab6qk16MAsn9yMSAhdxM5g9X9ezgKZN8WRJArQFMP33PnGwd2zCVrZuWDhQCWaS9sGFUpKE2',
 '4NwBNYW4JxaR4G7Be8U6S2ExtavJgydt1ARg8cSzNypoe79dSESsAwfwZFwjtkjC1ueu9fb6Z5ES2due9ZGbpGmT',
 '4x9BMG4PbhnxDbjiTRaG5bU1dePydmJ7uZjYZQYnghtTQvXfeY2jKGECGWDkctRZUpo8pWTer6CCZCra1d6jZgSZ',
 '4GsgGiRnxADYFReDLkqbGPq15RotQFWNcMgcctvRyazqQVjTQ9jB4kybsyRaaACibQZihFRuBtBouf4V8sHFxPa3',
 '4HzSehzpX5a532GihQ4RT791ZtqUVFUWEr1o9eosZqknKjevic5NQGoAnZsaWV2ijHF13EsLR5cW4GHdQyAbXFhM',
 '5r5V53LLKoRu34qjFZDKbgunToEC6N9YC3dkrrG7FqvxM4SYATPW31GusGvAb8KYwHEXjMv1d5T8oTLrLNQBx6Nw',
 '4oCDgRZqLcxSreD9e7R2dRtybLcndkjjihkMzLRgHC2stidUPmNX2RgPwe8oWcR57UTNj1iZ2BwL5eKu8sTaJsmG',
 '5CDAnzE2ZB7SrMqfuct9etnxjnaJCXK5AbGQGG3wXHWLUTPBGBDRweB4GjNrj6gufCKPfbMSRgF8XR3FD5eXfyKd',
 '63qZzbL5WtTSd1PX2RYjgHXr8EryGMCYTwkCXTYK7sGJkpMVKHg9hsBTUoCGM3XBEj2ERyD6k3VXDRmeJF4j4yhf',
 '2a2m5WgtXKeybJhCCqrCAy6yuwjcPXUuKKSULkNN4J9LwcJuuU38epkCaDhufQTV9qkGJA11Jb7aXS7FQ5qXBpC2',
 'CFam6WaB5rJSymirxDTzHWsXbKoTjis2LwB2Wjf9w93UZYXGUUmk4vJ5GVxPrZttm24JbWYCG8KdYzbAdJD23aw',
 '4qLdyCErDex8VazQYpVZmNAhg3nWewg5FXb4BmaJUBSSMZoXKiryKMxjiRKnCumoCFhkcxWbr1ThVEe7B2xxj4TU',
 '5qzeiTKBAWJ8GTFT41zNiHcjn62Wcth9GQB4FddGg9TnhcqrXhTqgQoAnY7UP8tJe2Hr8FvyDB6Ev7YG6UtLd57q',
 '36FkBHkK2jPK4VMyfa9iobiiYhhMgL66yKT9k9ckX1e2z6cAaVndQamHMgYs9Sdfp6QN5rE79ENy2Vooiz8mBdVn',
 '4zgM6nGzw7UYqZVrXM4hBac725c53PXw2NWWs8wnvRTYvvHmtuiNsJUMx8BSiRRpoDyfMzMhqdBEHQSksouDThMC']


start = time.time()

async def get_signatures_with_sem_async(sem, signature, session):
    async with sem:
        result = await session.get_confirmed_transaction(signature)
        return result

async def get_signatures_async(signatures, max_threads, delay):
    tasks = []
    sem = asyncio.Semaphore(max_threads) 
    async with sem:
        async with AsyncClient("https://api.mainnet-beta.solana.com") as session: 
            for sig in signatures:
                if delay:
                    await asyncio.sleep(delay)
                task = await asyncio.ensure_future(get_signatures_with_sem_async(sem, sig, session))
                tasks.append(str(task))
            return print(tasks)
    

asyncio.run(get_signatures_async(signatures=sig_test,max_threads=5,delay=1.1))

end = time.time()
total_time = end - start

print("It took {} seconds".format(total_time))

Tags: importipapiasynciogetasynctimesession