elastic-job source code (1): job auto-configuration
Original article accessed: July 13, 2023

Version: 3.1.0-SNAPSHOT

Git repository: https://github.com/apache/shardingsphere-elasticjob

Maven coordinates

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>

spring.factories configuration

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

Once the elasticjob-lite-spring-boot-starter dependency is added, Spring Boot automatically loads ElasticJobLiteAutoConfiguration. Let's look at what ElasticJobLiteAutoConfiguration does.

ElasticJobLiteAutoConfiguration.java

/**
 * ElasticJob-Lite auto configuration.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)

/**
 * ElasticJob switch.
 * elasticjob.enabled defaults to true.
 */
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)

/**
 * Imports:
 * ElasticJobRegistryCenterConfiguration.class: registry center configuration
 * ElasticJobTracingConfiguration.class: job event tracing configuration
 * ElasticJobSnapshotServiceConfiguration.class: snapshot service configuration
 */
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})

/**
 * Job-related configuration properties.
 */
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {

    @Configuration(proxyBeanMethods = false)
    /**
     * ElasticJobBootstrapConfiguration.class: creates the job beans and registers them in the Spring container
     * ScheduleJobBootstrapStartupRunner.class: starts every job whose bootstrap type is ScheduleJobBootstrap
     */
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }

}
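The elasticjob.enabled switch is easy to misread, so here is a minimal sketch of how a condition with havingValue = "true" and matchIfMissing = true behaves. It is plain Java, not Spring's implementation; the class and method names are hypothetical, and the case-insensitive comparison is an assumption about how Spring compares the value.

```java
import java.util.HashMap;
import java.util.Map;

final class EnabledSwitchSketch {

    // Hypothetical helper mimicking
    // @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true).
    static boolean isElasticJobEnabled(Map<String, String> properties) {
        String value = properties.get("elasticjob.enabled");
        if (value == null) {
            // matchIfMissing = true: an absent property still counts as a match
            return true;
        }
        // Assumption: the comparison with havingValue ignores case
        return "true".equalsIgnoreCase(value);
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        System.out.println("no property set: " + isElasticJobEnabled(properties));
        properties.put("elasticjob.enabled", "false");
        System.out.println("explicitly false: " + isElasticJobEnabled(properties));
    }
}
```

In other words, the auto-configuration is active unless elasticjob.enabled is explicitly set to something other than true.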

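Elastic-job relies on ZooKeeper to provide distributed job scheduling, so auto-configuration requires the ZooKeeper registry center to be configured.

Auto-configuration mainly does four things:

1. Configure the ZooKeeper client and open the connection to ZooKeeper.

2. Configure the event tracing database, which stores job execution records.

3. Parse all job configurations and register every job bootstrap as a Spring singleton bean.

4. Identify the job type, handle the job node data on the ZooKeeper nodes, and run the scheduled jobs.

First: configure the ZooKeeper client and open the connection to ZooKeeper.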

ZookeeperRegistryCenter.class

public void init() {
    log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            // ZooKeeper server addresses
            .connectString(zkConfig.getServerLists())
            // retry policy
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
            // namespace, used as the root ZooKeeper node name
            .namespace(zkConfig.getNamespace());
    // session timeout
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    // connection timeout
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    // digest authentication, with creator-all ACLs on every path
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {

                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }

                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    // start the ZooKeeper client
    client.start();
    try {
        // block until connected, for at most maxSleepTime * maxRetries milliseconds
        if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}
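The timing in init() is worth spelling out: the client waits at most maxSleepTimeMilliseconds * maxRetries for the first connection. The sketch below computes that budget, plus a simplified ceiling for the retry delay. The class and method names are hypothetical, the sample values are illustrative only, and the delay model (doubling, then capped) is a deliberate simplification rather than Curator's exact randomized formula.

```java
final class ZkConnectBudgetSketch {

    // Upper bound on how long init() blocks in blockUntilConnected.
    static long connectWaitMillis(int maxSleepTimeMillis, int maxRetries) {
        return (long) maxSleepTimeMillis * maxRetries;
    }

    // Simplified model (assumption): the delay ceiling doubles with each retry
    // and never exceeds maxSleepTimeMillis.
    static long backoffCeilingMillis(int baseSleepTimeMillis, int maxSleepTimeMillis, int retryCount) {
        long doubled = (long) baseSleepTimeMillis << Math.min(retryCount, 30);
        return Math.min(doubled, maxSleepTimeMillis);
    }

    public static void main(String[] args) {
        // Illustrative values only
        int base = 1000;
        int maxSleep = 3000;
        int maxRetries = 3;
        System.out.println("connect wait budget (ms): " + connectWaitMillis(maxSleep, maxRetries));
        for (int retry = 0; retry < maxRetries; retry++) {
            System.out.println("retry " + retry + " delay ceiling (ms): " + backoffCeilingMillis(base, maxSleep, retry));
        }
    }
}
```

If the registry center is slow to come up, raising either maxSleepTimeMilliseconds or maxRetries lengthens the time the application waits before giving up.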

Second: configure the event tracing database, which stores job execution records.

ElasticJobTracingConfiguration.java

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource")
// registers the job database connection in Spring under the bean name tracingDataSource
public DataSource tracingDataSource(final TracingProperties tracingProperties) {
    // read the elastic-job database configuration
    DataSourceProperties dataSource = tracingProperties.getDataSource();
    if (dataSource == null) {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource();
    tracingDataSource.setJdbcUrl(dataSource.getUrl());
    BeanUtils.copyProperties(dataSource, tracingDataSource);
    return tracingDataSource;
}

/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing DataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
    /*
     * dataSource is the business database.
     * tracingDataSource is the job database.
     * With elasticjob.tracing.type = RDB, a separately configured job database is used to record job execution traces;
     * if none is configured, the business data source is used instead.
     * Note that when both the business data source and the job tracing data source are registered,
     * mybatis-plus used together with @Table is quite likely to fail to find the correct data source.
     */
    DataSource ds = tracingDataSource;
    if (ds == null) {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds);
}

Setting elasticjob.tracing.type=RDB enables event tracing. The event tracing data source can be configured separately from the business data source.
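The fallback between the two data sources can be isolated from Spring. The following is a minimal sketch of that selection rule only, using a generic type in place of javax.sql.DataSource; the class and method names are hypothetical.

```java
final class TracingSourceSketch {

    // Prefer the dedicated tracing source; fall back to the business source when none is configured.
    static <T> T pickTracingSource(T businessSource, T dedicatedTracingSource) {
        return dedicatedTracingSource != null ? dedicatedTracingSource : businessSource;
    }

    public static void main(String[] args) {
        // Strings stand in for real data sources in this sketch
        System.out.println(pickTracingSource("businessDb", "jobDb"));
        System.out.println(pickTracingSource("businessDb", null));
    }
}
```

The practical consequence is that job execution records land in the business database unless a tracing data source is configured explicitly.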

Third: parse all job configurations.

ElasticJobBootstrapConfiguration.class

public void createJobBootstrapBeans() {
    // read the job configuration
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
    // get the singleton bean registry
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    // get the registry center bean (the ZooKeeper client)
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
    // get the job event tracing configuration
    TracingConfiguration tracingConfig = getTracingConfiguration();
    // construct the job bootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}

The important part is the constructJobBootstraps method:

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration tracingConfig) {
    // iterate over every configured job
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        // fail if neither the job class nor the job type is set
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                        || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        // fail if both the job class and the job type are set: they are mutually exclusive
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                        || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");

        if (null != jobConfigurationProperties.getElasticJobClass()) {
            // register the job by class
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            // register the job by type
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}

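A job can be registered in two ways. The first is by class, where elasticJobClass is set to the fully qualified class name of the job; the second is by type, via elasticJobType.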
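The two precondition checks amount to an exclusive-or between the two settings. Here is a minimal sketch of that rule without Guava or Spring; the class and method names are hypothetical, and the job class is represented by its name as a String rather than a Class object.

```java
final class JobKindValidationSketch {

    // Returns "CLASS" or "TYPE"; rejects configurations that set neither or both.
    static String resolveKind(String elasticJobClass, String elasticJobType) {
        boolean hasClass = elasticJobClass != null && !elasticJobClass.isEmpty();
        boolean hasType = elasticJobType != null && !elasticJobType.isEmpty();
        if (!hasClass && !hasType) {
            throw new IllegalArgumentException("Either elasticJobClass or elasticJobType must be set.");
        }
        if (hasClass && hasType) {
            throw new IllegalArgumentException("elasticJobClass and elasticJobType are mutually exclusive.");
        }
        return hasClass ? "CLASS" : "TYPE";
    }

    public static void main(String[] args) {
        // com.example.MyJob is a made-up class name for illustration
        System.out.println(resolveKind("com.example.MyJob", null));
        System.out.println(resolveKind(null, "SCRIPT"));
    }
}
```

Each job entry therefore takes exactly one of the two registration paths, registerClassedJob or registerTypedJob.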

Next, let's look at the registerClassedJob method:

private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    // build the job configuration
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    // attach the job event tracing configuration
    jobExtraConfigurations(jobConfig, tracingConfig);
    // look up the job bean by its class in the Spring context
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    // no cron expression configured: create a OneOffJobBootstrap, i.e. a one-off job
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        // cron expression configured: create a ScheduleJobBootstrap, i.e. a scheduled job
        // choose the bean name
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        // register the ScheduleJobBootstrap as a singleton
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}

A job registered by class becomes one of two bootstrap types:

1. ScheduleJobBootstrap: a scheduled job, driven by a cron expression.

2. OneOffJobBootstrap: a one-off job.
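The choice between the two bootstrap types, and the bean name each one gets, follows from two inputs: the cron expression and the optional jobBootstrapBeanName. A minimal sketch of that decision, with hypothetical class and method names and a String result standing in for the registered bean:

```java
final class BootstrapChoiceSketch {

    // Returns "<bootstrap type>:<bean name>" for the given job settings.
    static String chooseBootstrap(String jobName, String cron, String jobBootstrapBeanName) {
        boolean hasBeanName = jobBootstrapBeanName != null && !jobBootstrapBeanName.isEmpty();
        if (cron == null || cron.isEmpty()) {
            // Without a cron expression the job is one-off and must be given an explicit bean name
            if (!hasBeanName) {
                throw new IllegalArgumentException("The property [jobBootstrapBeanName] is required for One-off job.");
            }
            return "OneOffJobBootstrap:" + jobBootstrapBeanName;
        }
        // With a cron expression the bean name defaults to <jobName>ScheduleJobBootstrap
        String beanName = hasBeanName ? jobBootstrapBeanName : jobName + "ScheduleJobBootstrap";
        return "ScheduleJobBootstrap:" + beanName;
    }

    public static void main(String[] args) {
        // orderJob and manualTrigger are made-up names for illustration
        System.out.println(chooseBootstrap("orderJob", "0/5 * * * * ?", null));
        System.out.println(chooseBootstrap("orderJob", "", "manualTrigger"));
    }
}
```

The explicit name matters for one-off jobs because application code has to look the bean up and trigger it itself.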

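Now let's see what new ScheduleJobBootstrap sets up. The constructor shown here is that of JobScheduler, which ScheduleJobBootstrap creates internally: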

public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    // collect the job listeners
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // facade bundling the services that operate on ZooKeeper nodes, plus the job listeners
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    // resolve the class name of the current job
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    // ZooKeeper node /{namespace}/{jobName}/config stores the job configuration
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // facade bundling the services that operate on ZooKeeper nodes
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    // validate the job configuration
    validateJobProperties();
    // create the job executor
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    // inject GuaranteeService into the listeners
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    // create the job schedule controller
    jobScheduleController = createJobScheduleController();
}

Now look at createJobScheduleController:

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    // register the job in the local job registry
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    // register the startup information
    registerStartUpInfo();
    return result;
}

Now look at the registerStartUpInfo method:

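public void registerStartUpInfo(final boolean enabled) {
    // start all listeners
    listenerManager.startAllListeners();
    // elect the leader; /{namespace}/{jobName}/leader/election/instance holds the elected instance
    leaderService.electLeader();
    // persist the server node /{namespace}/{jobName}/servers/{ip} and mark it enabled or disabled
    serverService.persistOnline(enabled);
    // ephemeral node under /{namespace}/{jobName}/instances holding the running job instance
    instanceService.persistOnline();
    // start the asynchronous reconcile service if it is not already running
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}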

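The operations performed here are:

1. Start all listeners.

2. Elect the leader.

3. Persist the server and instance node data.

4. Start the asynchronous reconcile service.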
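To make the registry layout behind these steps concrete, here is a small sketch that builds the node paths involved. It assumes the layout I understand ElasticJob-Lite to use, in which each job has its own node under the namespace with children such as config, servers, instances and leader; the class, the method, and the sample names (demo-namespace, orderJob, the IP address) are all hypothetical.

```java
final class JobNodePathSketch {

    // Path of a job's child node, including the namespace that the ZooKeeper client prefixes.
    static String fullPath(String namespace, String jobName, String node) {
        return String.format("/%s/%s/%s", namespace, jobName, node);
    }

    public static void main(String[] args) {
        String namespace = "demo-namespace";
        String jobName = "orderJob";
        System.out.println(fullPath(namespace, jobName, "config"));
        System.out.println(fullPath(namespace, jobName, "leader/election/instance"));
        System.out.println(fullPath(namespace, jobName, "servers/192.168.0.10"));
        System.out.println(fullPath(namespace, jobName, "instances"));
    }
}
```

Browsing these paths with a ZooKeeper client is a quick way to check what a running job has registered.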

Fourth: identify the job type, handle the job node data on the ZooKeeper nodes, and run the scheduled jobs.

@Override
public void run(final String... args) {
    log.info("Starting ElasticJob Bootstrap.");
    applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
    log.info("ElasticJob Bootstrap started.");
}

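The runner fetches all scheduled jobs (beans of type ScheduleJobBootstrap) and calls schedule on each of them. Under the hood, the Quartz framework actually runs the scheduled tasks.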
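The runner's behavior can be imitated without Spring or Quartz. The sketch below filters a bean map by type and calls schedule on each match, which shows why one-off jobs are left alone at startup. Everything in it is a hypothetical stand-in: FakeScheduleBootstrap and FakeOneOffBootstrap replace the real bootstrap classes, and beansOfType only approximates what getBeansOfType does.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StartupRunnerSketch {

    // Stand-in for ScheduleJobBootstrap: counts how often schedule() was called.
    static final class FakeScheduleBootstrap {
        int scheduledCount;

        void schedule() {
            scheduledCount++;
        }
    }

    // Stand-in for OneOffJobBootstrap: never scheduled by the runner.
    static final class FakeOneOffBootstrap {
    }

    // Simplified imitation of looking up beans by type.
    static <T> Map<String, T> beansOfType(Map<String, Object> beans, Class<T> type) {
        Map<String, T> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : beans.entrySet()) {
            if (type.isInstance(entry.getValue())) {
                result.put(entry.getKey(), type.cast(entry.getValue()));
            }
        }
        return result;
    }

    // Schedules every scheduled-job bootstrap and returns how many were started.
    static int startAll(Map<String, Object> beans) {
        Map<String, FakeScheduleBootstrap> found = beansOfType(beans, FakeScheduleBootstrap.class);
        found.values().forEach(FakeScheduleBootstrap::schedule);
        return found.size();
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("orderJobScheduleJobBootstrap", new FakeScheduleBootstrap());
        beans.put("manualTrigger", new FakeOneOffBootstrap());
        System.out.println("scheduled bootstraps: " + startAll(beans));
    }
}
```

One-off bootstraps stay idle until application code triggers them explicitly.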