Halo(六)
阅读原文时间:2023年07月12日阅读:1

Spring publish-event 机制

监听者模式包含了一个监听者 Listener 与之对应的事件 Event,还有一个事件发布者 EventPublish。
过程就是 EventPublish 发布一个事件,被监听者捕获到,然后执行事件相应的方法。

1. 发布事件

    private final ApplicationEventPublisher eventPublisher;

    eventPublisher.publishEvent(new LogEvent(this, name, 0, ld));

2. 事件

public class LogEvent extends ApplicationEvent {

    private final LogParam logParam;

    public LogEvent(Object source, LogParam logParam) {
        //事件源(发布事件的对象)
        super(source);

        this.logParam = logParam;
    }

    public LogEvent(Object source, String logKey, LogType logType, String content) {
        this(source, new LogParam(logKey, logType, content));
    }

    public LogParam getLogParam() {
        return logParam;
    }
}

3. 监听器

@Component
public class LogEventListener {

    private final LogService logService;

    public LogEventListener(LogService logService) {
        this.logService = logService;
    }

    //将方法标记为应用程序事件侦听器
    @EventListener
    //异步
    @Async
    public void onApplicationEvent(LogEvent event) {
        //转换成 Entity
        Log logToCreate = event.getLogParam().convertTo();
        //保存日志
        logService.create(logToCreate);
    }
}

缓存模块

自定义注解并使用

/**
 * 缓存锁注解(在具有该注解的方法上:加锁,执行目标方法,释放锁)
 */
@Target(ElementType.METHOD)    //用于描述注解的使用范围
@Retention(RetentionPolicy.RUNTIME) //用于描述注解的生命周期
@Documented //表示该注解是否可以生成到 API 文档中
@Inherited  //具备继承性(A被注解了,那么继承了A的B够继承到A中的注解)
public @interface CacheLock {

    /** 缓存锁key前缀(默认"")*/
    @AliasFor("value")
    String prefix() default "";

    /** 缓存锁key前缀(默认"")*/
    @AliasFor("prefix")
    String value() default "";

    /** 过期时间 */
    long expired() default 5;

    /** 时间单位(默认 s)*/
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /** 分隔符(默认 :)*/
    String delimiter() default ":";

    /** 方法调用后是否删除缓存 */
    boolean autoDelete() default true;

    /** 是否跟踪请求信息(将请求IP添加到缓存key上)*/
    boolean traceRequest() default false;
}

/**
 * 缓存参数注释
 */
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface CacheParam {
}

Controller:

    @PostMapping("/login")
    @CacheLock(autoDelete = false)    //登陆操作加锁,防止重复登陆
    public AuthToken auth(@RequestBody @Valid LoginParam loginParam) {
        return adminService.authenticate(loginParam);
    }

用于缓存锁注解的拦截器(AOP代理模式)

@Aspect
@Configuration
public class CacheLockInterceptor {

    //缓存锁key前缀:cache_lock_
    private final static String CACHE_LOCK_PREFOX = "cache_lock_";

    //缓存锁value:locked(被锁定状态)
    private final static String CACHE_LOCK_VALUE = "locked";

    //缓存池
    private final StringCacheStore cacheStore;

    public CacheLockInterceptor(StringCacheStore cacheStore) {
        this.cacheStore = cacheStore;
    }

    //具有 @annotation(run.halo.app.cache.lock.CacheLock) 注解的方法都会触发拦截器
    @Around("@annotation(run.halo.app.cache.lock.CacheLock)")
    public Object interceptCacheLock(ProceedingJoinPoint joinPoint) throws Throwable {
        //获取方法签名
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();

        //获取注解类
        CacheLock cacheLock = methodSignature.getMethod().getAnnotation(CacheLock.class);

        //建立缓存锁key
        String cacheLockKey = buildCacheLockKey(cacheLock, joinPoint);

        try {
            //放入缓存(key是cacheLockKey,value是locked)
            //如果缓存中没有此缓存锁key才会放入(返回true),否则会放入失败(返回false)
            Boolean cacheResult = cacheStore.putIfAbsent(cacheLockKey, CACHE_LOCK_VALUE, cacheLock.expired(), cacheLock.timeUnit());

            if (!cacheResult) {
                throw new FrequentAccessException("访问过于频繁,请稍后再试!").setErrorData(cacheLockKey);
            }

            //处理被注解的方法,获取返回值
            return joinPoint.proceed();
        } finally {
            if (cacheLock.autoDelete()) {
                //注解标注调用方法后删除缓存
                cacheStore.delete(cacheLockKey);
            }
        }
    }

    /**
     * 建立缓存锁key(包括:cache_lock_cacheLock.prefix/Method:arg:IP)
     */
    private String buildCacheLockKey(CacheLock cacheLock,ProceedingJoinPoint joinPoint) {

        //获取方法
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();

        //建立缓存锁key
        StringBuilder cacheKeyBuilder = new StringBuilder(CACHE_LOCK_PREFOX);

        //分隔符(:)
        String delimiter = cacheLock.delimiter();

        if (StringUtils.isNotBlank(cacheLock.prefix())) {
            cacheKeyBuilder.append(cacheLock.prefix());
        } else {
            cacheKeyBuilder.append(methodSignature.getMethod().toString());
        }

        //获取方法上的参数注解数组(一个方法可以有多个参数,一个参数可以有多个注解)
        Annotation[][] parameterAnnotations = methodSignature.getMethod().getParameterAnnotations();
        for (int i = 0; i < parameterAnnotations.length; i++) {
            for (int j = 0; j < parameterAnnotations[i].length; j++) {
                //获取注解
                Annotation annotation = parameterAnnotations[i][j];
                //如果被 @CacheParam 注解,则获取参数,添加到缓存锁key上
                if (annotation instanceof CacheParam) {
                    //获取参数
                    Object arg = joinPoint.getArgs()[i];
                    //将参数添加到缓存key上
                    cacheKeyBuilder.append(delimiter).append(arg.toString());
                }
            }
        }

        if (cacheLock.traceRequest()) {
            //添加 Request 客户端 IP
            cacheKeyBuilder.append(delimiter).append(ServletUtils.getRequestIp());
        }
        return cacheKeyBuilder.toString();
    }
}

缓存包装器(将缓存数据封装成对象)

@Data
@NoArgsConstructor
@AllArgsConstructor
class CacheWrapper<V> implements Serializable {

    /** 缓存数据 */
    private V data;

    /** 过期时间 */
    private Date expireAt;

    /** 创造时间 */
    private Date createAt;
}

缓存池接口

public interface CacheStore<K, V> {

    /** 通过key得到缓存 */
    Optional<V> get(@NonNull K key);

    /** 放入一个会过期的缓存 */
    void put(@NonNull K key, @NonNull V value, long timeout, @NonNull TimeUnit timeUnit);

    /** 缓存池中不存在key才放入 */
    Boolean putIfAbsent(@NonNull K key, @NonNull V value, long timeout, @NonNull TimeUnit timeUnit);

    /** 放入一个不会过期的缓存 */
    void put(@NonNull K key, @NonNull V value);

    /** 删除缓存 */
    void delete(@NonNull K key);
}

缓存池抽象类

public abstract class AbstractCacheStore<K, V> implements CacheStore<K, V> {

    /** 通过key获取缓存包装器 */
    abstract Optional<CacheWrapper<V>> getInternal(@NonNull K key);

    /** 放入缓存包装器 */
    abstract void putInternal(@NonNull K key, @NonNull CacheWrapper<V> cacheWrapper);

    /** 如果key不存在才放入缓存包装器 */
    abstract Boolean putInternalIfAbsent(@NonNull K key, @NonNull CacheWrapper<V> cacheWrapper);

    /** 通过key获取value */
    @Override
    public Optional<V> get(K key) {
        return getInternal(key).map(cacheWrapper -> {
            //判断是否过期(过期时间不是null,并且在当前时间之前,表示已经过期)
            if (cacheWrapper.getExpireAt() != null && cacheWrapper.getExpireAt().before(run.halo.app.utils.DateUtils.now())) {
                //删除缓存
                delete(key);
                return null;
            }
            return cacheWrapper.getData();
        });
    }

    /** 放入带过期时间的缓存 */
    @Override
    public void put(K key, V value, long timeout, TimeUnit timeUnit) {
        putInternal(key, buildCacheWrapper(value, timeout, timeUnit));
    }

    /** key不存在才放入缓存 */
    @Override
    public Boolean putIfAbsent(K key, V value, long timeout, TimeUnit timeUnit) {
        return putInternalIfAbsent(key, buildCacheWrapper(value, timeout, timeUnit));
    }

    /** 放入不过期的缓存 */
    @Override
    public void put(K key, V value) {
        putInternal(key, buildCacheWrapper(value, 0, null));
    }

    /** 构建缓存包装器 */
    @NonNull
    private CacheWrapper<V> buildCacheWrapper(@NonNull V value, long timeout, @Nullable TimeUnit timeUnit) {
        //过期时间必须>=0
        Assert.isTrue(timeout >= 0, "Cache expiration timeout must not be less than 0");

        Date now = run.halo.app.utils.DateUtils.now();

        Date expireAt = null;

        //如果设置了过期时间,则构造过期时间
        if (timeout > 0 && timeUnit != null) {
            expireAt = DateUtils.add(now, timeout, timeUnit);
        }

        //构建缓存包装器
        CacheWrapper<V> cacheWrapper = new CacheWrapper<>();
        cacheWrapper.setCreateAt(now);
        cacheWrapper.setExpireAt(expireAt);
        cacheWrapper.setData(value);

        return cacheWrapper;
    }
}

字符串缓存池抽象类(将缓存 Data 数据转换成 Json 字符串)

public abstract class StringCacheStore extends AbstractCacheStore<String, String> {

    /** 放入值 */
    public <T> void putAny(String key, T value) {
        try {
            //JsonUtils.objectToJson(value):对象转成Json
            put(key, JsonUtils.objectToJson(value));
        } catch (JsonProcessingException e) {
            throw new ServiceException("Failed to convert " + value + " to json", e);
        }
    }

    public <T> void putAny(@NonNull String key, @NonNull T value, long timeout, @NonNull TimeUnit timeUnit) {
        try {
            put(key, JsonUtils.objectToJson(value), timeout, timeUnit);
        } catch (JsonProcessingException e) {
            throw new ServiceException("Failed to convert " + value + " to json", e);
        }
    }

    /** 获取值 */
    public <T> Optional<T> getAny(String key, Class<T> type) {
        return get(key).map(value -> {
            try {
                //JsonUtils.jsonToObject(value, type):Json转换成对象
                return JsonUtils.jsonToObject(value, type);
            } catch (IOException e) {
                return null;
            }
        });
    }
}

内存缓存池(字符串缓存池)

public class InMemoryCacheStore extends StringCacheStore {

    /** 清理计划周期 */
    private final static long PERIOD = 60 * 1000;    //一分钟

    /** 缓存容器 */
    private final static ConcurrentHashMap<String, CacheWrapper<String>> CACHE_CONTAINER = new ConcurrentHashMap<>();

    //定时任务
    private final Timer timer;

    public InMemoryCacheStore() {
        //定时清理缓存(1分钟)
        timer = new Timer();
        //开启定时任务,延迟0,周期一分钟
        timer.scheduleAtFixedRate(new CacheExpiryCleaner(), 0, PERIOD);
    }

    /** 获取值 */
    @Override
    Optional<CacheWrapper<String>> getInternal(String key) {
        //获取value,并放入允许null的Optional对象中
        return Optional.ofNullable(CACHE_CONTAINER.get(key));
    }

    /** 放入值 */
    @Override
    void putInternal(String key, CacheWrapper<String> cacheWrapper) {
        //返回原始值
        CacheWrapper<String> oldCacheWrapper = CACHE_CONTAINER.put(key, cacheWrapper);
    }

    /** 不存在才放入值 */
    @Override
    Boolean putInternalIfAbsent(String key, CacheWrapper<String> cacheWrapper) {
        CacheWrapper<String> stringCacheWrapper = CACHE_CONTAINER.putIfAbsent(key, cacheWrapper);
        if(stringCacheWrapper==null)return true;
        return false;
    }

    /** 删除缓存 */
    @Override
    public void delete(String key) {
        CACHE_CONTAINER.remove(key);
    }

    /** 清空缓存前取消所有定时任务 */
    @PreDestroy    //销毁Bean之前的操作
    public void preDestroy() {
        //取消所有定时任务
        timer.cancel();
    }

    /** 缓存超时清理任务 */
    private class CacheExpiryCleaner extends TimerTask {
        //get()中会判断缓存是否过期,过期会删除缓存,并返回null
        @Override
        public void run() {
            CACHE_CONTAINER.keySet().forEach(key -> {
                //类名.this表示类名所代表类的对象
                if (!InMemoryCacheStore.this.get(key).isPresent()) {
                    log.info("删除的过期缓存key:[{}]", key);
                }
            });
        }
    }
}