CQRS
是一种与领域驱动设计和事件溯源相关的架构模式, 它的全称是Command Query Responsibility Segregation, 又叫命令查询职责分离, Greg Young在2010年创造了这个术语, 它是基于Bertrand Meyer 的 CQS (Command-Query Separation 命令查询分离原则) 设计模式。
CQRS
认为不论业务多复杂在最终实现的时候, 无非是读写操作, 因此建议将应用程序分为两个方面, 即Command(命令)和Query(查询)
命令端:
查询端:
命令与读取操作的是同一个数据库, 命令端通过ORM框架将实体保存到数据库中, 查询端通过数据访问层获取数据 (数据访问层通过ORM框架或者存储过程获取数据)
命令与读取操作的是不同的数据库, 命令端通过ORM框架将实体保存到 写库 (Write Db), 并将本地改动推送到 读库 (Read Db), 查询端通过数据访问层访问 读库 (Read Db), 使用这种模式可以带来以下好处:
查询更简单
提升查询端的使用体验
关注点分离
通过事件溯源实现的CQRS
中会将应用程序的改变都以事件的方式存储起来, 使用这种模式可以带来以下好处:
ES
、Redis
等用来存储数据, 提升查询效率当然事情有利自然也有弊, CQRS
的使用固然会带来很多好处, 但同样它也会给项目带来复杂度的提升, 并且双数据库模式、事件溯源模式 的CQRS
, 使用的是最终一致性, 这些都是我们在选择技术方案时必须要考虑的
上述文章中我们了解到了CQRS其本质上是一种读写分离的设计思想, 它并不是强制性的规定必须要怎样去做, 这点与之前的IEvent
(进程内事件、IIntegrationEvent
(跨进程事件不同, 它并不是强制性的, 根据CQRS
的设计模式我们将事件分成Command
、Query
由于Query
(查询) 是需要有返回值的, 因此我们在继承IEvent
的同时, 还额外增加了一个Result
属性用以存储结果, 我们希望将查询的结果保存到Result
中, 但它不是强制性的, 我们并没有强制性要求必须要将结果保存到Result
中。
由于Command
(命令) 是没有返回值的, 因此我们并没有额外新增Result
属性, 我们认为命令会更新数据, 那就需要用到工作单元, 因此Command
除了继承IEvent
之外, 还继承了ITransaction
,这方便了我们在Handler
中的可以通过@event.UnitOfWork
来管理工作单元, 而不需要通过构造函数来获取
但MasaFramework
并没有要求必须使用 Event Sourcing 模式
或者 双数据库模式
的CQRS, 具体使用哪种实现, 它取决于业务的决策者
下面就就来看看MasaFramework
提供的CQRS
是如何使用的
新建ASP.NET Core 空项目Assignment.CqrsDemo
,并安装Masa.Contrib.Dispatcher.Events
,Masa.Contrib.Dispatcher.IntegrationEvents
,Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
,Masa.Contrib.ReadWriteSplitting.Cqrs
,Masa.Contrib.Development.DaprStarter.AspNetCore
dotnet new web -o Assignment.CqrsDemo
cd Assignment.CqrsDemo
dotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.9 //使用进程内事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents --version 0.7.0-preview.9 //使用跨进程事件总线
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.9 //使用Dapr提供pubsub能力
dotnet add package Masa.Contrib.ReadWriteSplitting.Cqrs --version 0.7.0-preview.9 //使用CQRS
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.9 //开发环境下协助 Dapr Sidecar, 用于通过Dapr发布集成事件
注册跨进程事件总线、进程内事件总线, 修改类Program.cs
示例中未真实使用DB, 不再使用发件箱模式, 只需要使用集成事件提供的PubSub
能力即可
builder.Services.AddIntegrationEventBus(dispatcherOptions =>
{
dispatcherOptions.UseDapr();//使用 Dapr 提供的PubSub能力
dispatcherOptions.UseEventBus();//使用进程内事件总线
});
注册Dapr Starter 协助管理Dapr Sidecar
(开发环境使用)
if (builder.Environment.IsDevelopment())
builder.Services.AddDaprStarter();
新增加添加商品方法, 修改类Program.cs
app.MapPost("/goods/add", async (AddGoodsCommand command, IEventBus eventBus) =>
{
await eventBus.PublishAsync(command);
});
///
public record AddGoodsCommand : Command
{
public string Name { get; set; }
public string Cover { get; set; }
public decimal Price { get; set; }
public int Count { get; set; }
}
新增加查询商品的方法, 修改类Program.cs
app.MapGet("/goods/{id}", async (Guid id, IEventBus eventBus) =>
{
var query = new GoodsItemQuery(id);
await eventBus.PublishAsync(query);
return query.Result;
});
///
public record GoodsItemQuery : Query
{
public Guid Id { get; set; } = default!;
public override GoodsItemDto Result { get; set; }
public GoodsItemQuery(Guid id)
{
Id = id;
}
}
///
public class GoodsItemDto
{
public Guid Id { get; set; }
public string Name { get; set; }
public string Cover { get; set; }
public decimal Price { get; set; }
public int Count { get; set; }
public DateTime DateTime { get; set; }
}
新增Command
处理程序, 添加类CommandHandler.cs
public class CommandHandler
{
///
///
///
[EventHandler]
public async Task AddGoods(AddGoodsCommand command, IIntegrationEventBus integrationEventBus)
{
//todo: 模拟添加商品到db并发送添加商品集成事件
var goodsId = Guid.NewGuid(); //模拟添加到db后并获取商品id
await integrationEventBus.PublishAsync(new AddGoodsIntegrationEvent(goodsId, command.Name, command.Cover, command.Price,
command.Count));
}
}
///
///
///
///
///
///
public record AddGoodsIntegrationEvent(Guid Id, string Name, string Cover, decimal Price, int Count) : IntegrationEvent
{
public Guid Id { get; set; } = Id;
public string Name { get; set; } = Name;
public string Cover { get; set; } = Cover;
public decimal Price { get; set; } = Price;
public int Count { get; set; } = Count;
public override string Topic { get; set; } = nameof(AddGoodsIntegrationEvent);
}
新增Query
处理程序, 添加类QueryHandler.cs
public class QueryHandler
{
///
///
///
[EventHandler]
public Task GetGoods(GoodsItemQuery query)
{
//todo: 模拟从cache获取商品
var goods = new GoodsItemDto();
query.Result = goods;
return Task.CompletedTask;
}
}
新增添加商品的跨进程事件的处理服务, 修改Program.cs
app.MapPost(
"/integration/goods/add",
[Topic("pubsub", nameof(AddGoodsIntegrationEvent))]
(AddGoodsIntegrationEvent @event, ILogger
{
//todo: 模拟添加商品到缓存
logger.LogInformation("添加商品到缓存, {Event}", @event);
});
// 使用 dapr 来订阅跨进程事件
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoint =>
{
endpoint.MapSubscribeHandler();
});
流水账式的服务会使得
Program.cs
变得十分臃肿, 可以通过Masa Framework
提供的MinimalAPIs来简化Program.cs
点击查看详情。
我们上面的例子是通过事件总线来完成解耦以及数据模型的同步, 使用的双数据库模式, 但读库使用的是 缓存数据库
, 在Command
端做商品的添加操作, 在Query
端只做查询, 且两端分别使用各自的数据源, 两者业务互不影响, 并且由于缓存数据库性能更强, 它将最大限度的提升性能, 使得我们有更好的使用体验。
在Masa Framework
中仅仅是通过ICommand
、IQuery
将读写分开, 但这并没有硬性要求, 事实上你使用IEvent
也是可以的, CQRS
只是一种设计模式, 这点我们要清楚, 它只是告诉我们要按照一个什么样的标准去做, 但具体怎么来做, 取决于业务的决策者, 除此之外, 后续Masa Framework
还会增加对Event Sourcing
(事件溯源)的支持, 通过事件重放, 允许我们随时重建到对象的任何状态
Assignment15
https://github.com/zhenlei520/MasaFramework.Practice
CQRS架构项目:https://github.com/masalabs/MASA.EShop/tree/main/src/Services/Masa.EShop.Services.Catalog
MASA.Framework:https://github.com/masastack/MASA.Framework
MASA.EShop:https://github.com/masalabs/MASA.EShop
MASA.Blazor:https://github.com/BlazorComponent/MASA.Blazor
如果你对我们的 MASA Framework 感兴趣, 无论是代码贡献、使用、提 Issue, 欢迎联系我们
手机扫一扫
移动阅读更方便
你可能感兴趣的文章