我有一个应用程序,它可以向一个webapi发出数百万个请求,我目前的做法很有效,但效果不好,而且很笨重。我现在正在重写它,但基本上,我有多个类需要向API发出请求,它们需要并发运行。API约束只允许我生成~10 requests per second per API token
,但我可以生成许多tokens
。你知道吗
顺便说一句,几个月前我确实问了this very similair but less involved一个问题,从那以后这个项目就被搁置了,我从问这个问题中学到了更多。那么看看你自己的判断力吧哈哈
目前,(旧方法)我已经为每个类分配了自己的一组令牌。这对于我的案例来说是低效的,正如我所想的,为什么不让所有类调用一个管理所有令牌的Singleton,这样它们就不会被频繁使用,为每个人执行所有请求,然后使用回调来响应呢。你知道吗
我一直在做一些主要的挖掘和阅读,在经历了许多麻烦之后,我在以下资源之间拼凑出了一个稍微可行的原型:
我想到的是:
import json
import uvloop
import aiohttp
import AsyncLeakyBucket
from pprint import pprint
from aiohttp import ClientSession
from itertools import cycle
loop = uvloop.new_event_loop()
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class API(type, metaclass=Singleton):
def __init__(self, base_url):
super(API, self).__init__()
self._tokens = ('list', 'of', 'tokens')
num_tokens = len(self._tokens)
self._tokens = cycle(self._tokens)
self._base_url = base_url
num_concurrent_per_token = 10
total_concurrent = num_concurrent_per_token*num_tokens
self._bucket = AsyncLeakyBucket(10*total_concurrent)
self._queue = asyncio.PriorityQueue()
asyncio.run(_start_workers(total_concurrent))
async def _start_workers(self, num):
session = asyncio.get_event_loop().run_until_complete(aiohttp.ClientSession())
loop.run_until_complete(asyncio.gather(
*(_get_requests(session, queue) for _ in range(num))
))
async def _get_request(self, session, queue):
api_headers = {
'Accept': "application/json",
'Authorization': "Auth {}"
}
while True:
url_ext, callback = await queue.get()
with self._bucket:
try:
async with session.get(f"{self._base_url}{url_ext}",
headers=api_headers.format(next(self.tokens))) as response:
status = response.status
data = await response.read()
try:
data = json.loads(r)
except json.decoder.JSONDecodeError:
print(f"Error loading {data} as JSON")
#return
except aiohttp.ClientError as e:
print(f"Error retrieving request: {e}")
#return
queue.task_done()
asyncio.create_task(callback(data))
async def get(self, url_extension, callback, pri=1):
await self._queue.put(priority, (url_ext, callback))
async def request_done(data):
pprint(data)
a = API('https://jsonplaceholder.typicode.com/')
for num in range(500):
a.get(f"/comments/{num}", request_done)
我收到一个错误,无法测试此代码的其余部分:
Traceback (most recent call last):
File "api.py", line 73, in <module>
a = API('https://jsonplaceholder.typicode.com/')
File "api.py", line 15, in __call__
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
TypeError: type.__new__() takes exactly 3 arguments (1 given)
从字面上说,坐下来写这门课作为一个整体与我的单一和线程有限的知识,现在我卡住了,不想打破它太远。我不知道有多少是工作,所以希望代码和我的解释得到我的意图充分理解。你知道吗
顺便说一句,我只是用https://jsonplaceholder.typicode.com/来测试基本功能,因为你可能知道其他API需要头和完全不同的地址。。。你知道吗
一如既往,提前感谢大家,期待社会各界的回音,如果途中有任何问题,我都会在这里!你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐