ABP - 本地事件总线
阅读原文时间:2023年08月18日阅读:3

1. 事件总线

在我们的一个应用中,经常会出现一个逻辑执行之后要跟随执行另一个逻辑的情况,例如一个用户创建了后续还需要发送邮件进行通知,或者需要初始化相应的权限等。面对这样的情况,我们当然可以顺序进行相应的逻辑代码的编写,但这样会导致各种业务逻辑全部集中耦合在一个类中,违背了 "单一职责原则"。

在 ABP 框架中,对于上面的业务场景的处理,我们可以通过事件总线来解耦,使得代码逻辑实现更加清晰。事件总线的本质就是中介者模式,利用一个中介角色在发送方和接受方之间进行消息的传递,接受方单独实现关注的独立的小功能点,从而达到各块业务逻辑清晰,代码松散耦合的目的。

ABP 框架中的事件总线分为 本地事件总线 和 分布式事件总线 两种,两种使用的方式基本类似,只是分布式事件总线需要借助 RabbitMQ、Kafaka 等第三方消息队列中间件。本章先讲本地事件总线相关知识点。

2. 本地事件总线

本地事件总线实现进程内的事件的发布订阅功能,通常运用于单体应用架构或微服务架构中的一个服务内部。使用方式比较简单,以下是演示,也将通过控制台程序来进行。

2.1. 事件总线基本使用

本地事件总线的实现包含在 Volo.Abp.EventBus Nuget 包中,我们可以通过以下方式来集成。

通过以下命令创建一个控制台项目:

abp new AbpEventBus -t console

在 AbpEventBusSample.csproj 执行以下命令:

Abp add-package Volo.Abp.EventBus

如果是 Web 应用的话,在通过 ABP CLI 初始化启动模板的时候就已经集成了事件总线模块,无须再自己进行集成。

2.1.1 发布

ABP 提供了 ILocalEventBus 接口来满足我们对本地事件总线的使用。我们只需要在要进行事件发布的类注入该接口即可,之后就能通过以下的方式进行事件的发布了。

public class HelloWorldService : ITransientDependency
{
    private readonly ILocalEventBus _localEventBus;
    public HelloWorldService(ILocalEventBus localEventBus)
    {
        _localEventBus = localEventBus;
    }

    public Task SayHelloAsync()
    {
        // 当前业务逻辑
        Console.WriteLine("Hello Jerry!");
        // 关联业务逻辑
        _localEventBus.PublishAsync(new HelloEventData
        {
            Who = "Tom",
            ToWho = "Jerry",
            Where = "广州天河正佳广场",
            When = DateTime.Now
        });

        return Task.CompletedTask;
    }
}

在进行事件发布之前需要先定义一个事件对象,这是一个普通类,是事件相关的各种数据的一个包装类,例如上面使用到的 HelloEventData 。

public class HelloEventData
{
    public string Who { get; set; }

  public string ToWho { get; set; }

    public string Where { get; set; }

    public DateTime When { get; set; }
}

就算在事件发布过程中不需要传输任何数据也需要创建一个类,在这种情况下为空类,这是因为事件总线是通过这个事件对象的类型来确定其对于的订阅者,从而执行相应的处理方法的。

通过源码可以看到,当我们调用 PublishAsync 方法时,最终时调用了 TriggerHandlersAsync 方法,该方法中从 HandlerFactories 中找到相应的的 HandlerFactory,然后通过 IEventHandlerInvoker 执行相应的事件执行器。

最终,就是反射创建了对于的 IEventHandlerMethodExecutor 对象,传入执行器和事件对象,再通过委托调用执行器的 HandleEventAsync 方法。

那么 HandlerFactories 是怎么来的呢?它实际上就是一个事件对象和执行器对应的集合。这里的工厂实际上并不是执行器工厂,它不负责执行器的创建,而是通过执行器生成执行器的包装类(为了类型对象能够释放销毁)。

在我们通过 PublishAsync 方法发布事件的时候,还可以通过 onUnitOfWorkComplete 参数设置事件发布是否和工作单元挂钩,实现事件和其他业务的原子性。其实这也很简单,就是结合工作单元的时候,只是将事件存储起来,没有立刻触发。

等到工作单元提交了,再通过 IUnitOfWorkEventPublisher 对象发布,而该接口在事件总线模块中有对于的实现类UnitOfWorkEventPublisher,其实就是工作单元提交时再次发布一次不结合工作单元的事件而已。

2.1.2 订阅

事件的订阅有多种方式。

(1) 实现 ILocalEventHandler<TEvent> 接口,并配置到容器

public class HelloEventHandler : ILocalEventHandler<HelloEventData>, ITransientDependency
{
    public Task HandleEventAsync(HelloEventData eventData)
    {
        Console.WriteLine($"{eventData.Who} Say Hello To { eventData.ToWho } in { eventData.When } at { eventData.When }");
        return Task.CompletedTask;
    }
}

这种方式是最简便的方式,ABP 框架中的事件总线模块会在服务配置到容器时自动方向这些订阅者执行器。

从源码中可以看到,事件总线模块中注册了容器中依赖关系配置的拦截器(auto Ioc 容器的功能),在应用启动向容器中配置依赖关系的时候,这里的事件会触发,对每一个配置进行检查,通过接口类型查找到相应的实现类之后,会被保存到 选项 当中。

在 事件总线 构造函数会根据选项中保存的执行器注册订阅

实际上这里就是通过执行器实例创建了一个工厂类,并且将其添加到上面讲到的事件对象和执行器对应的集合 HandlerFactories 中,维护好事件对象类型与执行器的对于关系。

(2) 手动调用 ILocalEventBus 接口进行订阅

public Task SayHelloAsync()
{
    // 当前业务逻辑
    Console.WriteLine("Hello Jerry!");

    _localEventBus.Subscribe<HelloEventData, HelloLogEventHandler>();
    //_localEventBus.Subscribe(new HelloLogEventHandler());
    //_localEventBus.Subscribe<HelloEventData>((eventData) => {  return Task.CompletedTask; });

    // 关联业务逻辑
    _localEventBus.PublishAsync(new HelloEventData
    {
        Who = "Tom",
        ToWho = "Jerry",
        Where = "广州天河正佳广场",
        When = DateTime.Now
    });

    return Task.CompletedTask;
}

public class HelloLogEventHandler : ILocalEventHandler<HelloEventData>
{
    public Task HandleEventAsync(HelloEventData eventData)
    {
        Console.WriteLine($"Log: {eventData.Who} Say Hello To { eventData.ToWho } in { eventData.When } at { eventData.When }");
        return Task.CompletedTask;
    }
}

手动注册的订阅者会在调用注册代码之后全局生效,应该保存只有一次的订阅注册。

手动注册订阅者的时候,其实和上面自动注册的方式没太大区别,事件总线会根据我们注册订阅者的方式进行一定的包装,最终也是添加到事件对象和执行器对照的集合。

这些订阅者会在我们调用 ILocalEventBus 的 PublishAsync 方法对相关的事件进行发布之后触发。事件的发布订阅有以下特点:

  • 事件可以由0个或多个处理程序订阅.

  • 一个事件处理程序可以订阅多个事件,但是需要为每个事件实现 ILocalEventHandler 接口.

  • 如果需要在订阅者执行器中执行数据库操作并且使用到仓储,那可能需要使用工作单元。因为一些存储库方法需要在活动的工作单元中工作。应确保处理方法设置为 virtual,并为该方法添加一个 [UnitOfWork] 特性,或者手动使用 IUnitOfWorkManager 创建一个工作单元范围。

  • 当一个事件发布,订阅的事件处理程序将立即执行,而同时 PublishAsync 如果通过 await 关键字转同步的话,它将阻塞,直到事件处理程序执行完成。换句话说,本地事件总线事件发布与处理实际是立即触发,顺序执行的。

    这意味着如果处理程序抛出一个异常,它会影响发布该事件的代码,我们可以在 PublishAsync 调用上捕捉异常。 如果想要隐藏错误,可以在事件处理程序中使用 try-catch。 如果在一个工作单元范围内进行事件发布,那么相应的事件处理程序也会被工作单元覆盖. 这意味着,如果你的 UOW 是事务和处理程序抛出一个异常,事务会回滚。

这从上面列出来的源码中也可以看出来,本地事件总线本质上就还是通过在维护好事件对象类型与执行器对照集合中通过事件对象查找执行器,然后调用执行器中的方法的过程。

实体的增、删、改是非常常见的操作,有些时候一些实体的增、删、改之后需要关联一些其他的业务逻辑,这时候我们可以通过事件总线来解决。ABP框架会为所有的实体自动发布这些事件,我们只需要订阅相关的事件。

sing System.Threading.Tasks;
using Microsoft.AspNetCore.Identity;
using Volo.Abp.DependencyInjection;
using Volo.Abp.Domain.Entities.Events;
using Volo.Abp.EventBus;

namespace AbpDemo
{
    public class MyHandler
        : ILocalEventHandler<EntityCreatedEventData<IdentityUser>>,
          ITransientDependency
    {
        public async Task HandleEventAsync(
            EntityCreatedEventData<IdentityUser> eventData)
        {
            var userName = eventData.Entity.UserName;
            var email = eventData.Entity.Email;
            //...
        }
    }
}

上面的例子是 ABP 官方的示例,订阅了 EntityCreatedEventData<Entity> 接口的事件处理程序会在相应的实体创建之后触发。这种和领域对象(实体、聚合根)操作相关的预定义事件有两类:

用过去时态事件

当相关工作单元完成且实体更改成功保存到数据库时,将发布带有过去时态的事件. 如果在这些事件处理程序上抛出异常,则无法回滚事务,因为事务已经提交.

事件类型;

  • EntityCreatedEventData<T> 当实体创建成功后发布.
  • EntityUpdatedEventData<T> 当实体更新成功后发布.
  • EntityDeletedEventData<T> 当实体删除成功后发布.
  • EntityChangedEventData<T> 当实体创建,更新,删除后发布. 如果你需要监听任何类型的更改,它是一种快捷方式 - 而不是订阅单个事件.

用于进行时态事件(6.0版本可用,7.0版本已移除)

带有进行时态的事件在完成事务之前发布(如果数据库事务由所使用的数据库提供程序支持). 如果在这些事件处理程序上抛出异常,它会回滚事务,因为事务还没有完成,更改也没有保存到数据库中.

事件类型;

  • EntityCreatingEventData<T> 当新实体保存到数据库前发布.
  • EntityUpdatingEventData<T> 当已存在实体更新到数据库前发布.
  • EntityDeletingEventData<T> 删除实体前发布.
  • EntityChangingEventData<T> 当实体创建,更新,删除前发布. 如果你需要监听任何类型的更改,它是一种快捷方式 - 而不是订阅单个事件.

它们是在将更改保存到数据库时发布预构建事件;

  • 对于 EF Core, 他们在 DbContext.SaveChanges 发布.
  • 对于 MongoDB, 在你调用仓储的 InsertAsync, UpdateAsync 或 DeleteAsync 方法发布(因为MongoDB没有更改追踪系统).

领域对象中是不能够通过依赖注入注入服务,在聚合根类中我们可以通过 AddLocalEvent 添加本地事件,实体类中则不行,这里添加的事件将在聚合根对象持久化操作的时候发布。

using System;
using Volo.Abp.Domain.Entities;

namespace AbpDemo
{
    public class Product : AggregateRoot<Guid>
    {
        public string Name { get; set; }

        public int StockCount { get; private set; }

        private Product() { }

        public Product(Guid id, string name)
            : base(id)
        {
            Name = name;
        }

        public void ChangeStockCount(int newCount)
        {
            StockCount = newCount;

            //ADD an EVENT TO BE PUBLISHED
            AddLocalEvent(
                new StockCountChangedEvent
                {
                    ProductId = Id,
                    NewCount = newCount
                }
            );
        }
    }
}

聚合根中可以通过 AddLocalEvent 方法添加事件,是因为 ABP 框架中的聚合根基类实现了 IGeneratesDomainEvents 接口,如果我们的实体类中也需要发布事件,也可以实现 IGeneratesDomainEvents 接口。但是 ABP 并不建议随意地普通的实体类实现该接口,因为基于 IGeneratesDomainEvents 的事件发布是基于特定的数据库提供程序的,目前 ABP 框架中仅支持 EF Core 、MongoDB 的实现。

通过源码可以看到,这些事件的发布是重写了 SaveChangeAsync 等数据持久化的方法,在其中根据 IGeneratesDomainEvents 接口添加了相应的领域对象的更改操作事件。

最终还是和上面的发布事件时的工作单元操作一样,先添加到工作单元中,在工作单元提交的时候由 IUnitOfWorkEventPublisher 对象发布。

参考文档:

ABP 官方文档 - 本地事件总线

ABP 系列总结:

目录:ABP 系列总结

上一篇:ABP - 缓存模块(2)

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器