Django CQRS数据同步

django-cqrs的Python项目详细描述


Django CQRS公司

pyversionsPyPi StatusDocscodecovBuild StatusPyPI statusQuality Gate Status

django-cqrs是一个Django应用程序,它在几个Django微服务之间实现CQRS数据同步。在

CQRS公司

在Connect中,我们有一个相当复杂的领域模型。有许多微服务,它们是decomposed by subdomain,遵循database-per-service模式。这些微服务具有丰富且一致的api。它们部署在云k8s集群中,并在负载下自动扩展。其中许多服务从其他服务聚合数据,通常API Composition就足够了。但是,有些服务使用API连接的速度太慢,因此需要应用另一种模式。在

解决这个问题的模式称为CQRS - Command Query Responsibility Segregation。这种模式背后的核心思想是,视图数据库(副本)是为高效的查询和数据库连接而定义的。应用程序通过订阅拥有数据的服务发布的Domain events来保持其副本的数据最新。数据是eventually consistent,这对于非关键业务事务来说是可以的。在

文件

完整的文档可从https://django-cqrs.readthedocs.org获得。在

示例

集成

  • 设置RabbitMQ
  • 安装django-cqrs
  • 根据RabbitMQ设置对主服务应用更改
# models.pyfromdjango.dbimportmodelsfromdj_cqrs.mixinsimportMasterMixin,RawMasterMixinclassAccount(MasterMixin,models.Model):CQRS_ID='account'CQRS_PRODUCE=True# set this to False to prevent sending instances to TransportclassAuthor(MasterMixin,models.Model):CQRS_ID='author'CQRS_SERIALIZER='app.api.AuthorSerializer'# For cases of Diamond Multiinheritance the following approach could be used:frommptt.modelsimportMPTTModelfromdj_cqrs.metasimportMasterMetaclassComplexInheritanceModel(MPTTModel,RawMasterMixin):passMasterMeta.register(ComplexInheritanceModel)
^{pr2}$
  • 根据RabbitMQ设置对副本服务应用更改
fromdjango.dbimportmodelsfromdj_cqrs.mixinsimportReplicaMixinclassAccountRef(ReplicaMixin,models.Model):CQRS_ID='account'id=models.IntegerField(primary_key=True)classAuthorRef(ReplicaMixin,models.Model):CQRS_ID='author'CQRS_CUSTOM_SERIALIZATION=True@classmethoddefcqrs_create(cls,sync,**mapped_data):# Override herepassdefcqrs_update(self,sync,**mapped_data):# Override herepass
# settings.pyCQRS={'transport':'dj_cqrs.transport.RabbitMQTransport','queue':'account_replica','host':RABBITMQ_HOST,'port':RABBITMQ_PORT,'user':RABBITMQ_USERNAME,'password':RABBITMQ_PASSWORD,}
  • 在两个服务上应用迁移
  • 在副本服务上运行使用者工作线程。管理命令:python manage.py cqrs_consume -w 2

注释

当CQRS_序列化程序中存在具有相关实体的主模型时,在原子事务中进行操作是很重要的。 CQRS同步将在事务提交时发生。请避免在事务中多次保存主模型,以减少同步和副本端的潜在竞争。 相关模型的更新不会触发主模型的CQRS自动同步。这需要手动完成。在

示例:

withtransaction.atomic():publisher=models.Publisher.objects.create(id=1,name='publisher')author=models.Author.objects.create(id=1,name='author',publisher=publisher)withtransaction.atomic():publisher.name='new'publisher.save()author.save()

当只需要同步所需的实例时,有一个方法is_sync_instance来设置过滤规则。 重要的是要明白,CQRS计数即使在没有同步的情况下也能工作,并且每次更新模型时都会应用规则。在

示例:

classFilteredSimplestModel(MasterMixin,models.Model):CQRS_ID='filter'name=models.CharField(max_length=200)defis_sync_instance(self):returnlen(str(self.name))>2

公用事业

无传输的大容量同步器(用法示例:可用于初始配置)。可在计划停机时使用。在

  • 在主服务上:python manage.py cqrs_bulk_dump --cqrs-id=author->;author.dump
  • 在副本服务上:python manage.py cqrs_bulk_load -i=author.dump

过滤传输上的同步器(用法示例:将某些特定记录同步到给定副本)。可以动态使用。在

  • 要同步所有副本:python manage.py cqrs_sync --cqrs-id=author -f={"id__in": [1, 2]}
  • 要将所有实例仅与一个副本同步:python manage.py cqrs_sync --cqrs-id=author -f={} -q=replica

差异同步工具集()

  • 在K8S中获取差异并同步主服务和副本服务:
kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_master --cqrs-id=author | 
    kubectl exec -i REPLICA_CONTAINER -- python manage.py cqrs_diff_replica |
    kubectl exec -i MASTER_CONTAINER -- python manage.py cqrs_diff_sync

发展

  1. Python 3.6+
  2. 安装依赖项requirements/dev.txt

测试

单元测试

  1. Python 3.6+
  2. 安装依赖项requirements/test.txt
  3. export PYTHONPATH=/your/path/to/django-cqrs/

检查代码样式:flake8 运行测试:pytest

测试报告在tests/reports中生成。在

  • out.xml-JUnit测试结果
  • coverage.xml-覆盖率xml结果

要生成HTML覆盖率报告,请使用: --cov-report html:tests/reports/cov_html

整合测试

  1. docker撰写
  2. cd integration_tests
  3. docker-compose run master

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java Kafka producer大量内存使用(泄漏?)   java NullPointerException。。。正在插入数据但无法检索数据[Mysql DB]   java spring+jpa+hibernate=没有可用于当前线程的实际事务的EntityManager无法可靠地处理“persist”调用   getelementbyid在没有ID的情况下如何在java中使用GetElementsById   java有没有一种使用WatchService强制轮询的方法?   java将值从jframe传递给另一个jframe并使用它   Java/Groovy中带重试的反应式事件处理   具有两个包装器元素的java Jackson XML ArrayList输出   java总是在范围内使用不同的随机元素   取消选择java下拉列表值   多线程如何在Java中为对象的不同成员拥有不同的同步块   java如何使用多线程从文本文件中读取输入   java Spring启动附加崩溃命令   java使用公共或单独的actionPerfomed方法有什么区别   java用Spring3.0中的SpEL替换JSP中的EL   java作为windows服务运行应用程序时无法访问共享文件夹   java xml 1.1规范中的“解析数据”是什么意思?   以编程方式设置JComboBox索引时java触发ItemListener   java Android WebView:只加载HTML,不加载JS或CSS(在某些设备中)   Java:计算do/while循环的数量