监听者模式包含了一个监听者 Listener 与之对应的事件 Event,还有一个事件发布者 EventPublish。
过程就是 EventPublish 发布一个事件,被监听者捕获到,然后执行事件相应的方法。
1. 发布事件
private final ApplicationEventPublisher eventPublisher;
eventPublisher.publishEvent(new LogEvent(this, name, 0, ld));
2. 事件
public class LogEvent extends ApplicationEvent {
private final LogParam logParam;
public LogEvent(Object source, LogParam logParam) {
//事件源(发布事件的对象)
super(source);
this.logParam = logParam;
}
public LogEvent(Object source, String logKey, LogType logType, String content) {
this(source, new LogParam(logKey, logType, content));
}
public LogParam getLogParam() {
return logParam;
}
}
3. 监听器
@Component
public class LogEventListener {
private final LogService logService;
public LogEventListener(LogService logService) {
this.logService = logService;
}
//将方法标记为应用程序事件侦听器
@EventListener
//异步
@Async
public void onApplicationEvent(LogEvent event) {
//转换成 Entity
Log logToCreate = event.getLogParam().convertTo();
//保存日志
logService.create(logToCreate);
}
}
/**
* 缓存锁注解(在具有该注解的方法上:加锁,执行目标方法,释放锁)
*/
@Target(ElementType.METHOD) //用于描述注解的使用范围
@Retention(RetentionPolicy.RUNTIME) //用于描述注解的生命周期
@Documented //表示该注解是否可以生成到 API 文档中
@Inherited //具备继承性(A被注解了,那么继承了A的B够继承到A中的注解)
public @interface CacheLock {
/** 缓存锁key前缀(默认"")*/
@AliasFor("value")
String prefix() default "";
/** 缓存锁key前缀(默认"")*/
@AliasFor("prefix")
String value() default "";
/** 过期时间 */
long expired() default 5;
/** 时间单位(默认 s)*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
/** 分隔符(默认 :)*/
String delimiter() default ":";
/** 方法调用后是否删除缓存 */
boolean autoDelete() default true;
/** 是否跟踪请求信息(将请求IP添加到缓存key上)*/
boolean traceRequest() default false;
}
/**
* 缓存参数注释
*/
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface CacheParam {
}
Controller:
@PostMapping("/login")
@CacheLock(autoDelete = false) //登陆操作加锁,防止重复登陆
public AuthToken auth(@RequestBody @Valid LoginParam loginParam) {
return adminService.authenticate(loginParam);
}
@Aspect
@Configuration
public class CacheLockInterceptor {
//缓存锁key前缀:cache_lock_
private final static String CACHE_LOCK_PREFOX = "cache_lock_";
//缓存锁value:locked(被锁定状态)
private final static String CACHE_LOCK_VALUE = "locked";
//缓存池
private final StringCacheStore cacheStore;
public CacheLockInterceptor(StringCacheStore cacheStore) {
this.cacheStore = cacheStore;
}
//具有 @annotation(run.halo.app.cache.lock.CacheLock) 注解的方法都会触发拦截器
@Around("@annotation(run.halo.app.cache.lock.CacheLock)")
public Object interceptCacheLock(ProceedingJoinPoint joinPoint) throws Throwable {
//获取方法签名
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
//获取注解类
CacheLock cacheLock = methodSignature.getMethod().getAnnotation(CacheLock.class);
//建立缓存锁key
String cacheLockKey = buildCacheLockKey(cacheLock, joinPoint);
try {
//放入缓存(key是cacheLockKey,value是locked)
//如果缓存中没有此缓存锁key才会放入(返回true),否则会放入失败(返回false)
Boolean cacheResult = cacheStore.putIfAbsent(cacheLockKey, CACHE_LOCK_VALUE, cacheLock.expired(), cacheLock.timeUnit());
if (!cacheResult) {
throw new FrequentAccessException("访问过于频繁,请稍后再试!").setErrorData(cacheLockKey);
}
//处理被注解的方法,获取返回值
return joinPoint.proceed();
} finally {
if (cacheLock.autoDelete()) {
//注解标注调用方法后删除缓存
cacheStore.delete(cacheLockKey);
}
}
}
/**
* 建立缓存锁key(包括:cache_lock_cacheLock.prefix/Method:arg:IP)
*/
private String buildCacheLockKey(CacheLock cacheLock,ProceedingJoinPoint joinPoint) {
//获取方法
MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
//建立缓存锁key
StringBuilder cacheKeyBuilder = new StringBuilder(CACHE_LOCK_PREFOX);
//分隔符(:)
String delimiter = cacheLock.delimiter();
if (StringUtils.isNotBlank(cacheLock.prefix())) {
cacheKeyBuilder.append(cacheLock.prefix());
} else {
cacheKeyBuilder.append(methodSignature.getMethod().toString());
}
//获取方法上的参数注解数组(一个方法可以有多个参数,一个参数可以有多个注解)
Annotation[][] parameterAnnotations = methodSignature.getMethod().getParameterAnnotations();
for (int i = 0; i < parameterAnnotations.length; i++) {
for (int j = 0; j < parameterAnnotations[i].length; j++) {
//获取注解
Annotation annotation = parameterAnnotations[i][j];
//如果被 @CacheParam 注解,则获取参数,添加到缓存锁key上
if (annotation instanceof CacheParam) {
//获取参数
Object arg = joinPoint.getArgs()[i];
//将参数添加到缓存key上
cacheKeyBuilder.append(delimiter).append(arg.toString());
}
}
}
if (cacheLock.traceRequest()) {
//添加 Request 客户端 IP
cacheKeyBuilder.append(delimiter).append(ServletUtils.getRequestIp());
}
return cacheKeyBuilder.toString();
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class CacheWrapper<V> implements Serializable {
/** 缓存数据 */
private V data;
/** 过期时间 */
private Date expireAt;
/** 创造时间 */
private Date createAt;
}
public interface CacheStore<K, V> {
/** 通过key得到缓存 */
Optional<V> get(@NonNull K key);
/** 放入一个会过期的缓存 */
void put(@NonNull K key, @NonNull V value, long timeout, @NonNull TimeUnit timeUnit);
/** 缓存池中不存在key才放入 */
Boolean putIfAbsent(@NonNull K key, @NonNull V value, long timeout, @NonNull TimeUnit timeUnit);
/** 放入一个不会过期的缓存 */
void put(@NonNull K key, @NonNull V value);
/** 删除缓存 */
void delete(@NonNull K key);
}
public abstract class AbstractCacheStore<K, V> implements CacheStore<K, V> {
/** 通过key获取缓存包装器 */
abstract Optional<CacheWrapper<V>> getInternal(@NonNull K key);
/** 放入缓存包装器 */
abstract void putInternal(@NonNull K key, @NonNull CacheWrapper<V> cacheWrapper);
/** 如果key不存在才放入缓存包装器 */
abstract Boolean putInternalIfAbsent(@NonNull K key, @NonNull CacheWrapper<V> cacheWrapper);
/** 通过key获取value */
@Override
public Optional<V> get(K key) {
return getInternal(key).map(cacheWrapper -> {
//判断是否过期(过期时间不是null,并且在当前时间之前,表示已经过期)
if (cacheWrapper.getExpireAt() != null && cacheWrapper.getExpireAt().before(run.halo.app.utils.DateUtils.now())) {
//删除缓存
delete(key);
return null;
}
return cacheWrapper.getData();
});
}
/** 放入带过期时间的缓存 */
@Override
public void put(K key, V value, long timeout, TimeUnit timeUnit) {
putInternal(key, buildCacheWrapper(value, timeout, timeUnit));
}
/** key不存在才放入缓存 */
@Override
public Boolean putIfAbsent(K key, V value, long timeout, TimeUnit timeUnit) {
return putInternalIfAbsent(key, buildCacheWrapper(value, timeout, timeUnit));
}
/** 放入不过期的缓存 */
@Override
public void put(K key, V value) {
putInternal(key, buildCacheWrapper(value, 0, null));
}
/** 构建缓存包装器 */
@NonNull
private CacheWrapper<V> buildCacheWrapper(@NonNull V value, long timeout, @Nullable TimeUnit timeUnit) {
//过期时间必须>=0
Assert.isTrue(timeout >= 0, "Cache expiration timeout must not be less than 0");
Date now = run.halo.app.utils.DateUtils.now();
Date expireAt = null;
//如果设置了过期时间,则构造过期时间
if (timeout > 0 && timeUnit != null) {
expireAt = DateUtils.add(now, timeout, timeUnit);
}
//构建缓存包装器
CacheWrapper<V> cacheWrapper = new CacheWrapper<>();
cacheWrapper.setCreateAt(now);
cacheWrapper.setExpireAt(expireAt);
cacheWrapper.setData(value);
return cacheWrapper;
}
}
public abstract class StringCacheStore extends AbstractCacheStore<String, String> {
/** 放入值 */
public <T> void putAny(String key, T value) {
try {
//JsonUtils.objectToJson(value):对象转成Json
put(key, JsonUtils.objectToJson(value));
} catch (JsonProcessingException e) {
throw new ServiceException("Failed to convert " + value + " to json", e);
}
}
public <T> void putAny(@NonNull String key, @NonNull T value, long timeout, @NonNull TimeUnit timeUnit) {
try {
put(key, JsonUtils.objectToJson(value), timeout, timeUnit);
} catch (JsonProcessingException e) {
throw new ServiceException("Failed to convert " + value + " to json", e);
}
}
/** 获取值 */
public <T> Optional<T> getAny(String key, Class<T> type) {
return get(key).map(value -> {
try {
//JsonUtils.jsonToObject(value, type):Json转换成对象
return JsonUtils.jsonToObject(value, type);
} catch (IOException e) {
return null;
}
});
}
}
public class InMemoryCacheStore extends StringCacheStore {
/** 清理计划周期 */
private final static long PERIOD = 60 * 1000; //一分钟
/** 缓存容器 */
private final static ConcurrentHashMap<String, CacheWrapper<String>> CACHE_CONTAINER = new ConcurrentHashMap<>();
//定时任务
private final Timer timer;
public InMemoryCacheStore() {
//定时清理缓存(1分钟)
timer = new Timer();
//开启定时任务,延迟0,周期一分钟
timer.scheduleAtFixedRate(new CacheExpiryCleaner(), 0, PERIOD);
}
/** 获取值 */
@Override
Optional<CacheWrapper<String>> getInternal(String key) {
//获取value,并放入允许null的Optional对象中
return Optional.ofNullable(CACHE_CONTAINER.get(key));
}
/** 放入值 */
@Override
void putInternal(String key, CacheWrapper<String> cacheWrapper) {
//返回原始值
CacheWrapper<String> oldCacheWrapper = CACHE_CONTAINER.put(key, cacheWrapper);
}
/** 不存在才放入值 */
@Override
Boolean putInternalIfAbsent(String key, CacheWrapper<String> cacheWrapper) {
CacheWrapper<String> stringCacheWrapper = CACHE_CONTAINER.putIfAbsent(key, cacheWrapper);
if(stringCacheWrapper==null)return true;
return false;
}
/** 删除缓存 */
@Override
public void delete(String key) {
CACHE_CONTAINER.remove(key);
}
/** 清空缓存前取消所有定时任务 */
@PreDestroy //销毁Bean之前的操作
public void preDestroy() {
//取消所有定时任务
timer.cancel();
}
/** 缓存超时清理任务 */
private class CacheExpiryCleaner extends TimerTask {
//get()中会判断缓存是否过期,过期会删除缓存,并返回null
@Override
public void run() {
CACHE_CONTAINER.keySet().forEach(key -> {
//类名.this表示类名所代表类的对象
if (!InMemoryCacheStore.this.get(key).isPresent()) {
log.info("删除的过期缓存key:[{}]", key);
}
});
}
}
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章