如何使用aiohttp进行速率限制?
我这周开始学习Python,作为第一个项目,我选择开发一个简单的应用程序,从Riot API获取数据,处理后再插入到mySQL数据库中。我成功地让它同步工作,没有遇到问题,这对项目来说已经足够了,因为这个API每秒最多只能调用20次,120秒内最多调用100次,而在40秒内就已经达到了这个限制。
不过,我想进一步改进一下,因为有可能获得更好的密钥,这样可以提供比我现在使用的更高的调用限制。
我尝试使用多线程,设置了一个线程执行器,里面有5个工作线程,每个线程负责获取一场比赛的数据,从连接池中获取连接,插入数据到数据库,然后再获取下一场可用的比赛进行抓取。但是这种方法看起来不太理想,因为它插入的数据量太小,重复了很多次。我的代码是用一个单独的抓取函数来处理的,这个函数接收一个由其他函数生成的URL,代码结构大致是这样的:
def fetch(url):
while True:
response = session.get(url, headers={"X-Riot-Token": f"{api_key}"})
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 1))
time.sleep(retry_after+1)
else:
return None
这个结构差不多是这样,while循环里面有一个try except。这个while循环的作用是,如果遇到调用限制的请求,它会等到超时结束后再继续请求。
在使用多线程时,这个方法有效,我还把重试次数设置为全局变量,并在请求之前检查是否受到了调用限制,但速度还是不够快,仍然在调用限制开始后进行了几次请求,这可能会导致被这个API列入黑名单。
我尝试使用asyncio和aiohttp,这样速度快多了,但还是达到了每秒20次和每120秒100次的限制,请求依然在继续。
我看到有人提到信号量,但我不太明白它是怎么工作的。是不是应该创建两个信号量,一个对应每个调用限制,在请求时获取信号量,执行asyncio.sleep,然后再相应地释放?这样下一个请求还会继续吗?我在这里看到其他帖子,但没有一个详细讲解如何处理两个不同的调用限制。
基本的结构应该是:同步请求获取比赛列表 -> 异步请求每场比赛获取相应数据 -> 异步请求获取每位比赛玩家的信息 -> 处理后将所有数据插入数据库。
1 个回答
信号量可以更好地限制同时进行的任务数量。如果你想控制请求的频率,可以简单地设置一个计数器,当计数器到达0时就阻止新的请求,并在设定的时间到达时重置计数器。(我还用过Gubernator这个外部服务来限制请求频率,它在处理更复杂的情况时可能会很有用)。
所以,作为一个快速尝试,可能可以这样做:
class RateLimit:
def __init__(self, limits, reset_times):
self._limits = limits
self._reset_times = reset_times
self._counts = list(limits)
async def limit(self):
while True:
if all(c > 0 for c in self._counts):
for i in range(len(self._counts)):
self._counts[i] -= 1
return
await self._waiter.wait()
async def __aenter__(self):
self._task = asyncio.create_task(self._reset())
return self
async def __aexit__(self, ...):
self._task.cancel()
await self._task
async def _reset(self):
times = list(self._reset_times)
while True:
self._waiter = asyncio.Event()
delay = min(times)
try:
await asyncio.sleep(delay)
except asyncio.CancelledError:
break
for i in range(len(times)):
times[i] -= delay
if times[i] <= 0:
self._counts[i] = self._limits[i]
times[i] += self._reset_times[i]
self._waiter.set()
async with RateLimiter((100, 20), (120, 1)) as rl:
await rl.limit() # Before each request
为了让这个方法更稳健,可能需要使用一个队列,这样请求就能始终按照相同的顺序进行。
另外,看看现有的库也是值得的(快速搜索一下可以找到aiolimiter和asynciolimiter),不过我不太确定这些库在处理多个限制时能否可靠地配合使用(比如两个限制器能否可靠地组合在一起)。