记一次elastic-job使用
阅读原文时间:2023年07月12日阅读:1

当当的elastic-job定时任务

业务场景是定时从微信取accesstoken和jsticket,因为都只有7200秒的有效时间,所以设置了定时任务,定时将得到的数据存到redis缓存中

问题1:但是刚开始的时候将业务写在实现了simplejob类的类中,需要注入RedisCacheManager,结果发现是不能拿到的

所以更改了配置文件jobRegisterSupport

问题2:而且这个工作类的cron不管本地怎么改都不能改动,因为数据是从注册中心拿到的,所以在jobconfiguration中配置了overwrite为true,这样可以覆盖掉注册中心的配置

Elastic-Job-Lite采用无中心化设计,若每个客户端的配置不一致,不做控制的话,最后一个启动的客户端配置将会成为注册中心的最终配置。

Elastic-Job-Lite提出了overwrite概念,可通过JobConfiguration或Spring命名空间配置。overwrite=true即允许客户端配置覆盖注册中心,反之则不允许。如果注册中心无相关作业的配置,则无论overwrite是否配置,客户端配置都将写入注册中心。

原先的:

@Bean
public JobRegisterSupport jobRegisterSupport(CoordinatorRegistryCenter coordinatorRegistryCenter, ApplicationContextOwner applicationContextOwner){
ElasticJobClassScanner scanner = new ElasticJobClassScanner();
scanner.addIncludeFilter(new AssignableTypeFilter(ElasticJob.class) {
@Override
protected boolean matchClassName(String className) {
return false;
}
});
scanner.addExcludeFilter(new AnnotationTypeFilter(JobExclude.class) {
@Override
protected boolean matchClassName(String className) {
return false;
}
});
Set classes = scanner.doScan("*.*.jobInstance");//自己的工作类
classes.forEach(clazz ->Arrays.stream(JobConfig.JobElement.values()).forEach(jobElement -> {
if (clazz == jobElement.getJobClass()){
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobElement.getJobName(), jobElement.getCron(), jobElement.getShardingTotalCount()).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, clazz.getCanonicalName());
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build();
JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, simpleJobRootConfig);
jobScheduler.init();
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory)applicationContextOwner.getApplicationContext().getAutowireCapableBeanFactory();
beanFactory.registerSingleton(clazz.getCanonicalName(),jobScheduler);
return;
}
}));
return new JobRegisterSupport();
}

改过的:

@Bean
public JobRegisterSupport jobRegisterSupport(CoordinatorRegistryCenter coordinatorRegistryCenter, ApplicationContextOwner applicationContextOwner){
ElasticJobClassScanner scanner = new ElasticJobClassScanner();
scanner.addIncludeFilter(new AssignableTypeFilter(ElasticJob.class) {
@Override
protected boolean matchClassName(String className) {
return false;
}
});
scanner.addExcludeFilter(new AnnotationTypeFilter(JobExclude.class) {
@Override
protected boolean matchClassName(String className) {
return false;
}
});
Set classes = scanner.doScan("*.*.job.jobInstance");
classes.forEach(clazz ->Arrays.stream(JobConfig.JobElement.values()).forEach(jobElement -> {
if (clazz == jobElement.getJobClass()){
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(jobElement.getJobName(), jobElement.getCron(), jobElement.getShardingTotalCount()).build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, clazz.getCanonicalName());
LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build();//加入了覆盖
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory)applicationContextOwner.getApplicationContext().getAutowireCapableBeanFactory();
AbstractBeanDefinition ActualRawBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(clazz).setScope(BeanDefinition.SCOPE_SINGLETON).getRawBeanDefinition();
beanFactory.registerBeanDefinition(clazz.getCanonicalName(),ActualRawBeanDefinition);
SpringJobScheduler springJobScheduler = new SpringJobScheduler((SimpleJob)beanFactory.getBean(clazz), coordinatorRegistryCenter, liteJobConfiguration);
springJobScheduler.init();
beanFactory.registerSingleton("spring" + clazz.getCanonicalName(),springJobScheduler);
return;
}
}));
return new JobRegisterSupport();
}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章