Django:如何将批量更新/插入操作封装在事务中?

5 投票
1 回答
6604 浏览
提问于 2025-04-18 10:31

这是我的使用场景:

  • 我有多个 celery 任务是同时运行的
  • 每个任务可能会批量创建更新很多对象。为此我使用了 django-bulk

所以基本上我在使用一个非常方便的函数 insert_or_update_many

  1. 它首先会进行一次选择(Select)
  2. 如果找到了对象,它就会更新这些对象
  3. 如果没有找到,就会创建新的对象

但是这样会引发并发问题。例如:如果在第一步时某个对象不存在,那么它会被加入到一个待插入的对象列表中。但是在这段时间内,可能会有另一个 Celery 任务创建了这个对象,当它尝试进行批量插入(第三步)时,我就会收到重复条目的错误。

我想我需要把这三步放在一个“阻塞”的块里。我查阅了一些关于事务的资料,尝试把第一、二、三步放在 with transaction.commit_on_success: 这个块里

with transaction.commit_on_success():
    cursor.execute(sql, parameters)
    existing = set(cursor.fetchall())
    if not skip_update:
        # Find the objects that need to be updated
        update_objects = [o for (o, k) in object_keys if k in existing]
        _update_many(model, update_objects, keys=keys, using=using)
    # Find the objects that need to be inserted.
    insert_objects = [o for (o, k) in object_keys if k not in existing]
    # Filter out any duplicates in the insertion
    filtered_objects = _filter_objects(con, insert_objects, key_fields)
    _insert_many(model, filtered_objects, using=using)

但这对我来说不起作用。我不太确定自己是否完全理解事务。我基本上需要一个块,确保在这个块里进行的多个操作不会被其他进程或线程在写入时访问我的数据库资源。

1 个回答

10

我基本上需要一个地方,可以放几个操作,确保没有其他进程或线程在写入我的数据库资源。

Django的事务一般来说不能保证这一点。如果你来自计算机科学的其他领域,你可能会自然地认为事务是以这种方式阻塞的,但在数据库的世界里,有不同类型的锁,存在不同的隔离级别,而且每个数据库的情况也不同。因此,为了确保你的事务能做到这一点,你需要了解事务、锁及其性能特点,以及你的数据库提供的控制机制。

不过,让一堆进程都试图锁定表以进行竞争插入,这听起来并不是个好主意。如果冲突很少,你可以使用一种乐观锁定的方法,如果失败就重试事务。或者,你可以把所有的celery任务指向一个单独的进程(如果你最终还是要获取表锁,那么并行处理并没有性能优势)。

我建议你先放弃批量操作,先用Django的update_or_create一次只处理一行。只要你的数据库有防止重复条目的约束(听起来是有的),这应该就能避免你上面提到的竞争条件。如果性能真的不行,再考虑更复杂的选项。

采取乐观并发控制的方法意味着,不是通过获取表锁来防止冲突,而是正常进行操作,如果出现问题再重试。在你的情况下,可能看起来像这样:

while True:
    try:
        with transaction.atomic():
            # do your bulk insert / update operation
    except IntegrityError:
        pass
    else:
        break

所以如果你遇到竞争条件,结果的IntegrityError会导致transaction.atomic()块回滚之前所做的任何更改,而while循环会强制重试事务(此时批量操作将看到新存在的行,并标记为更新而不是插入)。

这种方法在冲突很少的情况下效果很好,但如果冲突频繁,就会表现得很糟糕。

撰写回答