使用信号和Celery-django的Django工作流引擎

3 投票
1 回答
2018 浏览
提问于 2025-04-16 09:33

我正在我的项目中实现一个工作流引擎,主要目的是创建一个可移植的应用程序。也就是说,我希望将来能把它放到任何其他项目中,然后把工作流连接到我项目中的不同模型上,让它正常工作。

我尝试过想一个方法,但感觉这个设置并不是最理想的。我想在我的项目里创建一个工作流应用,连接两种模型,一种模型包含工作流的设置(比如工作流、步骤、动作),另一种模型则包含实例或交易。

下面是我的工作流模型代码:

from django.db import models
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes import generic
import signals


################################
#   Workflow engine models
################################

class Signal_Listener(models.Model):
    LISTENING_ON = (
    ('I', 'INSERT'),
    ('U', 'UPDATE'),
    ('D', 'DELETE'),
    ('S', 'SELECT'),
    ('A', 'ANY QUERY'),
    ('L', 'ANY DDL'),
    )

    table_name = models.CharField(max_length=100)
    listening_to = models.CharField(max_length=1, choices=LISTENING_ON)

    class Meta:
        unique_together = (("table_name", "listening_to"),)

    def __unicode__(self):
        return '%s - %s' % (self.table_name, self.listening_to)

class Action(models.Model):
    ACTION_TYPE_CHOICES = (
    ('P', 'Python Script'   ),
    ('C', 'Class name'      ),
    )
    name = models.CharField(max_length=100)
    action_type = models.CharField(max_length=1, choices=ACTION_TYPE_CHOICES)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)

class Steps(models.Model):
    sequence = models.IntegerField() 
    Action = models.ForeignKey(Action)
    Signal_Listener = models.ForeignKey(Signal_Listener)

class Process(models.Model):
## TODO: Document
# Processes class is used to store information about the process itself.
# Or in another word, the workflow name.
    WF_TYPE_LIST = (
    ('P', 'Python-API'),
    )

    name = models.CharField(max_length=30)
    is_active = models.BooleanField()
    wf_type = models.CharField(max_length=1, choices=WF_TYPE_LIST)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)
    listening_to = models.ForeignKey(Steps)



################################
#   Workflow transactions models
################################

class Instance(models.Model):
##  TODO: Document
# Re
    INSTANCE_STATUS = (
    ('I', 'In Progress' ),
    ('C', 'Cancelled'   ),
    ('A', 'Archived'     ), # Old completed tasks can be archived
    ('P', 'Pending'     ),
    ('O', 'Completed'   )
    )

    id = models.CharField(max_length=200, primary_key=True)
    status = models.CharField(max_length=1, choices=INSTANCE_STATUS, db_index=True)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)

    def save(self, *args, **kwargs):
    # on new records generate a new uuid
        if self.id is None or self.id.__len__() is 0:
            import uuid

            self.id = uuid.uuid4().__str__()
        super(Instances, self).save(*args, **kwargs)

class Task(models.Model):
    TASK_STATUS = (
    ('S', 'Assigned'    ),
    ('I', 'In Progress' ),
    ('P', 'Pending'     ),
    ('C', 'Cancelled'   ),
    ('A', 'Archived'    ), # Old completed tasks can be archived
    ('O', 'Completed'   )
    )
    name = models.CharField(max_length=100)
    instance = models.ForeignKey(Instance)
    status = models.CharField(max_length=1, choices=TASK_STATUS)
    bio = models.CharField(max_length=100)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)

我还有一个工作流信号的代码:

    """
        Workflow Signals
            Will be used to monitor all inserts, update, delete or select statements
            If an action is attached to that particular table, it will be inserted Celery-Tasks distribution.
    """
    from django.db.models.signals import post_save, post_delete
    from django.core.cache import cache


    def workflow_post_init_listener(sender, **kwargs):
        try:

            if cache.get('wf_listner_cache_%s' % kwargs['instance']._meta.db_table):
                pass
            else:
                record = 'Signal_Listener'.objects.get(table_name__exact=kwargs['instance']._meta.db_table)
# am not sure what to do next!
        except 'Signal_Listener'.DoesNotExist:
            # TODO: Error logging
            pass

    post_save.connect(workflow_post_init_listener, dispatch_uid="workflow.models.listener.save")

我觉得我的模型设计可能需要改进。我可以在多个场景中使用这个设计,比如我想从审批流程开始。例如,我可以在信号监听器中插入表名或模型名,以监控新的插入记录。如果有新的记录,我就会触发一个特定的工作流。

至于动作,我明白这些动作需要开发。也许我需要在工作流应用下创建一个动作文件夹,把每个动作放在一个类里。每个动作会执行特定的任务,比如发送邮件、归档、更新数据库值等等。

如果我是在重复造轮子,或者已经有类似的东西被开发出来了,欢迎大家推荐,我会很乐意去了解一下。

祝好,

1 个回答

2

比如你可以看看 zope.wfmc(http://pypi.python.org/pypi/zope.wfmc):这是一个工作流管理联盟的实现,工作流可以用 XPDL 来定义。

撰写回答