为什么results_ttl=0时Redis仍然填满?
问题:为什么即使工作结果被立即丢弃,Redis 还是会填满?
我在用 Redis 作为队列来异步创建 PDF,然后把结果保存到我的数据库里。因为已经保存了,所以我不需要在之后再访问这个对象,所以处理完后我不想在 Redis 中继续存储这个结果。
为了防止结果留在 Redis 中,我把 TTL
设置为 0
:
parameter_dict = {
"order": serializer.object,
"photo": base64_image,
"result_ttl": 0
}
django_rq.enqueue(procces_template, **parameter_dict)
问题是,尽管 Redis 的工作者说这个任务会立即过期:
15:33:35 Job OK, result = John Doe's nail order to 568 Broadway
15:33:35 Result discarded immediately.
15:33:35
15:33:35 *** Listening on high, default, low...
但是 Redis 还是会填满,并且抛出:
ResponseError: command not allowed when used memory > 'maxmemory'
我需要在 Redis 或 django-rq 中设置其他参数,以防止 Redis 填满,即使工作结果已经不再存储了吗?
更新:
根据这篇 帖子,我猜测内存可能是因为 Redis 中的失败任务而填满的。
使用这个代码片段:
def print_redis_failed_queue():
q = django_rq.get_failed_queue()
while True:
job = q.dequeue()
if not job:
break
print job
这是 Redis 中键的转储链接:
这个内容太长了,不方便在这里贴出来。它的大小似乎支持我的理论。但是使用:
def delete_redis_failed_queue():
q = django_rq.get_failed_queue()
count = 0
while True:
job = q.dequeue()
if not job:
print "{} Jobs deleted.".format(count)
break
job.delete()
count += 1
并没有像我预期的那样清空 Redis。我该如何获得更准确的 Redis 键的转储?我清理任务的方式正确吗?
2 个回答
1
你可以使用“黑洞”异常处理器,这个处理器来自于http://python-rq.org/docs/exceptions/,并且可以和job.cancel()
一起使用:
def black_hole(job, *exc_info):
# Delete the job hash on redis, otherwise it will stay on the queue forever
job.cancel()
return False
2
结果发现,Redis的存储空间被孤立的任务占满了,也就是说,这些任务没有被分配到特定的队列。
虽然孤立任务的原因不明,但用下面这段代码可以解决问题:
import redis
from rq.queue import Queue, get_failed_queue
from rq.job import Job
redis = Redis()
for i, key in enumerate(self.redis.keys('rq:job:*')):
job_number = key.split("rq:job:")[1]
job = Job.fetch(job_number, connection=self.redis)
job.delete()
在我的具体情况下,在每个任务完成后调用这段代码(实际上是下面的 delete_orphaned_jobs()
方法),可以确保Redis不会被填满,同时也能处理孤立的任务。想了解更多关于这个问题的细节,可以查看这个在 django-rq问题中的讨论。
在诊断这个问题的过程中,我还创建了一个 工具类,可以方便地检查和删除任务/孤立任务:
class RedisTools:
'''
A set of utility tools for interacting with a redis cache
'''
def __init__(self):
self._queues = ["default", "high", "low", "failed"]
self.get_redis_connection()
def get_redis_connection(self):
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
self.redis = redis.from_url(redis_url)
def get_queues(self):
return self._queues
def get_queue_count(self, queue):
return Queue(name=queue, connection=self.redis).count
def msg_print_log(self, msg):
print msg
logger.info(msg)
def get_key_count(self):
return len(self.redis.keys('rq:job:*'))
def get_queue_job_counts(self):
queues = self.get_queues()
queue_counts = [self.get_queue_count(queue) for queue in queues]
return zip(queues, queue_counts)
def has_orphanes(self):
job_count = sum([count[1] for count in self.get_queue_job_counts()])
return job_count < self.get_key_count()
def print_failed_jobs(self):
q = django_rq.get_failed_queue()
while True:
job = q.dequeue()
if not job:
break
print job
def print_job_counts(self):
for queue in self.get_queue_job_counts():
print "{:.<20}{}".format(queue[0], queue[1])
print "{:.<20}{}".format('Redis Keys:', self.get_key_count())
def delete_failed_jobs(self):
q = django_rq.get_failed_queue()
count = 0
while True:
job = q.dequeue()
if not job:
self.msg_print_log("{} Jobs deleted.".format(count))
break
job.delete()
count += 1
def delete_orphaned_jobs(self):
if not self.has_orphanes():
return self.msg_print_log("No orphan jobs to delete.")
for i, key in enumerate(self.redis.keys('rq:job:*')):
job_number = key.split("rq:job:")[1]
job = Job.fetch(job_number, connection=self.redis)
job.delete()
self.msg_print_log("[{}] Deleted job {}.".format(i, job_number))