elastic-job 的简单使用
阅读原文时间:2023年07月13日阅读:1

说明:这个是使用2.1.5版本

  elastic-job是当当开源的的的定时任务,使用也是很简单的,可以解决数据量的大的时候可以分片执行,多应用节点部署时候不会重复执行。

  是通过zookeeper作为控制中心,进行任务分配。

使用手册:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/

分片原理:https://www.cnblogs.com/haoxinyue/p/6919375.html

一. 安装elastic-job控制台

  下载源码:https://github.com/elasticjob/elastic-job-lite

  (1) 找到elastic-job-lite 下面的elastic-job-lite-console,然后在这个文件下执行mvn打包

mvn clean install -Dmaven.test.skip=true  

  (2)得到elastic-job-lite-console-2.1.5.tar.gz。并上传到linux服务器下,并且解压,

在bin文件夹下执行nohup ./start.sh & (-p 可以指定端口号) ,端口是8899 ,浏览器打开:ip:8899就可以访问控制台了,如下图:

  

已经配置过注册中心的界面

   

二, zookeeper安装

参考:

    https://blog.csdn.net/m290345792/article/details/52576890

    https://www.cnblogs.com/wangmingshun/p/7745808.html

三,实现job

  (1)使用maven 引入jar包


com.dangdang elastic-job-lite-core 2.1.
com.dangdang elastic-job-lite-spring 2.1.

  (2)编写一个测试job

  job功能要 实现SimpleJob这个类,然后重写execute方法,在这个方法中编写job的业务逻辑,例如

public class SimpleJobDemo2 implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(String.format("------Thread ID: %s, 任务总片数: %s, 当前分片项: %s",
Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem()));
/**
* 实际开发中,有了任务总片数和当前分片项,查询选择数据进行执行
*
*/
}
}

  例如分片执行的:

public class MyElasticJob implements SimpleJob {

@Override  
public void execute(ShardingContext context) {  
    switch (context.getShardingItem()) {  
        case 0:  
            // do something by sharding item 0  
            break;  
        case 1:  
            // do something by sharding item 1  
            break;  
        case 2:  
            // do something by sharding item 2  
            break;  
        // case n: ...  
    }  
}  

}

(3) Spring 配置(更多配置参数请参考 使用手册)  

//数据库jndi配置,执行job 的时候会插入表,并且生成执行记
       ${db.jndiName}   


/**
* server-lists : 连接Zookeeper服务器的列表 包括IP地址和端口号 多个地址用逗号分隔 如: host1:2181,host2:2181
* namespace :Zookeeper的命名空间
* base-sleep-time-milliseconds: 等待重试的间隔时间的初始值 单位:毫秒
* max-sleep-time-milliseconds: 等待重试的间隔时间的最大值 单位:毫秒
* max-retries : 最大尝试次数
*
**/


/**
* id: 任务的唯一标示
* event-trace-rdb-data-source : job要操作的数据库(可以不设置)
* class:job 所在的位置
* registry-center-ref : 要指定的注册中心
* cron : job 触发的cron表达式
* sharding-total-count : 作业分片总数
* sharding-item-parameters :分片序列号和参数用等号分隔,多个键值对用逗号分隔 分片序列号从0开始,不可大于或等于作业分片总数 如: 0=A,1=B
**/

  (4)启动程序,就可以在之前部署的控制台看到job状态

    登录的账号密码在打包的配置文件里,登录后配置注册中心  就可以在页面中看到job了

    

四,job执行说明:

  1. 如果分片数是1  sharding-total-count = "1" zookeeper注册中心会在多台服务器中随机选择一台执行,如果不是1,则会根据分片策略执行。参考:分片原理

   2.  注册中心可以有多台,但是都是以单数的形式(1,3,5),因为zookeeper 是一主多从,主的宕机,会使用选举的形式再次确定哪个是主,哪些是从。

  3. 如果设置了数据库的jndi,并且设置了 event-trace-rdb-data-source 的值,job运行的时候会在数据库的 OB_EXECUTION_LOG和JOB_STATUS_TRACE_LOG两张表中生成执行记录,不设置也不影响job的正常运行 ,例如:

  

五,问题记录

  (1)zookeeper安装启动问题:

  启动会遇到问题,例如找不到jdk,具体原因在安装目录 bin/zookeeper.out文件中查找,找到问题并解决后在启动

  (2) 关闭应用的程序的时候,tomcat 进程还在

  tomcat(sh shutdown.sh) 关闭的时候,项目不能彻底停掉,进程还在:是因为关闭的时候servlet容器发现job还有线程存在,所以不能彻底关闭

  解决办法:增加一个监听,在contextDestroyed 方法中你用elastic-job的shutdown方法

//新建一个类实现ServletContextListener用于关闭elasticJob的线程
@WebListener
public class JobServletContextListener implements ServletContextListener {

private static Logger logger = LoggerFactory.getLogger(JobServletContextListener.class);

@Override  
public void contextInitialized(ServletContextEvent servletContextEvent) {

}

@Override  
public void contextDestroyed(ServletContextEvent servletContextEvent) {  
    logger.info("Destroying Context...");  
    try {  
        WebApplicationContext context = (WebApplicationContext) servletContextEvent.getServletContext().getAttribute(  
                WebApplicationContext.ROOT\_WEB\_APPLICATION\_CONTEXT\_ATTRIBUTE);

        String\[\] beanNames = context.getBeanDefinitionNames();

        for(String beanName:beanNames)  
        {  
            if(beanName.contains("dangdang")&&beanName.contains("SpringJobScheduler")){  
                logger.info("发现dangdang定时任务beanName: "+beanName);  
                SpringJobScheduler scheduler = (SpringJobScheduler)context.getBean(beanName);  
                scheduler.getSchedulerFacade().shutdownInstance();  
            }  
        }  
    } catch (Exception e) {  
        logger.error("Error Destroying Context", e);  
    }

}  

}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章