Celery/Redis相同任务被多次并行执行

9 投票
1 回答
6264 浏览
提问于 2025-04-18 14:26

我有两个自定义任务(TaskATaskB),它们都是从 celery.Task 这个类继承来的。调度器会不定期地启动 TaskA,而 TaskA 每次会用不同的参数启动 NTaskB。但是有时候,同一个 TaskB,用相同的参数,会被同时执行两次,这就导致了数据库出现了各种问题。

class TaskA(celery.Task):

    def run(self, *args, **kwargs):
        objects = MyModel.objects.filter(processed=False)\
                                 .values_list('id', flat=True)
        task_b = TaskB()
        for o in objects:
            o.apply_async(args=[o, ])

class TaskB(celery.Task):

    def run(self, obj_id, *args, **kwargs):
        obj = MyModel.objects.get(id=obj_id)
        # do some stuff with obj

我尝试过的办法

我试着使用 celery.group,希望能解决这个问题,但结果却出现了错误,提示说 run 需要两个参数,但我没有提供任何参数。

这是我尝试用 celery.group 启动 TaskB 的方式:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.s(id) for id in objects])
g.apply_async()

我还试过这样:

# somewhere in TaskA
task_b = TaskB()
g = celery.group([task_b.run(id) for id in objects])
g.apply_async()

这会在 g.apply_async() 之前直接执行任务。

问题

这个问题是因为我启动任务的方式不对,还是其他原因?这是正常现象吗?

附加信息

在我的本地机器上,我运行的是 celery 3.1.13RabbitMQ 3.3.4,而在服务器上,celery 3.1.13 是和 Redis 2.8.9 一起运行的。在本地机器上,我没有看到这种情况,每个任务只执行一次。而在服务器上,我看到有 1 到 10 个这样的任务被连续执行了两次。

这是我在本地机器和服务器上运行 celery 的方式:

celery_beat: celery -A proj beat -l info

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200

有效的解决办法

我在 TaskB 上引入了一个锁,锁的依据是它接收到的参数。经过大约 10 小时的测试,我发现到底是什么被执行了两次,但这个锁可以防止数据库的冲突。这个办法解决了我的问题,但我还是想了解为什么会发生这种情况。

1 个回答

6

你有没有按照Celery使用Redis的说明,设置fanout_prefixfanout_patterns?我在用Celery和Redis的时候,没有遇到这个问题。

撰写回答