Celery/Redis相同任务被多次并行执行
我有两个自定义任务(TaskA
和 TaskB
),它们都是从 celery.Task
这个类继承来的。调度器会不定期地启动 TaskA
,而 TaskA
每次会用不同的参数启动 N
次 TaskB
。但是有时候,同一个 TaskB
,用相同的参数,会被同时执行两次,这就导致了数据库出现了各种问题。
class TaskA(celery.Task):
def run(self, *args, **kwargs):
objects = MyModel.objects.filter(processed=False)\
.values_list('id', flat=True)
task_b = TaskB()
for o in objects:
o.apply_async(args=[o, ])
class TaskB(celery.Task):
def run(self, obj_id, *args, **kwargs):
obj = MyModel.objects.get(id=obj_id)
# do some stuff with obj
我尝试过的办法
我试着使用 celery.group
,希望能解决这个问题,但结果却出现了错误,提示说 run
需要两个参数,但我没有提供任何参数。
这是我尝试用 celery.group
启动 TaskB
的方式:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()
我还试过这样:
# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()
这会在 g.apply_async()
之前直接执行任务。
问题
这个问题是因为我启动任务的方式不对,还是其他原因?这是正常现象吗?
附加信息
在我的本地机器上,我运行的是 celery 3.1.13
和 RabbitMQ 3.3.4
,而在服务器上,celery 3.1.13
是和 Redis 2.8.9
一起运行的。在本地机器上,我没有看到这种情况,每个任务只执行一次。而在服务器上,我看到有 1 到 10 个这样的任务被连续执行了两次。
这是我在本地机器和服务器上运行 celery 的方式:
celery_beat: celery -A proj beat -l info
celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50
celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200
有效的解决办法
我在 TaskB
上引入了一个锁,锁的依据是它接收到的参数。经过大约 10 小时的测试,我发现到底是什么被执行了两次,但这个锁可以防止数据库的冲突。这个办法解决了我的问题,但我还是想了解为什么会发生这种情况。