Django query does not return the instance I just saved

Asked 2025-04-13 12:46

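I'm working on a project that needs to track events related to a process. In my case I have two models, Registration and RegistrationEvent; the latter is linked to Registration through a foreign key.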

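I also wrote a RegistrationEvent method called _ensure_correct_flow_of_events, which prevents events from being added in an order that makes no sense and is triggered when model.save is called. The events must occur in the order SIGNED -> STARTED -> SUCCESS -> CERTIFICATE_ISSUED, while a CANCELED event can occur at any point. To evaluate the order, this method calls another method, _get_previous_event, which returns the last event recorded for the Registration.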

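After a SUCCESS event is created, the save method calls Registration.threaded_issue_certificate, a method that is supposed to create the certificate and then create the CERTIFICATE_ISSUED event in a new thread, so that the response can be returned quickly. The problem is that when CERTIFICATE_ISSUED is about to be created, _ensure_correct_flow_of_events and _get_previous_event are called, and _get_previous_event returns the earlier STARTED event rather than the SUCCESS event that was just created.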

My logs are:

Checking correct flow, previous event: Course started - admin registration id: 1 current event_type: 3
Checking correct flow, previous event: Course started - admin registration id: 1 current event_type: 4
Exception in thread Thread-2 (threaded_issue_certificate):
Traceback (most recent call last):
  File "/Users/zenodallavalle/miniconda3/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/Users/zenodallavalle/miniconda3/lib/python3.10/threading.py", line 953, in run
[21/Mar/2024 15:16:41] "POST /admin/main/registrationevent/add/ HTTP/1.1" 302 0
    self._target(*self._args, **self._kwargs)
  File "/Users/zenodallavalle/Downloads/test/main/models.py", line 79, in threaded_issue_certificate
    return self.issue_certificate()
  File "/Users/zenodallavalle/Downloads/test/main/models.py", line 84, in issue_certificate
    RegistrationEvent.objects.create(
  File "/Users/zenodallavalle/Downloads/test/env/lib/python3.10/site-packages/django/db/models/manager.py", line 87, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "/Users/zenodallavalle/Downloads/test/env/lib/python3.10/site-packages/django/db/models/query.py", line 679, in create
    obj.save(force_insert=True, using=self.db)
  File "/Users/zenodallavalle/Downloads/test/main/models.py", line 175, in save
    self._ensure_correct_flow_of_events(is_new=is_new)
  File "/Users/zenodallavalle/Downloads/test/main/models.py", line 155, in _ensure_correct_flow_of_events
    raise ValueError(
ValueError: After started next event must be 'success' or 'canceled'

Why does this happen?

I'm leaving my models.py here so the behavior can be reproduced.

import threading
from logging import getLogger

from django.contrib.auth.models import User
from django.db import models

# make_thread (originally imported from main.utils) is defined inline below.
logger = getLogger(__name__)


def make_thread(fn):
    def _make_thread(*args, **kwargs):
        thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
        thread.start()
        return thread

    return _make_thread


class Registration(models.Model):
    course_user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="registrations",
    )

    created_by = models.ForeignKey(
        User,
        null=True,
        on_delete=models.SET_NULL,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def _check_user_not_signed_for_other_courses(self):
        other_registrations = self.course_user.registrations.exclude(pk=self.pk)
        if any([not r.ended for r in other_registrations]):
            raise ValueError("User already signed for another course")

    def _ensure_created_by_is_not_null(self):
        if self.created_by is None:
            raise ValueError("Created by is null")

    def save(self, *args, **kwargs) -> None:
        is_new = self._state.adding
        if is_new:
            self._ensure_created_by_is_not_null()
            self._check_user_not_signed_for_other_courses()
        ret = super().save(*args, **kwargs)
        if is_new:
            RegistrationEvent.objects.create(
                course_registration=self,
                event_type=RegistrationEvent.EventType.SIGNED,
            )
        return ret

    def __str__(self):
        return f"{self.course_user} registration id: {self.pk}"

    def __repr__(self):
        return f"<Registration: {self.course_user} registration id: {self.pk}>"

    @property
    def ended(self):
        return self.events.filter(
            event_type__in=(
                RegistrationEvent.EventType.CERTIFICATE_ISSUED,
                RegistrationEvent.EventType.CANCELED,
                RegistrationEvent.EventType.FAILED,
            )
        ).exists()

    @property
    def last_event(self):
        return self.events.order_by("-created_at").first()

    @make_thread
    def threaded_issue_certificate(self):
        return self.issue_certificate()

    def issue_certificate(self):
        # Do something here
        # Register it as an event
        RegistrationEvent.objects.create(
            course_registration=self,
            event_type=RegistrationEvent.EventType.CERTIFICATE_ISSUED,
        )


class RegistrationEvent(models.Model):
    class EventType(models.IntegerChoices):
        SIGNED = 1, "Signed up"
        STARTED = 2, "Course started"
        SUCCESS = 3, "Course success"
        CERTIFICATE_ISSUED = 4, "Certificate issued"
        CANCELED = 5, "Cancelled"
        FAILED = 6, "Course failed"

    course_registration = models.ForeignKey(
        Registration,
        on_delete=models.CASCADE,
        related_name="events",
    )
    event_type = models.IntegerField(choices=EventType.choices)

    created_at = models.DateTimeField(auto_now_add=True, verbose_name="Creato il")
    updated_at = models.DateTimeField(auto_now=True, verbose_name="Aggiornato il")

    @property
    def event_type_description(self):
        return self.EventType(self.event_type).label

    def __str__(self):
        return f"{self.event_type_description} - {self.course_registration}"

    def __repr__(self):
        return f"<RegistrationEvent: {self.event_type_description} - {self.course_registration}>"

    def _get_previous_event(self, is_new):
        qs = self.course_registration.events.all()
        if not is_new:
            qs = qs.exclude(created_at__gte=self.created_at)
        return qs.order_by("-created_at").first()

    def _ensure_correct_flow_of_events(self, is_new):
        # If the course has ended (certificate issued, cancelled, or failed), no more events can be added
        if is_new:
            if self.course_registration.ended:
                raise ValueError(
                    "Il corso è completato, non è possibile aggiungere eventi"
                )

        if self.event_type == self.EventType.CANCELED:
            return  # No further checks needed

        previous_event = self._get_previous_event(is_new=is_new)
        print(
            "Checking correct flow, previous event:",
            previous_event,
            "current event_type:",
            self.event_type,
        )

        if not previous_event:
            if self.event_type != self.EventType.SIGNED:
                raise ValueError("First event must be 'signed'")
        elif previous_event.event_type == self.EventType.SIGNED:
            if self.event_type != self.EventType.STARTED:
                raise ValueError("After signed next event must be 'started'")
        elif previous_event.event_type == self.EventType.STARTED:
            if self.event_type not in (
                self.EventType.SUCCESS,
                self.EventType.CANCELED,
            ):
                raise ValueError(
                    "After started next event must be 'success' or 'canceled'"
                )
        elif previous_event.event_type == self.EventType.SUCCESS:
            if self.event_type != self.EventType.CERTIFICATE_ISSUED:
                raise ValueError(
                    "After success next event must be 'certificate issued'"
                )

    def _issue_certificate_if_needed(self, is_new):
        if not is_new:
            return
        if not self.event_type == self.EventType.SUCCESS:
            return
        self.course_registration.threaded_issue_certificate()

    def save(self, *args, **kwargs):
        is_new = self._state.adding

        if is_new:
            self._ensure_correct_flow_of_events(is_new=is_new)

        ret = super().save(*args, **kwargs)

        self._issue_certificate_if_needed(is_new=is_new)
        return ret

1 Answer


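This problem is common in Django and comes from how threads interact with database timing. Specifically, when the new thread starts creating the CERTIFICATE_ISSUED event, the SUCCESS event has not yet been fully written to the database. Each thread uses its own database connection, and the admin add view shown in your log runs inside a transaction, so the new thread cannot see the SUCCESS row until that transaction commits. Django offers several solutions; the one I have found works best is to use post_save with receiver, so that the certificate is issued automatically when a SUCCESS event is handled.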
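As a side illustration, the visibility issue can be reproduced without Django. The sketch below uses only the standard-library sqlite3 module; the `event` table and the `latest_event_seen_by_new_thread` helper are hypothetical stand-ins, not part of the question's code. It assumes, as in Django, that the new thread opens its own database connection.

```python
import os
import sqlite3
import tempfile
import threading

# A file-based database, so that two connections can share it.
db_path = os.path.join(tempfile.mkdtemp(), "events.sqlite3")

# The "request" connection: STARTED is already committed.
writer = sqlite3.connect(db_path)
writer.execute("CREATE TABLE event (event_type TEXT)")
writer.execute("INSERT INTO event VALUES ('STARTED')")
writer.commit()


def latest_event_seen_by_new_thread(result, key):
    # The new thread opens its own connection (hypothetical helper).
    reader = sqlite3.connect(db_path)
    try:
        row = reader.execute(
            "SELECT event_type FROM event ORDER BY rowid DESC LIMIT 1"
        ).fetchone()
        result[key] = row[0]
    finally:
        reader.close()


seen = {}

# sqlite3 opens a transaction implicitly before this INSERT;
# the SUCCESS row stays uncommitted until writer.commit().
writer.execute("INSERT INTO event VALUES ('SUCCESS')")

t = threading.Thread(target=latest_event_seen_by_new_thread, args=(seen, "before_commit"))
t.start()
t.join()

writer.commit()

t = threading.Thread(target=latest_event_seen_by_new_thread, args=(seen, "after_commit"))
t.start()
t.join()

writer.close()
print(seen)
```

The thread started before the commit still sees STARTED as the latest event, which matches the log in the question; the thread started after the commit sees SUCCESS.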

The imports you need are:

from django.db.models.signals import post_save
from django.dispatch import receiver

The implementation looks roughly like this:

@receiver(post_save, sender=RegistrationEvent)
def issue_certificate_if_success(sender, instance, created, **kwargs):
    if created and instance.event_type == RegistrationEvent.EventType.SUCCESS:
        instance.course_registration.threaded_issue_certificate()

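If you would rather not take this approach, there are other solutions. I have also seen approaches based on the transaction module, for example transaction.on_commit, which runs a callback only after the current transaction has committed.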
