Django中异步任务与Redis的线程安全问题
我有一个用Django做的应用,它会调用一个异步任务来处理一组数据(使用celery)。这个任务会拿到这组数据,然后进行很多操作,这些操作可能会花费很长时间,具体取决于数据里的内容。因为数据可能会在不同的组中共享,所以用户可能会提交一个任务来处理一组数据,而这组数据里可能有一些内容正在被处理。这个新任务应该只处理那些还没有在运行的内容,并且在所有内容处理完成之前不返回结果。
我知道我说得有点复杂,所以想象一下下面的代码:
from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while
class LongRunningTask(Task):
def run(self, process_id, *args, **kwargs):
_queryset = InterestingModel.objects.filter(process__id=process_id)
r = redis.Redis()
p = r.pipeline()
run_check_sets = ('run_check', 'objects_already_running')
# There must be a better way to do this:
for o in _queryset.values_list('pk', flat=True):
p.sadd('run_check')
p.sdiff(run_check_sets) # Objects that need to be run
p.sunion(run_check_sets) # Objects that we need to wait for
p.sunionstore('objects_already_running',run_check_sets)
p.delete('run_check')
redis_result = p.execute()
objects_to_run = redis_result[-3]
objects_to_wait_for = redis_result[-2]
if objects_to_run:
i_take_a_while(objects_to_run)
p = r.pipeline()
for o in objects_to_run:
p.srem('objects_already_running', o)
p.execute()
while objects_to_wait_for:
p = r.pipeline()
for o in objects_to_wait_for:
p.sismember('objects_already_running',o)
redis_result = p.execute()
objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
# Probably need to add some sort of timeout here or in redis
sleep(30)
我对Redis非常陌生,所以我主要想问有没有更有效的方法来操作Redis,以达到同样的效果。更广泛地说,我想知道Redis是否真的必要,或者说它是否是处理这个问题的正确方法。感觉应该有更好的方式来让Django模型和Redis互动。最后,我还想知道这段代码是否真的线程安全。有没有人能指出我逻辑上的漏洞?
任何评论都很受欢迎。
1 个回答
2
你能不能稍微换个方式来设计这个呢?具体来说,我会先启动每个对象的任务,然后把这些长时间运行的工作的信息存储到某个地方(比如数据库、缓存等)。当每个对象的任务完成后,它会更新长时间运行的工作信息,并检查所有的任务是否都已经完成。如果都完成了,你就可以运行那些需要在长时间任务完成后执行的代码。
这样做的好处是,你的服务器在等待其他事情发生的时候,不会被一个线程占用。在客户端,你可以定期检查长时间任务的状态,甚至可以用完成的对象数量来更新一个进度条,如果你想的话。