如何在Flask-Restful中使用Flask-Cache
我该如何在Flask-Restful中使用Flask-Cache的@cache.cached()装饰器呢?举个例子,我有一个叫Foo的类,它是从Resource这个类继承来的,Foo类里面有get、post、put和delete这些方法。
那么,在我执行了一个POST
操作后,怎样才能让缓存的结果失效呢?
@api.resource('/whatever')
class Foo(Resource):
@cache.cached(timeout=10)
def get(self):
return expensive_db_operation()
def post(self):
update_db_here()
## How do I invalidate the value cached in get()?
return something_useful()
6 个回答
来自@JahMyst的回答对我没有用。Flask-Cache和Flask的restful框架不兼容。根据他们的文档,@cache.Cached和@cache.memoize无法处理可变对象。
Using mutable objects (classes, etc) as part of the cache key can become tricky. It is suggested to not pass in an object instance into a memoized function. However, the memoize does perform a repr() on the passed in arguments so that if the object has a __repr__ function that returns a uniquely identifying string for that object, that will be used as part of the cache key.
我不得不自己想出一个解决方案。把这个代码片段留在这里,以防其他人遇到同样的问题。
cache_key
函数将用户请求转换成哈希值。cache_res_pickled
函数用于对数据进行序列化或反序列化。
|-flask-app
|-app.py
|-resource
|--some_resource.py
import json
import logging
import pickle
import time
import urllib
from flask import Response, abort, request
from redis import Redis
redis_client = Redis("127.0.0.1", "6379")
exp_setting_s = 1500
def json_serial(obj):
"""
JSON serializer for objects not serializable by default json code"
Args:
obj: JSON serialized object for dates
Returns:
serialized JSON data
"""
if isinstance(obj, datetime.datetime):
return obj.__str__()
def cache_key():
""" ""
Returns: Hashed string of request made by the user.
"""
args = request.args
key = (
request.path
+ "?"
+ urllib.parse.urlencode(
[(k, v) for k in sorted(args) for v in sorted(args.getlist(k))]
)
)
key_hashed = hashlib.sha256(key.encode())
return key_hashed.hexdigest()
def cache_res_pickled(data, encode):
"""
Args:
data (dict): Data in dict format
encode (Boolean): Encode (true) or decode (false) the data
Returns: Result after pickling
"""
if encode:
return pickle.dumps(data)
else:
data = pickle.loads(data)
return data
class SomeResource(Resource):
@auth.login_required
def get(self):
# Get the key for request in hashed format SHA256
key = cache_key()
result = redis_client.get(key)
def generate():
"""
A lagging generator to stream JSON so we don't have to hold everything in memory
This is a little tricky, as we need to omit the last comma to make valid JSON,
thus we use a lagging generator, similar to http://stackoverflow.com/questions/1630320/
"""
releases = res.__iter__()
try:
prev_release = next(releases) # get first result
# We have some releases. First, yield the opening json
yield '{"data": ['
# Iterate over the releases
for release in releases:
yield json.dumps(prev_release, default=json_serial) + ", "
prev_release = release
logging.info(f"For {key} # records returned = {len(res)}")
# Now yield the last iteration without comma but with the closing brackets
yield json.dumps(prev_release, default=json_serial) + "]}"
except StopIteration:
# StopIteration here means the length was zero, so yield a valid releases doc and stop
logging.info(f"For {key} # records returned = {len(res)}")
yield '{"data": []}'
if result is None:
# Secure a key on Redis server.
redis_client.set(key, cache_res_pickled({}, True), ex=exp_setting_s)
try:
# Do the querying to the DB or math here to get res. It should be in dict format as shown below
res = {"A": 1, "B": 2, "C": 2}
# Update the key on Redis server with the latest data
redis_client.set(key, cache_res_pickled(res, True), ex=exp_setting_s)
return Response(generate(), content_type="application/json")
except Exception as e:
logging.exception(e)
abort(505, description="Resource not found. error - {}".format(e))
else:
res = cache_res_pickled(result, False)
if res:
logging.info(
f"The data already exists! loading the data form Redis cache for Key - {key} "
)
return Response(generate(), content_type="application/json")
else:
logging.info(
f"There is already a request for this key. But there is no data in it. Key: {key}."
)
s = time.time()
counter = 0
# loops aimlessly till the data is available on the Redis
while not any(res):
result = redis_client.get(key)
res = cache_res_pickled(result, False)
counter += 1
logging.info(
f"The data was available after {time.time() - s} seconds. Had to loop {counter} times."
)
return Response(generate(), content_type="application/json")
在编程中,有时候我们会遇到一些问题,比如代码运行不正常或者出现错误。这些问题可能是因为我们没有正确理解某些概念或者使用了不合适的代码。
当你在编写代码时,确保你理解每一行代码的作用。如果你不明白某个部分,可以查找相关资料或者向别人请教。这样可以帮助你更快地找到问题所在,并且提高你的编程能力。
此外,调试也是一个重要的技能。调试就是找出代码中的错误并修复它们。你可以通过逐行检查代码、添加打印信息或者使用调试工具来帮助你找到问题。
总之,编程是一个不断学习和实践的过程,遇到问题时不要气馁,保持耐心,逐步解决它们。
##create a decarator
from werkzeug.contrib.cache import SimpleCache
CACHE_TIMEOUT = 300
cache = SimpleCache()
class cached(object):
def __init__(self, timeout=None):
self.timeout = timeout or CACHE_TIMEOUT
def __call__(self, f):
def decorator(*args, **kwargs):
response = cache.get(request.path)
if response is None:
response = f(*args, **kwargs)
cache.set(request.path, response, self.timeout)
return response
return decorator
#add this decarator to your views like below
@app.route('/buildingTotal',endpoint='buildingTotal')
@cached()
def eventAlert():
return 'something'
@app.route('/buildingTenants',endpoint='buildingTenants')
@cached()
def buildingTenants():
return 'something'
你可以使用 cache.clear()
方法来清除缓存。
想了解更多细节,可以查看这个链接:https://pythonhosted.org/Flask-Cache/#flask.ext.cache.Cache.clear,还有在 https://pythonhosted.org/Flask-Cache/ 中的清除缓存部分。
是的,你可以那样使用。
也许你还需要看看这个链接:flask-cache 如何缓存 URL 查询字符串参数
因为 Flask-Cache
的实现不让你直接访问底层的 cache
对象,所以你需要自己创建一个 Redis
客户端,并使用它的 keys
方法来列出所有的缓存键。
cache_key
方法是用来替换你在cache.cached
装饰器中默认的键生成方式的。clear_cache
方法只会清除与当前资源相关的那部分缓存。
这个解决方案只在 Redis
上测试过,使用其他缓存引擎时,具体实现可能会有些不同。
from app import cache # The Flask-Cache object
from config import CACHE_REDIS_HOST, CACHE_REDIS_PORT # The Flask-Cache config
from redis import Redis
from flask import request
import urllib
redis_client = Redis(CACHE_REDIS_HOST, CACHE_REDIS_PORT)
def cache_key():
args = request.args
key = request.path + '?' + urllib.urlencode([
(k, v) for k in sorted(args) for v in sorted(args.getlist(k))
])
return key
@api.resource('/whatever')
class Foo(Resource):
@cache.cached(timeout=10, key_prefix=cache_key)
def get(self):
return expensive_db_operation()
def post(self):
update_db_here()
self.clear_cache()
return something_useful()
def clear_cache(self):
# Note: we have to use the Redis client to delete key by prefix,
# so we can't use the 'cache' Flask extension for this one.
key_prefix = request.path
keys = [key for key in redis_client.keys() if key.startswith(key_prefix)]
nkeys = len(keys)
for key in keys:
redis_client.delete(key)
if nkeys > 0:
log.info("Cleared %s cache keys" % nkeys)
log.info(keys)