Django CQRS数据同步

django-cqrs的Python项目详细描述


Django CQRS公司

pyversionsPyPi StatusDocscodecovBuild StatusPyPI statusQuality Gate Status

django-cqrs是一个Django应用程序,它在几个Django微服务之间实现CQRS数据同步。在

CQRS公司

在Connect中,我们有一个相当复杂的领域模型。有许多微服务,它们是decomposed by subdomain,遵循database-per-service模式。这些微服务具有丰富且一致的api。它们部署在云k8s集群中,并在负载下自动扩展。其中许多服务从其他服务聚合数据,通常API Composition就足够了。但是,有些服务使用API连接的速度太慢,因此需要应用另一种模式。在

解决这个问题的模式称为CQRS - Command Query Responsibility Segregation。这种模式背后的核心思想是,视图数据库(副本)是为高效的查询和数据库连接而定义的。应用程序通过订阅拥有数据的服务发布的Domain events来保持其副本的数据最新。数据是eventually consistent,这对于非关键业务事务来说是可以的。在

文件

完整的文档可从https://django-cqrs.readthedocs.org获得。在

示例

集成

  • 设置RabbitMQ
  • 安装django-cqrs
  • 根据RabbitMQ设置对主服务应用更改
# models.pyfromdjango.dbimportmodelsfromdj_cqrs.mixinsimportMasterMixin,RawMasterMixinclassAccount(MasterMixin,models.Model):CQRS_ID='account'CQRS_PRODUCE=True# set this to False to prevent sending instances to TransportclassAuthor(MasterMixin,models.Model):CQRS_ID='author'CQRS_SERIALIZER='app.api.AuthorSerializer'# For cases of Diamond Multiinheritance the following approach could be used:frommptt.modelsimportMPTTModelfromdj_cqrs.metasimportMasterMetaclassComplexInheritanceModel(MPTTModel,RawMasterMixin):passMasterMeta.register(ComplexInheritanceModel)
^{pr2}$
  • 根据RabbitMQ设置对副本服务应用更改
fromdjango.dbimportmodelsfromdj_cqrs.mixinsimportReplicaMixinclassAccountRef(ReplicaMixin,models.Model):CQRS_ID='account'id=models.IntegerField(primary_key=True)classAuthorRef(ReplicaMixin,models.Model):CQRS_ID='author'CQRS_CUSTOM_SERIALIZATION=True@classmethoddefcqrs_create(cls,sync,**mapped_data):# Override herepassdefcqrs_update(self,sync,**mapped_data):# Override herepass
# settings.pyCQRS={'transport':'dj_cqrs.transport.RabbitMQTransport','queue':'account_replica','host':RABBITMQ_HOST,'port':RABBITMQ_PORT,'user':RABBITMQ_USERNAME,'password':RABBITMQ_PASSWORD,}
  • 在两个服务上应用迁移
  • 在副本服务上运行使用者工作线程。管理命令:python manage.py cqrs_consume -w 2

注释

当CQRS_序列化程序中存在具有相关实体的主模型时,在原子事务中进行操作是很重要的。 CQRS同步将在事务提交时发生。请避免在事务中多次保存主模型,以减少同步和副本端的潜在竞争。 相关模型的更新不会触发主模型的CQRS自动同步。这需要手动完成。在

示例:

withtransaction.atomic():publisher=models.Publisher.objects.create(id=1,name='publisher')author=models.Author.objects.create(id=1,name='author',publisher=publisher)withtransaction.atomic():publisher.name='new'publisher.save()author.save()

当只需要同步所需的实例时,有一个方法is_sync_instance来设置过滤规则。 重要的是要明白,CQRS计数即使在没有同步的情况下也能工作,并且每次更新模型时都会应用规则。在

示例:

classFilteredSimplestModel(MasterMixin,models.Model):CQRS_ID='filter'name=models.CharField(max_length=200)defis_sync_instance(self):returnlen(str(self.name))>2

公用事业

无传输的大容量同步器(用法示例:可用于初始配置)。可在计划停机时使用。在

  • 在主服务上:python manage.py cqrs_bulk_dump --cqrs-id=author->;author.dump
  • 在副本服务上:python manage.py cqrs_bulk_load -i=author.dump

过滤传输上的同步器(用法示例:将某些特定记录同步到给定副本)。可以动态使用。在

  • 要同步所有副本:python manage.py cqrs_sync --cqrs-id=author -f={"id__in": [1, 2]}
  • 要将所有实例仅与一个副本同步:python manage.py cqrs_sync --cqrs-id=author -f={} -q=replica

差异同步工具集()

  • 在K8S中获取差异并同步主服务和副本服务:
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=author | 
    kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_diff_replica |
    kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync

发展

  1. Python 3.6+
  2. 安装依赖项requirements/dev.txt

测试

单元测试

  1. Python 3.6+
  2. 安装依赖项requirements/test.txt
  3. export PYTHONPATH=/your/path/to/django-cqrs/

检查代码样式:flake8 运行测试:pytest

测试报告在tests/reports中生成。在

  • out.xml-JUnit测试结果
  • coverage.xml-覆盖率xml结果

要生成HTML覆盖率报告,请使用: --cov-report html:tests/reports/cov_html

整合测试

  1. docker撰写
  2. cd integration_tests
  3. docker-compose run master

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java使用ContentExchange设置请求属性   java Spark/Hdfs/Hdfsclient兼容性   java springcloudstreamkafka配置:instanceCount和instanceIndex   Java中web服务序列化日期   java用动态数据替换占位符   java git gc似乎覆盖了一个packfile,留下了一个打开的文件描述符,其中包含对“oldxxx.pack”的引用   为什么Apache项目对Java版本敏感?   java Anylogic帮助如何在导入的3dobject通过输送机上的多个“站”时更改其颜色?   JavaEclipseNeonM2E可以导入一个大型项目,但似乎不能自动解决依赖关系   java@FindBy搜索具有满足条件的子元素的元素   java如何将ActionEvent e与键绑定一起使用?   java转换以集中方式从外部库抛出的异常   java中用户文件/数据文件与系统/程序文件的区别   java使用变量字符串或字符作为对象名   字体使用Java图形操纵字符串中每个字符的形状   JavaFX图表移动数据   java RandomAccessFile:将所有项设置为相同的字节数?   java Google Play inapp Billing onPurchasesUpdated()错误响应代码1   java在不知道属性名和属性数的情况下处理json对象   java是否可以一次从HazelcastInstance(映射和列表)中删除所有数据?