如何使用aiohttp进行速率限制?

0 投票
1 回答
28 浏览
提问于 2025-04-13 03:15

我这周开始学习Python,作为第一个项目,我选择开发一个简单的应用程序,从Riot API获取数据,处理后再插入到mySQL数据库中。我成功地让它同步工作,没有遇到问题,这对项目来说已经足够了,因为这个API每秒最多只能调用20次,120秒内最多调用100次,而在40秒内就已经达到了这个限制。

不过,我想进一步改进一下,因为有可能获得更好的密钥,这样可以提供比我现在使用的更高的调用限制。

我尝试使用多线程,设置了一个线程执行器,里面有5个工作线程,每个线程负责获取一场比赛的数据,从连接池中获取连接,插入数据到数据库,然后再获取下一场可用的比赛进行抓取。但是这种方法看起来不太理想,因为它插入的数据量太小,重复了很多次。我的代码是用一个单独的抓取函数来处理的,这个函数接收一个由其他函数生成的URL,代码结构大致是这样的:

def fetch(url):
    while True:
        response = session.get(url, headers={"X-Riot-Token": f"{api_key}"})
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 1))
            time.sleep(retry_after+1)
        else:
            return None

这个结构差不多是这样,while循环里面有一个try except。这个while循环的作用是,如果遇到调用限制的请求,它会等到超时结束后再继续请求。

在使用多线程时,这个方法有效,我还把重试次数设置为全局变量,并在请求之前检查是否受到了调用限制,但速度还是不够快,仍然在调用限制开始后进行了几次请求,这可能会导致被这个API列入黑名单。

我尝试使用asyncio和aiohttp,这样速度快多了,但还是达到了每秒20次和每120秒100次的限制,请求依然在继续。

我看到有人提到信号量,但我不太明白它是怎么工作的。是不是应该创建两个信号量,一个对应每个调用限制,在请求时获取信号量,执行asyncio.sleep,然后再相应地释放?这样下一个请求还会继续吗?我在这里看到其他帖子,但没有一个详细讲解如何处理两个不同的调用限制。

基本的结构应该是:同步请求获取比赛列表 -> 异步请求每场比赛获取相应数据 -> 异步请求获取每位比赛玩家的信息 -> 处理后将所有数据插入数据库。

1 个回答

0

信号量可以更好地限制同时进行的任务数量。如果你想控制请求的频率,可以简单地设置一个计数器,当计数器到达0时就阻止新的请求,并在设定的时间到达时重置计数器。(我还用过Gubernator这个外部服务来限制请求频率,它在处理更复杂的情况时可能会很有用)。

所以,作为一个快速尝试,可能可以这样做:

class RateLimit:
    def __init__(self, limits, reset_times):
        self._limits = limits
        self._reset_times = reset_times
        self._counts = list(limits)

    async def limit(self):
        while True:
            if all(c > 0 for c in self._counts):
                for i in range(len(self._counts)):
                    self._counts[i] -= 1
                return
            await self._waiter.wait()

    async def __aenter__(self):
        self._task = asyncio.create_task(self._reset())
        return self

    async def __aexit__(self, ...):
        self._task.cancel()
        await self._task

    async def _reset(self):
        times = list(self._reset_times)
        while True:
            self._waiter = asyncio.Event()
            delay = min(times)
            try:
                await asyncio.sleep(delay)
            except asyncio.CancelledError:
                break
            for i in range(len(times)):
                times[i] -= delay
                if times[i] <= 0:
                    self._counts[i] = self._limits[i]
                    times[i] += self._reset_times[i]

            self._waiter.set()

async with RateLimiter((100, 20), (120, 1)) as rl:
    await rl.limit()  # Before each request

为了让这个方法更稳健,可能需要使用一个队列,这样请求就能始终按照相同的顺序进行。

另外,看看现有的库也是值得的(快速搜索一下可以找到aiolimiter和asynciolimiter),不过我不太确定这些库在处理多个限制时能否可靠地配合使用(比如两个限制器能否可靠地组合在一起)。

撰写回答