如何在Django测试中强制事务中的竞争条件?

9 投票
2 回答
2456 浏览
提问于 2025-04-17 23:30

有没有办法用多个线程来运行django的测试,并故意制造竞争条件?我想确保处理事务错误的那段代码能够被执行。更具体一点,我想能启动两个线程,它们会尝试在数据库上执行相同的操作,其中一个成功,另一个失败。我使用的是django自带的测试框架。

Python伪代码:

def some_method():
  try
    with transaction.atomic():
      objectA = get_object_from_db()
      objectA.delete()
  except Error:
    # error handling code to be run

  
class TestClass(TransactionalTestCase):
  def test_some_method():
    # run two threads and make sure that the race condition was present and some_method recovered successfully


            

2 个回答

2

竞争条件会导致两种异常情况:丢失更新写偏差。所以,使用两个线程,你可以测试在某些隔离级别下是否会发生丢失更新写偏差

我创建了两组代码来测试在PostgreSQL的默认隔离级别已提交读下的丢失更新写偏差,如下所示:

我将解释:

  • 丢失更新写偏差 => 这里
  • 在什么隔离级别下发生什么异常 => 这里

<丢失更新>

首先,我用models.py创建了一个store_product,包含idnamestock,如下所示:

store_product表:

id name stock
1 苹果 10
2 橙子 20
# "store/models.py"

from django.db import models

class Product(models.Model):
    name = models.CharField(max_length=30)
    stock = models.IntegerField()

然后,我创建并运行了丢失更新的测试代码,如下所示:

# "store/views.py"

from django.db import transaction
from time import sleep
from .models import Person
from threading import Thread
from django.http import HttpResponse

@transaction.atomic
def transaction1(flow):
    while True:
        while True:
            if flow[0] == "Step 1":
                sleep(0.1)
                print("T1", flow[0], "BEGIN")
                flow[0] = "Step 2"
                break
        
        while True:
            if flow[0] == "Step 2":
                sleep(0.1)
                print("T1", flow[0], "SELECT")
                product = Product.objects.get(id=2)
                print(product.id, product.name, product.stock)
                flow[0] = "Step 3"
                break

        while True:
            if flow[0] == "Step 5":
                sleep(0.1)
                print("T1", flow[0], "UPDATE")
                Product.objects.filter(id=2).update(stock=13)
                flow[0] = "Step 6"
                break
        
        while True:
            if flow[0] == "Step 6":
                sleep(0.1)
                print("T1", flow[0], "COMMIT")
                flow[0] = "Step 7"
                break
        break

@transaction.atomic
def transaction2(flow):
    while True:
        while True:
            if flow[0] == "Step 3":
                sleep(0.1)
                print("T2", flow[0], "BEGIN")
                flow[0] = "Step 4"
                break

        while True:
            if flow[0] == "Step 4":
                sleep(0.1)
                print("T2", flow[0], "SELECT")
                product = Product.objects.get(id=2)
                print(product.id, product.name, product.stock)
                flow[0] = "Step 5"
                break
        
        while True:
            if flow[0] == "Step 7":
                sleep(0.1)
                print("T2", flow[0], "UPDATE")
                Product.objects.filter(id=2).update(stock=16)                
                flow[0] = "Step 8"
                break

        while True:
            if flow[0] == "Step 8":
                sleep(0.1)
                print("T2", flow[0], "COMMIT")
                break
        break

def call_transcations(request): 
    flow = ["Step 1"]

    thread1 = Thread(target=transaction1, args=(flow,), daemon=True)
    thread2 = Thread(target=transaction2, args=(flow,), daemon=True)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()

    return HttpResponse("Call_transcations")

根据控制台上的结果,发生了丢失更新,因为在PostgreSQL已提交读隔离级别下,会发生丢失更新

T1 Step 1 BEGIN
T1 Step 2 SELECT # Reads the same row
2 Orange 20
T2 Step 3 BEGIN
T2 Step 4 SELECT # Reads the same row
2 Orange 20
T1 Step 5 UPDATE # Writes "stock"
T1 Step 6 COMMIT # And commits the write
T2 Step 7 UPDATE # Overwrites "stock"
T2 Step 8 COMMIT # And commits the overwrite

此外,我还获取了PostgreSQL的SQL查询日志,如下所示。你可以查看如何记录PostgreSQL的SQL查询

[20504]: BEGIN
[20504]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock" 
         FROM "store_product" 
         WHERE "store_product"."id" = 2 
         LIMIT 21
[3840]: BEGIN
[3840]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock" 
        FROM "store_product" 
        WHERE "store_product"."id" = 2 
        LIMIT 21
[20504]: UPDATE "store_product" SET "stock" = 13 
         WHERE "store_product"."id" = 2
[20504]: COMMIT
[3840]: UPDATE "store_product" SET "stock" = 16 
        WHERE "store_product"."id" = 2
[3840]: COMMIT

下面的表格显示了上述PostgreSQL的流程和SQL查询日志:

流程 事务1 (T1) 事务2 (T2) 说明
步骤1 BEGIN; T1开始。
步骤2 SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21;

2 橙子 20
T1读取到的20,稍后更新为13,因为有顾客买了7个橙子。
步骤3 BEGIN; T2开始。
步骤4 SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21;

2 橙子 20
T2读取到的20,稍后更新为16,因为有顾客买了4个橙子。
步骤5 UPDATE "store_product" SET "stock" = 13 WHERE "store_product"."id" = 2; T1将20更新为13
步骤6 COMMIT; T1提交。
步骤7 UPDATE "store_product" SET "stock" = 16 WHERE "store_product"."id" = 2; T2在T1提交后,将13更新为16
步骤8 COMMIT; T2提交。

*发生了丢失更新。

接下来,由于在PostgreSQL可重复读可串行化隔离级别下丢失更新不会发生,所以我将隔离级别设置为可重复读,如下所示:

postgres=# ALTER DATABASE postgres SET DEFAULT_TRANSACTION_ISOLATION TO 'REPEATABLE READ';

然后再次运行上述代码,结果是丢失更新没有发生,因为在T2 步骤7时,更新被拒绝,出现了两个异常,如下所示:

T1 Step 1 BEGIN
T1 Step 2 SELECT
2 Orange 20
T2 Step 3 BEGIN
T2 Step 4 SELECT
2 Orange 20
T1 Step 5 UPDATE
T1 Step 6 COMMIT
T2 Step 7 UPDATE    # ↓↓ 2 exceptions occurred ↓↓
psycopg2.errors.SerializationFailure: could not serialize access due to concurrent update
django.db.utils.OperationalError: could not serialize access due to concurrent update

此外,更新被拒绝,出现了一个错误,第二个事务被回滚,依据PostgreSQL的SQL查询日志如下:

[14072]: BEGIN
[14072]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock" 
         FROM "store_product" 
         WHERE "store_product"."id" = 2 
         LIMIT 21
[1834]: BEGIN
[1834]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock" 
        FROM "store_product" 
        WHERE "store_product"."id" = 2 
        LIMIT 21
[14072]: UPDATE "store_product" 
         SET "stock" = 13 
         WHERE "store_product"."id" = 2
[14072]: COMMIT
[1834] ERROR: could not serialize access due to concurrent update # Here
[1834] STATEMENT: UPDATE "store_product" 
                  SET "stock" = 16 
                  WHERE "store_product"."id" = 2
[1834]: ROLLBACK # Here

下面的表格显示了上述PostgreSQL的流程和SQL查询日志:

流程 事务1 (T1) 事务2 (T2) 说明
步骤1 BEGIN; T1开始。
步骤2 SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21;

2 橙子 20
T1读取到的20,稍后更新为13,因为有顾客买了7个橙子。
步骤3 BEGIN; T2开始。
步骤4 SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21;

2 橙子 20
T2读取到的20,稍后更新为16,因为有顾客买了4个橙子。
步骤5 UPDATE "store_product" SET "stock" = 13 WHERE "store_product"."id" = 2; T1将20更新为13
步骤6 COMMIT; T1提交。
步骤7 UPDATE "store_product" SET "stock" = 16 WHERE "store_product"."id" = 2;

错误:由于并发更新无法序列化访问
T2无法在T1提交后将13更新为16
步骤8 ROLLBACK; T2回滚。

*没有发生丢失更新。

<写偏差>

首先,我用models.py创建了一个store_doctor,包含idnameon_call,如下所示:

store_doctor表:

id name on_call
1 约翰
2 丽莎
# "store/models.py"

from django.db import models

class Doctor(models.Model):
    name = models.CharField(max_length=30)
    on_call = models.BooleanField()

然后,我创建并运行了写偏差的测试代码,如下所示:

# "store/views.py"

# ...

@transaction.atomic
def transaction1(flow):
    while True:
        while True:
            if flow[0] == "Step 1":
                print("T1", flow[0], "BEGIN")
                flow[0] = "Step 2"
                break
        
        while True:
            if flow[0] == "Step 2":
                print("T1", flow[0], "SELECT")
                doctor_count = Doctor.objects.filter(on_call=True).count()
                print(doctor_count)
                flow[0] = "Step 3"
                break

        while True:
            if flow[0] == "Step 5":
                print("T1", flow[0], "UPDATE")
                Doctor.objects.filter(id=1).update(on_call=False)
                flow[0] = "Step 6"
                break
        
        while True:
            if flow[0] == "Step 6":
                print("T1", flow[0], "COMMIT")
                flow[0] = "Step 7"
                break
        break

@transaction.atomic
def transaction2(flow):
    while True:
        while True:
            if flow[0] == "Step 3":
                print("T2", flow[0], "BEGIN")
                flow[0] = "Step 4"
                break

        while True:
            if flow[0] == "Step 4":
                print("T2", flow[0], "SELECT")
                doctor_count = Doctor.objects.filter(on_call=True).count()
                print(doctor_count)
                flow[0] = "Step 5"
                break
        
        while True:
            if flow[0] == "Step 7":
                print("T2", flow[0], "UPDATE")
                Doctor.objects.filter(id=2).update(on_call=False)
                flow[0] = "Step 8"
                break

        while True:
            if flow[0] == "Step 8":
                print("T2", flow[0], "COMMIT")
                break
        break

# ...

根据控制台上的结果,发生了写偏差,因为在PostgreSQL已提交读隔离级别下,会发生写偏差

T1 Step 1 BEGIN
T1 Step 2 SELECT # Reads the same data
2 
T2 Step 3 BEGIN
T2 Step 4 SELECT # Reads the same data
2
T1 Step 5 UPDATE # Writes 'False' to John's "on_call" 
T1 Step 6 COMMIT # And commits the write
T2 Step 7 UPDATE # Writes 'False' to Lisa's "on_call" 
T2 Step 8 COMMIT # And commits the write

此外,我还获取了PostgreSQL的SQL查询日志,如下所示:

[11252]: BEGIN
[11252]: SELECT COUNT(*) 
         AS "__count" 
         FROM "store_doctor" 
         WHERE "store_doctor"."on_call"
[2368]: BEGIN
[2368]: SELECT COUNT(*) 
        AS "__count" 
        FROM "store_doctor" 
        WHERE "store_doctor"."on_call"
[11252]: UPDATE "store_doctor" 
         SET "on_call" = false 
         WHERE "store_doctor"."id" = 1
[11252]: COMMIT
[2368]: UPDATE "store_doctor" 
        SET "on_call" = false 
        WHERE "store_doctor"."id" = 2
[2368]: COMMIT

下面的表格显示了上述PostgreSQL的流程和SQL查询日志:

流程 事务1 (T1) 事务2 (T2) 说明
步骤1 BEGIN; T1开始。
步骤2 SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call";

2
T1读取到的2,所以约翰可以休息。
步骤3 BEGIN; T2开始。
步骤4 SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call";

2
T2读取到的2,所以丽莎可以休息。
步骤5 UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 1; T1将True更新为False,表示约翰休息。
步骤6 COMMIT; T1提交。
步骤7 UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 2; T2将True更新为False,表示丽莎休息。
步骤8 COMMIT; T2提交。

约翰和丽莎都休息。

*发生了写偏差。

接下来,由于在PostgreSQL可串行化隔离级别下写偏差不会发生,所以我将隔离级别设置为可串行化,如下所示:

postgres=# ALTER DATABASE postgres SET DEFAULT_TRANSACTION_ISOLATION TO 'SERIALIZABLE';

然后再次运行上述代码,结果是写偏差没有发生,因为在T2 步骤7时,更新被拒绝,出现了两个异常,如下所示:

T1 Step 1 BEGIN
T1 Step 2 SELECT
2
T2 Step 3 BEGIN
T2 Step 4 SELECT
2
T1 Step 5 UPDATE
T1 Step 6 COMMIT
T2 Step 7 UPDATE    # ↓↓ 2 exceptions occurred ↓↓
psycopg2.errors.SerializationFailure: could not serialize access due to read/write dependencies among transactions
django.db.utils.OperationalError: could not serialize access due to read/write dependencies among transactions

此外,更新被拒绝,出现了一个错误,第二个事务被回滚,依据PostgreSQL的SQL查询日志如下:

[80642]: BEGIN
[80642]: SELECT COUNT(*) AS "__count" 
         FROM "store_doctor" 
         WHERE "store_doctor"."on_call"
[4244]: BEGIN
[4244]: SELECT COUNT(*) AS "__count" 
        FROM "store_doctor" 
        WHERE "store_doctor"."on_call"
[80642]: UPDATE "store_doctor" 
         SET "on_call" = false 
         WHERE "store_doctor"."id" = 1
[80642]: COMMIT
[4244] ERROR: could not serialize access due to read/write dependencies among transactions # Here
[4244] DETAIL: Reason code: Canceled on identification as a pivot, during write.
[4244] HINT: The transaction might succeed if retried.
[4244] STATEMENT: UPDATE "store_doctor" 
                  SET "on_call" = false 
                  WHERE "store_doctor"."id" = 2
[4244]: ROLLBACK # Here

下面的表格显示了上述PostgreSQL的流程和SQL查询日志:

流程 事务1 (T1) 事务2 (T2) 说明
步骤1 BEGIN; T1开始。
步骤2 SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call";

2
T1读取到的2,所以约翰可以休息。
步骤3 BEGIN; T2开始。
步骤4 SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call";

2
T2读取到的2,所以丽莎可以休息。
步骤5 UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 1; T1将True更新为False,表示约翰休息。
步骤6 COMMIT; T1提交。
步骤7 UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 2;

错误:由于事务之间的读/写依赖关系无法序列化访问
T2无法将True更新为False,表示丽莎无法休息。
步骤8 ROLLBACK; T2回滚。

只有约翰休息。

*没有发生写偏差。

2

从我看到的内容来看,你想要处理异常的地方。我想问你一个问题:在多线程竞争的情况下,你真的需要触发这个异常,还是说你只是想确保如果发生了这种情况,它能够正确处理?

我会这样做:

import unittest
import mock


# added just mimic django's orm for the purpose of the demo
class QuerySet(object):
  def delete(self):
    pass

def get_object_from_db():
  return QuerySet()

def some_method():
  try:
    objectA = get_object_from_db()
    objectA.delete()
    return True # this should be whatever you want to do in case it worked
  except Exception: # I would look up and check what ever error the django orm is raising.
    return False # this should be whatever you want to do in case it didn't work

class TestClass(unittest.TestCase):
  def test_some_method_in_case_it_worked(self):
    self.assertEqual(some_method(), True)

  def test_some_method_in_case_it_did_not_work(self):
    with mock.patch('__main__.get_object_from_db') as mocked_get_object_from_db:
      mocked_get_object_from_db.side_effect = RuntimeError('a message')
      self.assertEqual(some_method(), False)

if __name__ == '__main__':
    unittest.main()

mock 现在已经成为标准库的一部分。 https://pypi.python.org/pypi/mock

这样做可以避免你遇到那些不稳定的测试。你知道的,就是那些随机失败的测试。

撰写回答