Django CQRS数据同步
django-cqrs的Python项目详细描述
Django CQRS公司
django-cqrs
是一个Django应用程序,它在几个Django微服务之间实现CQRS数据同步。在
CQRS公司
在Connect中,我们有一个相当复杂的领域模型。有许多微服务,它们是decomposed by subdomain,遵循database-per-service模式。这些微服务具有丰富且一致的api。它们部署在云k8s集群中,并在负载下自动扩展。其中许多服务从其他服务聚合数据,通常API Composition就足够了。但是,有些服务使用API连接的速度太慢,因此需要应用另一种模式。在
解决这个问题的模式称为CQRS - Command Query Responsibility Segregation。这种模式背后的核心思想是,视图数据库(副本)是为高效的查询和数据库连接而定义的。应用程序通过订阅拥有数据的服务发布的Domain events来保持其副本的数据最新。数据是eventually consistent,这对于非关键业务事务来说是可以的。在
文件
完整的文档可从https://django-cqrs.readthedocs.org获得。在
示例
集成
- 设置
RabbitMQ
- 安装
django-cqrs
- 根据RabbitMQ设置对主服务应用更改
# models.pyfromdjango.dbimportmodelsfromdj_cqrs.mixinsimportMasterMixin,RawMasterMixinclassAccount(MasterMixin,models.Model):CQRS_ID='account'CQRS_PRODUCE=True# set this to False to prevent sending instances to TransportclassAuthor(MasterMixin,models.Model):CQRS_ID='author'CQRS_SERIALIZER='app.api.AuthorSerializer'# For cases of Diamond Multiinheritance the following approach could be used:frommptt.modelsimportMPTTModelfromdj_cqrs.metasimportMasterMetaclassComplexInheritanceModel(MPTTModel,RawMasterMixin):passMasterMeta.register(ComplexInheritanceModel)^{pr2}$
- 根据RabbitMQ设置对副本服务应用更改
fromdjango.dbimportmodelsfromdj_cqrs.mixinsimportReplicaMixinclassAccountRef(ReplicaMixin,models.Model):CQRS_ID='account'id=models.IntegerField(primary_key=True)classAuthorRef(ReplicaMixin,models.Model):CQRS_ID='author'CQRS_CUSTOM_SERIALIZATION=True@classmethoddefcqrs_create(cls,sync,**mapped_data):# Override herepassdefcqrs_update(self,sync,**mapped_data):# Override herepass
# settings.pyCQRS={'transport':'dj_cqrs.transport.RabbitMQTransport','queue':'account_replica','host':RABBITMQ_HOST,'port':RABBITMQ_PORT,'user':RABBITMQ_USERNAME,'password':RABBITMQ_PASSWORD,}
- 在两个服务上应用迁移
- 在副本服务上运行使用者工作线程。管理命令:
python manage.py cqrs_consume -w 2
注释
当CQRS_序列化程序中存在具有相关实体的主模型时,在原子事务中进行操作是很重要的。 CQRS同步将在事务提交时发生。请避免在事务中多次保存主模型,以减少同步和副本端的潜在竞争。 相关模型的更新不会触发主模型的CQRS自动同步。这需要手动完成。在
示例:
withtransaction.atomic():publisher=models.Publisher.objects.create(id=1,name='publisher')author=models.Author.objects.create(id=1,name='author',publisher=publisher)withtransaction.atomic():publisher.name='new'publisher.save()author.save()
当只需要同步所需的实例时,有一个方法is_sync_instance
来设置过滤规则。
重要的是要明白,CQRS计数即使在没有同步的情况下也能工作,并且每次更新模型时都会应用规则。在
示例:
classFilteredSimplestModel(MasterMixin,models.Model):CQRS_ID='filter'name=models.CharField(max_length=200)defis_sync_instance(self):returnlen(str(self.name))>2
公用事业
无传输的大容量同步器(用法示例:可用于初始配置)。可在计划停机时使用。在
- 在主服务上:
python manage.py cqrs_bulk_dump --cqrs-id=author
->;author.dump
- 在副本服务上:
python manage.py cqrs_bulk_load -i=author.dump
过滤传输上的同步器(用法示例:将某些特定记录同步到给定副本)。可以动态使用。在
- 要同步所有副本:
python manage.py cqrs_sync --cqrs-id=author -f={"id__in": [1, 2]}
- 要将所有实例仅与一个副本同步:
python manage.py cqrs_sync --cqrs-id=author -f={} -q=replica
差异同步工具集()
- 在K8S中获取差异并同步主服务和副本服务:
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=author | kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_diff_replica | kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync
发展
- Python 3.6+
- 安装依赖项
requirements/dev.txt
测试
单元测试
- Python 3.6+
- 安装依赖项
requirements/test.txt
export PYTHONPATH=/your/path/to/django-cqrs/
检查代码样式:flake8
运行测试:pytest
测试报告在tests/reports
中生成。在
out.xml
-JUnit测试结果coverage.xml
-覆盖率xml结果
要生成HTML覆盖率报告,请使用:
--cov-report html:tests/reports/cov_html
整合测试
- docker撰写
cd integration_tests
docker-compose run master
- 项目
标签: