如何在Django测试中强制事务中的竞争条件?
有没有办法用多个线程来运行django的测试,并故意制造竞争条件?我想确保处理事务错误的那段代码能够被执行。更具体一点,我想能启动两个线程,它们会尝试在数据库上执行相同的操作,其中一个成功,另一个失败。我使用的是django自带的测试框架。
Python伪代码:
def some_method():
try
with transaction.atomic():
objectA = get_object_from_db()
objectA.delete()
except Error:
# error handling code to be run
class TestClass(TransactionalTestCase):
def test_some_method():
# run two threads and make sure that the race condition was present and some_method recovered successfully
2 个回答
竞争条件会导致两种异常情况:丢失更新和写偏差。所以,使用两个线程,你可以测试在某些隔离级别下是否会发生丢失更新或写偏差。
我创建了两组代码来测试在PostgreSQL的默认隔离级别已提交读下的丢失更新或写偏差,如下所示:
我将解释:
<丢失更新>
首先,我用models.py
创建了一个store_product
表,包含id
、name
和stock
,如下所示:
store_product
表:
id | name | stock |
---|---|---|
1 | 苹果 | 10 |
2 | 橙子 | 20 |
# "store/models.py"
from django.db import models
class Product(models.Model):
name = models.CharField(max_length=30)
stock = models.IntegerField()
然后,我创建并运行了丢失更新的测试代码,如下所示:
# "store/views.py"
from django.db import transaction
from time import sleep
from .models import Person
from threading import Thread
from django.http import HttpResponse
@transaction.atomic
def transaction1(flow):
while True:
while True:
if flow[0] == "Step 1":
sleep(0.1)
print("T1", flow[0], "BEGIN")
flow[0] = "Step 2"
break
while True:
if flow[0] == "Step 2":
sleep(0.1)
print("T1", flow[0], "SELECT")
product = Product.objects.get(id=2)
print(product.id, product.name, product.stock)
flow[0] = "Step 3"
break
while True:
if flow[0] == "Step 5":
sleep(0.1)
print("T1", flow[0], "UPDATE")
Product.objects.filter(id=2).update(stock=13)
flow[0] = "Step 6"
break
while True:
if flow[0] == "Step 6":
sleep(0.1)
print("T1", flow[0], "COMMIT")
flow[0] = "Step 7"
break
break
@transaction.atomic
def transaction2(flow):
while True:
while True:
if flow[0] == "Step 3":
sleep(0.1)
print("T2", flow[0], "BEGIN")
flow[0] = "Step 4"
break
while True:
if flow[0] == "Step 4":
sleep(0.1)
print("T2", flow[0], "SELECT")
product = Product.objects.get(id=2)
print(product.id, product.name, product.stock)
flow[0] = "Step 5"
break
while True:
if flow[0] == "Step 7":
sleep(0.1)
print("T2", flow[0], "UPDATE")
Product.objects.filter(id=2).update(stock=16)
flow[0] = "Step 8"
break
while True:
if flow[0] == "Step 8":
sleep(0.1)
print("T2", flow[0], "COMMIT")
break
break
def call_transcations(request):
flow = ["Step 1"]
thread1 = Thread(target=transaction1, args=(flow,), daemon=True)
thread2 = Thread(target=transaction2, args=(flow,), daemon=True)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
return HttpResponse("Call_transcations")
根据控制台上的结果,发生了丢失更新,因为在PostgreSQL的已提交读隔离级别下,会发生丢失更新:
T1 Step 1 BEGIN
T1 Step 2 SELECT # Reads the same row
2 Orange 20
T2 Step 3 BEGIN
T2 Step 4 SELECT # Reads the same row
2 Orange 20
T1 Step 5 UPDATE # Writes "stock"
T1 Step 6 COMMIT # And commits the write
T2 Step 7 UPDATE # Overwrites "stock"
T2 Step 8 COMMIT # And commits the overwrite
此外,我还获取了PostgreSQL的SQL查询日志,如下所示。你可以查看如何记录PostgreSQL的SQL查询:
[20504]: BEGIN
[20504]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[3840]: BEGIN
[3840]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[20504]: UPDATE "store_product" SET "stock" = 13
WHERE "store_product"."id" = 2
[20504]: COMMIT
[3840]: UPDATE "store_product" SET "stock" = 16
WHERE "store_product"."id" = 2
[3840]: COMMIT
下面的表格显示了上述PostgreSQL的流程和SQL查询日志:
流程 | 事务1 (T1) | 事务2 (T2) | 说明 |
---|---|---|---|
步骤1 | BEGIN; |
T1开始。 | |
步骤2 | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 橙子 20 |
T1读取到的20 ,稍后更新为13 ,因为有顾客买了7个橙子。 |
|
步骤3 | BEGIN; |
T2开始。 | |
步骤4 | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 橙子 20 |
T2读取到的20 ,稍后更新为16 ,因为有顾客买了4个橙子。 |
|
步骤5 | UPDATE "store_product" SET "stock" = 13 WHERE "store_product"."id" = 2; |
T1将20 更新为13 。 |
|
步骤6 | COMMIT; |
T1提交。 | |
步骤7 | UPDATE "store_product" SET "stock" = 16 WHERE "store_product"."id" = 2; |
T2在T1提交后,将13 更新为16 。 |
|
步骤8 | COMMIT; |
T2提交。*发生了丢失更新。 |
接下来,由于在PostgreSQL的可重复读
或可串行化
隔离级别下丢失更新不会发生,所以我将隔离级别设置为可重复读
,如下所示:
postgres=# ALTER DATABASE postgres SET DEFAULT_TRANSACTION_ISOLATION TO 'REPEATABLE READ';
然后再次运行上述代码,结果是丢失更新没有发生,因为在T2 步骤7
时,更新被拒绝,出现了两个异常,如下所示:
T1 Step 1 BEGIN
T1 Step 2 SELECT
2 Orange 20
T2 Step 3 BEGIN
T2 Step 4 SELECT
2 Orange 20
T1 Step 5 UPDATE
T1 Step 6 COMMIT
T2 Step 7 UPDATE # ↓↓ 2 exceptions occurred ↓↓
psycopg2.errors.SerializationFailure: could not serialize access due to concurrent update
django.db.utils.OperationalError: could not serialize access due to concurrent update
此外,更新被拒绝,出现了一个错误,第二个事务被回滚,依据PostgreSQL的SQL查询日志如下:
[14072]: BEGIN
[14072]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[1834]: BEGIN
[1834]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[14072]: UPDATE "store_product"
SET "stock" = 13
WHERE "store_product"."id" = 2
[14072]: COMMIT
[1834] ERROR: could not serialize access due to concurrent update # Here
[1834] STATEMENT: UPDATE "store_product"
SET "stock" = 16
WHERE "store_product"."id" = 2
[1834]: ROLLBACK # Here
下面的表格显示了上述PostgreSQL的流程和SQL查询日志:
流程 | 事务1 (T1) | 事务2 (T2) | 说明 |
---|---|---|---|
步骤1 | BEGIN; |
T1开始。 | |
步骤2 | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 橙子 20 |
T1读取到的20 ,稍后更新为13 ,因为有顾客买了7个橙子。 |
|
步骤3 | BEGIN; |
T2开始。 | |
步骤4 | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 橙子 20 |
T2读取到的20 ,稍后更新为16 ,因为有顾客买了4个橙子。 |
|
步骤5 | UPDATE "store_product" SET "stock" = 13 WHERE "store_product"."id" = 2; |
T1将20 更新为13 。 |
|
步骤6 | COMMIT; |
T1提交。 | |
步骤7 | UPDATE "store_product" SET "stock" = 16 WHERE "store_product"."id" = 2; 错误:由于并发更新无法序列化访问 |
T2无法在T1提交后将13 更新为16 。 |
|
步骤8 | ROLLBACK; |
T2回滚。*没有发生丢失更新。 |
<写偏差>
首先,我用models.py
创建了一个store_doctor
表,包含id
、name
和on_call
,如下所示:
store_doctor
表:
id | name | on_call |
---|---|---|
1 | 约翰 | 是 |
2 | 丽莎 | 是 |
# "store/models.py"
from django.db import models
class Doctor(models.Model):
name = models.CharField(max_length=30)
on_call = models.BooleanField()
然后,我创建并运行了写偏差的测试代码,如下所示:
# "store/views.py"
# ...
@transaction.atomic
def transaction1(flow):
while True:
while True:
if flow[0] == "Step 1":
print("T1", flow[0], "BEGIN")
flow[0] = "Step 2"
break
while True:
if flow[0] == "Step 2":
print("T1", flow[0], "SELECT")
doctor_count = Doctor.objects.filter(on_call=True).count()
print(doctor_count)
flow[0] = "Step 3"
break
while True:
if flow[0] == "Step 5":
print("T1", flow[0], "UPDATE")
Doctor.objects.filter(id=1).update(on_call=False)
flow[0] = "Step 6"
break
while True:
if flow[0] == "Step 6":
print("T1", flow[0], "COMMIT")
flow[0] = "Step 7"
break
break
@transaction.atomic
def transaction2(flow):
while True:
while True:
if flow[0] == "Step 3":
print("T2", flow[0], "BEGIN")
flow[0] = "Step 4"
break
while True:
if flow[0] == "Step 4":
print("T2", flow[0], "SELECT")
doctor_count = Doctor.objects.filter(on_call=True).count()
print(doctor_count)
flow[0] = "Step 5"
break
while True:
if flow[0] == "Step 7":
print("T2", flow[0], "UPDATE")
Doctor.objects.filter(id=2).update(on_call=False)
flow[0] = "Step 8"
break
while True:
if flow[0] == "Step 8":
print("T2", flow[0], "COMMIT")
break
break
# ...
根据控制台上的结果,发生了写偏差,因为在PostgreSQL的已提交读隔离级别下,会发生写偏差:
T1 Step 1 BEGIN
T1 Step 2 SELECT # Reads the same data
2
T2 Step 3 BEGIN
T2 Step 4 SELECT # Reads the same data
2
T1 Step 5 UPDATE # Writes 'False' to John's "on_call"
T1 Step 6 COMMIT # And commits the write
T2 Step 7 UPDATE # Writes 'False' to Lisa's "on_call"
T2 Step 8 COMMIT # And commits the write
此外,我还获取了PostgreSQL的SQL查询日志,如下所示:
[11252]: BEGIN
[11252]: SELECT COUNT(*)
AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[2368]: BEGIN
[2368]: SELECT COUNT(*)
AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[11252]: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 1
[11252]: COMMIT
[2368]: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 2
[2368]: COMMIT
下面的表格显示了上述PostgreSQL的流程和SQL查询日志:
流程 | 事务1 (T1) | 事务2 (T2) | 说明 |
---|---|---|---|
步骤1 | BEGIN; |
T1开始。 | |
步骤2 | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 |
T1读取到的2 ,所以约翰可以休息。 |
|
步骤3 | BEGIN; |
T2开始。 | |
步骤4 | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 |
T2读取到的2 ,所以丽莎可以休息。 |
|
步骤5 | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 1; |
T1将True 更新为False ,表示约翰休息。 |
|
步骤6 | COMMIT; |
T1提交。 | |
步骤7 | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 2; |
T2将True 更新为False ,表示丽莎休息。 |
|
步骤8 | COMMIT; |
T2提交。约翰和丽莎都休息。*发生了写偏差。 |
接下来,由于在PostgreSQL的可串行化
隔离级别下写偏差不会发生,所以我将隔离级别设置为可串行化
,如下所示:
postgres=# ALTER DATABASE postgres SET DEFAULT_TRANSACTION_ISOLATION TO 'SERIALIZABLE';
然后再次运行上述代码,结果是写偏差没有发生,因为在T2 步骤7
时,更新被拒绝,出现了两个异常,如下所示:
T1 Step 1 BEGIN
T1 Step 2 SELECT
2
T2 Step 3 BEGIN
T2 Step 4 SELECT
2
T1 Step 5 UPDATE
T1 Step 6 COMMIT
T2 Step 7 UPDATE # ↓↓ 2 exceptions occurred ↓↓
psycopg2.errors.SerializationFailure: could not serialize access due to read/write dependencies among transactions
django.db.utils.OperationalError: could not serialize access due to read/write dependencies among transactions
此外,更新被拒绝,出现了一个错误,第二个事务被回滚,依据PostgreSQL的SQL查询日志如下:
[80642]: BEGIN
[80642]: SELECT COUNT(*) AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[4244]: BEGIN
[4244]: SELECT COUNT(*) AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[80642]: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 1
[80642]: COMMIT
[4244] ERROR: could not serialize access due to read/write dependencies among transactions # Here
[4244] DETAIL: Reason code: Canceled on identification as a pivot, during write.
[4244] HINT: The transaction might succeed if retried.
[4244] STATEMENT: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 2
[4244]: ROLLBACK # Here
下面的表格显示了上述PostgreSQL的流程和SQL查询日志:
流程 | 事务1 (T1) | 事务2 (T2) | 说明 |
---|---|---|---|
步骤1 | BEGIN; |
T1开始。 | |
步骤2 | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 |
T1读取到的2 ,所以约翰可以休息。 |
|
步骤3 | BEGIN; |
T2开始。 | |
步骤4 | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 |
T2读取到的2 ,所以丽莎可以休息。 |
|
步骤5 | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 1; |
T1将True 更新为False ,表示约翰休息。 |
|
步骤6 | COMMIT; |
T1提交。 | |
步骤7 | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 2; 错误:由于事务之间的读/写依赖关系无法序列化访问 |
T2无法将True 更新为False ,表示丽莎无法休息。 |
|
步骤8 | ROLLBACK; |
T2回滚。只有约翰休息。*没有发生写偏差。 |
从我看到的内容来看,你想要处理异常的地方。我想问你一个问题:在多线程竞争的情况下,你真的需要触发这个异常,还是说你只是想确保如果发生了这种情况,它能够正确处理?
我会这样做:
import unittest
import mock
# added just mimic django's orm for the purpose of the demo
class QuerySet(object):
def delete(self):
pass
def get_object_from_db():
return QuerySet()
def some_method():
try:
objectA = get_object_from_db()
objectA.delete()
return True # this should be whatever you want to do in case it worked
except Exception: # I would look up and check what ever error the django orm is raising.
return False # this should be whatever you want to do in case it didn't work
class TestClass(unittest.TestCase):
def test_some_method_in_case_it_worked(self):
self.assertEqual(some_method(), True)
def test_some_method_in_case_it_did_not_work(self):
with mock.patch('__main__.get_object_from_db') as mocked_get_object_from_db:
mocked_get_object_from_db.side_effect = RuntimeError('a message')
self.assertEqual(some_method(), False)
if __name__ == '__main__':
unittest.main()
mock 现在已经成为标准库的一部分。 https://pypi.python.org/pypi/mock
这样做可以避免你遇到那些不稳定的测试。你知道的,就是那些随机失败的测试。