Django查询未返回我刚刚保存的实例
我正在做一个项目,需要追踪与某个过程相关的事件。在我的案例中,我有两个模型:Registration
和 RegistrationEvent
,后者通过外键与 Registration
连接。
我还写了一个 RegistrationEvent
的方法,叫做 _ensure_correct_flow_of_events
,这个方法可以防止以不合理的顺序添加事件,并在调用 model.save
时被触发。实际上,事件的顺序必须是 SIGNED -> STARTED -> SUCCESS -> CERTIFICATE_ISSUED
。在任何时候,事件 CANCELED
都可能发生。为了评估事件的顺序,这个方法会调用另一个方法 _get_previous_event
,这个方法返回最后一个注册到 Registration
的事件。
在创建了 SUCCESS
事件后,save
方法会调用 Registration.threaded_issue_certificate
,这是一个应该创建证书的方法,然后在一个新线程中创建 CERTIFICATE_ISSUED
事件,以便快速处理响应。问题是,当 CERTIFICATE_ISSUED
即将被创建时,_ensure_correct_flow_of_events
和 _get_previous_event
被调用,此时 _get_previous_event
并没有返回刚刚创建的 SUCCESS
事件,而是返回了之前的 STARTED
事件。
我的日志是
Checking correct flow, previous event: Course started - admin registration id: 1 current event_type: 3
Checking correct flow, previous event: Course started - admin registration id: 1 current event_type: 4
Exception in thread Thread-2 (threaded_issue_certificate):
Traceback (most recent call last):
File "/Users/zenodallavalle/miniconda3/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/Users/zenodallavalle/miniconda3/lib/python3.10/threading.py", line 953, in run
[21/Mar/2024 15:16:41] "POST /admin/main/registrationevent/add/ HTTP/1.1" 302 0
self._target(*self._args, **self._kwargs)
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 79, in threaded_issue_certificate
return self.issue_certificate()
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 84, in issue_certificate
RegistrationEvent.objects.create(
File "/Users/zenodallavalle/Downloads/test/env/lib/python3.10/site-packages/django/db/models/manager.py", line 87, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "/Users/zenodallavalle/Downloads/test/env/lib/python3.10/site-packages/django/db/models/query.py", line 679, in create
obj.save(force_insert=True, using=self.db)
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 175, in save
self._ensure_correct_flow_of_events(is_new=is_new)
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 155, in _ensure_correct_flow_of_events
raise ValueError(
ValueError: After started next event must be 'success' or 'canceled'
这是为什么呢?
我在这里留下我的 models.py
,以便复现这个行为。
from django.db import models
from django.contrib.auth.models import User
from logging import getLogger
from main.utils import make_thread
logger = getLogger(__name__)
import threading
def make_thread(fn):
def _make_thread(*args, **kwargs):
thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
thread.start()
return thread
return _make_thread
class Registration(models.Model):
course_user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name="registrations",
)
created_by = models.ForeignKey(
User,
null=True,
on_delete=models.SET_NULL,
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def _check_user_not_signed_for_other_courses(self):
other_registrations = self.course_user.registrations.exclude(pk=self.pk)
if any([not r.ended for r in other_registrations]):
raise ValueError("User already signed for another course")
def _ensure_created_by_is_not_null(self):
if self.created_by is None:
raise ValueError("Created by is null")
def save(self, *args, **kwargs) -> None:
is_new = self._state.adding
if is_new:
self._ensure_created_by_is_not_null()
self._check_user_not_signed_for_other_courses()
ret = super().save(*args, **kwargs)
if is_new:
RegistrationEvent.objects.create(
course_registration=self,
event_type=RegistrationEvent.EventType.SIGNED,
)
return ret
def __str__(self):
return f"{self.course_user} registration id: {self.pk}"
def __repr__(self):
return f"<Registration: {self.course_user} registration id: {self.pk}>"
@property
def ended(self):
return self.events.filter(
event_type__in=(
RegistrationEvent.EventType.CERTIFICATE_ISSUED,
RegistrationEvent.EventType.CANCELED,
RegistrationEvent.EventType.FAILED,
)
).exists()
@property
def last_event(self):
return self.events.order_by("-created_at").first()
@make_thread
def threaded_issue_certificate(self):
return self.issue_certificate()
def issue_certificate(self):
# Do something here
# Register it as an event
RegistrationEvent.objects.create(
course_registration=self,
event_type=RegistrationEvent.EventType.CERTIFICATE_ISSUED,
)
class RegistrationEvent(models.Model):
class EventType(models.IntegerChoices):
SIGNED = 1, "Signed up"
STARTED = 2, "Course started"
SUCCESS = 3, "Course success"
CERTIFICATE_ISSUED = 4, "Certificate issued"
CANCELED = 5, "Cancelled"
FAILED = 6, "Course failed"
course_registration = models.ForeignKey(
Registration,
on_delete=models.CASCADE,
related_name="events",
)
event_type = models.IntegerField(choices=EventType.choices)
created_at = models.DateTimeField(auto_now_add=True, verbose_name="Creato il")
updated_at = models.DateTimeField(auto_now=True, verbose_name="Aggiornato il")
@property
def event_type_description(self):
return self.EventType(self.event_type).label
def __str__(self):
return f"{self.event_type_description} - {self.course_registration}"
def __repr__(self):
return f"<RegistrationEvent: {self.event_type_description} - {self.course_registration}>"
def _get_previous_event(self, is_new):
qs = self.course_registration.events.all()
if not is_new:
qs = qs.exclude(created_at__gte=self.created_at)
return qs.order_by("-created_at").first()
def _ensure_correct_flow_of_events(self, is_new):
# If the course is completed (certificate issued or cancelled), no more events can be added
if is_new:
if self.course_registration.ended:
raise ValueError(
"Il corso è completato, non è possibile aggiungere eventi"
)
if self.event_type == self.EventType.CANCELED:
return # No further checks needed
previous_event = self._get_previous_event(is_new=is_new)
print(
"Checking correct flow, previous event:",
previous_event,
"current event_type:",
self.event_type,
)
if not previous_event:
if self.event_type != self.EventType.SIGNED:
raise ValueError("First event must be 'signed'")
elif previous_event.event_type == self.EventType.SIGNED:
if self.event_type != self.EventType.STARTED:
raise ValueError("After signed next event must be 'started'")
elif previous_event.event_type == self.EventType.STARTED:
if self.event_type not in (
self.EventType.SUCCESS,
self.EventType.CANCELED,
):
raise ValueError(
"After started next event must be 'success' or 'canceled'"
)
elif previous_event.event_type == self.EventType.SUCCESS:
if self.event_type != self.EventType.CERTIFICATE_ISSUED:
raise ValueError(
"After success next event must be 'certificate issued'"
)
def _issue_certificate_if_needed(self, is_new):
if not is_new:
return
if not self.event_type == self.EventType.SUCCESS:
return
self.course_registration.threaded_issue_certificate()
def save(self, *args, **kwargs):
is_new = self._state.adding
if is_new:
self._ensure_correct_flow_of_events(is_new=is_new)
ret = super().save(*args, **kwargs)
self._issue_certificate_if_needed(is_new=is_new)
return ret
1 个回答
这个问题在Django中很常见,主要是因为它处理多线程操作和时间的方式。具体来说,这个问题出现在当一个新的线程为CERTIFICATE_ISSUED事件启动时,数据库的SUCCESS事件还没有完全更新。Django提供了一些解决方案,我发现最好的办法是使用post_save和receiver,这样在处理SUCCESS事件时,可以自动发放证书。
需要导入的内容有:
from django.db.models.signals import post_save
from django.dispatch import receiver
实现的方法大概是这样的:
@receiver(post_save, sender=RegistrationEvent)
def issue_certificate_if_success(sender, instance, created, **kwargs):
if created and instance.event_type == RegistrationEvent.EventType.SUCCESS:
instance.course_registration.threaded_issue_certificate()
如果你不想采用这种方法,还有其他的解决方案。我也见过使用事务模块的办法。