使用单例管理发出类似请求的多个类

2024-04-26 03:06:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个应用程序,它可以向一个webapi发出数百万个请求,我目前的做法很有效,但效果不好,而且很笨重。我现在正在重写它,但基本上,我有多个类需要向API发出请求,它们需要并发运行。API约束只允许我生成~10 requests per second per API token,但我可以生成许多tokens。你知道吗

顺便说一句,几个月前我确实问了this very similair but less involved一个问题,从那以后这个项目就被搁置了,我从问这个问题中学到了更多。那么看看你自己的判断力吧哈哈

目前,(旧方法)我已经为每个类分配了自己的一组令牌。这对于我的案例来说是低效的,正如我所想的,为什么不让所有类调用一个管理所有令牌的Singleton,这样它们就不会被频繁使用,为每个人执行所有请求,然后使用回调来响应呢。你知道吗

我一直在做一些主要的挖掘和阅读,在经历了许多麻烦之后,我在以下资源之间拼凑出了一个稍微可行的原型:

我想到的是:

import json
import uvloop
import aiohttp
import AsyncLeakyBucket

from pprint import pprint
from aiohttp import ClientSession
from itertools import cycle

loop = uvloop.new_event_loop()

class Singleton(type):
    _instances = {}
    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]

class API(type, metaclass=Singleton):

    def __init__(self, base_url):
        super(API, self).__init__()
        self._tokens = ('list', 'of', 'tokens')
        num_tokens = len(self._tokens)
        self._tokens = cycle(self._tokens)
        self._base_url = base_url
        num_concurrent_per_token = 10
        total_concurrent = num_concurrent_per_token*num_tokens
        self._bucket = AsyncLeakyBucket(10*total_concurrent)
        self._queue = asyncio.PriorityQueue()

        asyncio.run(_start_workers(total_concurrent))

    async def _start_workers(self, num):
        session = asyncio.get_event_loop().run_until_complete(aiohttp.ClientSession())
        loop.run_until_complete(asyncio.gather(
            *(_get_requests(session, queue) for _ in range(num))
        ))

    async def _get_request(self, session, queue):
        api_headers = {
            'Accept': "application/json",
            'Authorization': "Auth {}"
        }
        while True:
            url_ext, callback = await queue.get()

            with self._bucket:
                try:
                    async with session.get(f"{self._base_url}{url_ext}",
                                           headers=api_headers.format(next(self.tokens))) as response:
                        status = response.status
                        data = await response.read()
                        try:
                            data = json.loads(r)
                        except json.decoder.JSONDecodeError:
                            print(f"Error loading {data} as JSON")
                            #return
                except aiohttp.ClientError as e:
                    print(f"Error retrieving request: {e}")
                    #return

            queue.task_done()
            asyncio.create_task(callback(data))

    async def get(self, url_extension, callback, pri=1):
        await self._queue.put(priority, (url_ext, callback))

async def request_done(data):
    pprint(data)

a = API('https://jsonplaceholder.typicode.com/')

for num in range(500):
    a.get(f"/comments/{num}", request_done)

我收到一个错误,无法测试此代码的其余部分:

Traceback (most recent call last):
  File "api.py", line 73, in <module>
    a = API('https://jsonplaceholder.typicode.com/')
  File "api.py", line 15, in __call__
    cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
TypeError: type.__new__() takes exactly 3 arguments (1 given)

从字面上说,坐下来写这门课作为一个整体与我的单一和线程有限的知识,现在我卡住了,不想打破它太远。我不知道有多少是工作,所以希望代码和我的解释得到我的意图充分理解。你知道吗

顺便说一句,我只是用https://jsonplaceholder.typicode.com/来测试基本功能,因为你可能知道其他API需要头和完全不同的地址。。。你知道吗

一如既往,提前感谢大家,期待社会各界的回音,如果途中有任何问题,我都会在这里!你知道吗


Tags: instancesimportselfapiasynciourldataget