Executors.newScheduledThreadPool()定时任务线程池
阅读原文时间:2023年07月09日阅读:2

定时任务线程池是由 Timer 进化而来

jdk中的计划任务 Timer 工具类提供了以计时器或计划任务的功能来实现按指定时间或时间间隔执行任务,但由于 Timer 工具类并不是以池 pool ,而是以队列的方式来管理线程的,所以在高并发的情况下运行效率较低,在新版 JDK 中提供了 SchedleExecutorService对象来解决效率与定时任务的需求。

Timer讲解

其中Timer类主要负责计划任务的功能,也就是在指定的时间开始执行某一个任务,但封装任务的类却是TimerTask类。
通过继承 TimerTask 类 并实现 run() 方法来自定义要执行的任务

代码示例

public class Mytask extends TimerTask {
@Override
public void run()
{
DateFormat dateFormat = TimeUtil.df.get();
System.out.println("我的任务运行了" + dateFormat.format(new Date()));
}
}

// 通过执行Timer.schedule(TimerTask task,Date time) 在执行时间运行任务:
public class Run {
private static Timer timer=new Timer();

public static void main(String\[\] args) throws ParseException  
{  
    timer.schedule(new Mytask(), TimeUtil.df.get().parse("2017-09-14 09:19:30"));  
}  

}

public class TimeUtil{// 时间转换工具类

  public static final ThreadLocal df = new ThreadLocal() {

    @Override
    protected DateFormat initialValue() {
    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }

  };
}

Timer类注意事项

1、创建一个 Timer 对象就是新启动了一个线程,但是这个新启动的线程,并不是守护线程,它一直在后台运行,通过如下 可以将新启动的 Timer 线程设置为守护线程。

private static Timer timer=new Timer(true);

2、提前:当计划时间早于当前时间,则任务立即被运行。

3、延迟:TimerTask 是以队列的方式一个一个被顺序运行的,所以执行的时间和你预期的时间可能不一致,因为前面的任务可能消耗的时间较长,则后面的任务运行的时间会被延迟。延迟的任务具体开始的时间,就是依据前面任务的"结束时间"

Timer的cancel() 和 TimerTask的cancel() 的区别?

前面提到任务的执行是以对列的方式一个个被顺序执行的(此处也说明不用考虑线程安全问题),TimerTask.cancel() 指的是把当前任务从任务对列里取消Timer.cancel() 值的是把当前任务队列里的所有任务都取消。值得注意的是,Timer 的cancel()有时并不一定会停止执行计划任务,而是正常执行。这是因为Timer类中的cancel()方法有时并没有争抢到queue锁,所以TimerTask类中的任务继续正常执行。

一篇讲解Timer特别细节的文章:https://blog.csdn.net/hl_java/article/details/79035237

来源继承

----> interface ScheduledExecutorService extends ExecutorService
----> interface ExecutorService extends Executor
----->class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService

schedule方法

1.schedule()方法是延迟运行方法,延时delay时间后执行任务,并不是周期性执行方法。
2.schedule()方法不会造成堵塞,该方法启动任务之后,代码会继续向下执行。
3.由于返回值是ScheduledFuture,执行该接口的get()方法时,会堵塞后面代码的执行。

public ScheduledFuture schedule(Runnable command,long delay,TimeUnit unit) {

if (command == null || unit == null)  
    throw new NullPointerException();  

  // 布置任务 scheduledFuture 是 RunnableSchedFuture实现类
RunnableScheduledFuture t = decorateTask(command,
new ScheduledFutureTask(command, null, triggerTime(delay, unit)));
  // 延迟执行
delayedExecute(t);
return t;
}

/**延迟或周期性任务的主要执行方法。。
否则,将任务添加到队列并启动一个线程(如果需要的话)来运行它。
(我们不能预启动线程来运行任务,因为任务(可能)还不应该运行。)
如果在添加任务时池被关闭,请根据状态和关闭后运行参数的需要取消并删除它。
*/

private void delayedExecute(RunnableScheduledFuture task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}

/**
与prestartCoreThread except相同,即使corePoolSize为0,也至少会启动一个线程。
*/

void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

/**
检查是否可以根据当前池状态和给定的边界(core或maximum)添加新的worker。
如果是,则相应地调整worker计数,并且如果可能的话,创建并启动一个新的worker,并将firstTask作为其第一个任务运行。
如果池已停止或符合关闭条件,则此方法返回false。
如果线程工厂在被请求时创建线程失败,它也返回false。
如果线程创建失败,或者由于线程工厂返回null,或者由于异常(通常是thread .start()中的OutOfMemoryError),我们将干净地回滚。
@param firstTask 新线程应该先运行的任务(如果没有则为空)。
          当线程数量少于corePoolSize时(在方法execute()中),或者当队列已满时(在这种情况下,我们必须绕过队列),使用初始的第一个任务创建工作者。
          最初的空闲线程通常是通过prestartCoreThread创建的,或者用来替换其他濒死的工作线程。
@param core 如果为true则使用corePoolSize作为绑定,否则使用maximumPoolSize。
        (这里使用布尔指示符而不是值,以确保在检查其他池状态后读取新值)。
@return 成功返回true
*/

private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

    // 检查超出数量,检查任务是否为空、队列是否为空  
    if (rs >= SHUTDOWN &&  
        ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))  
        return false;  

    for (;;) {  
        int wc = workerCountOf(c);//工作线程数和cou数量进行比较  
        if (wc >= CAPACITY ||  
            wc >= (core ? corePoolSize : maximumPoolSize))  
            return false;  
        if (compareAndIncrementWorkerCount(c))//CAS成功则跳出循环  
            break retry;  
        c = ctl.get();  // 从主内存重新读取 ctl  
        if (runStateOf(c) != rs)  
            continue retry;  
        // 否则由于workerCount改变CAS失败;重试内循环  
    }  
}  

boolean workerStarted = false;  
boolean workerAdded = false;  
Worker w = null;  
try {  
    w = new Worker(firstTask);  
    final Thread t = w.thread;  
    if (t != null) {  
        final ReentrantLock mainLock = this.mainLock;  
        mainLock.lock();  
        try {  
            // 保持锁时重新检查。如果ThreadFactory发生故障,或者在获取锁之前关闭,则退出。  
            int rs = runStateOf(ctl.get());  

            if (rs < SHUTDOWN ||  
                (rs == SHUTDOWN && firstTask == null)) {  
                if (t.isAlive()) // 预先检查线程是否仍然存活  
                    throw new IllegalThreadStateException();  
                workers.add(w);  
                int s = workers.size();  
                if (s > largestPoolSize)  
                    largestPoolSize = s;  
                workerAdded = true;  
            }  
        } finally {  
            mainLock.unlock();  
        }  
        if (workerAdded) {  
            t.start();// 添加成功后线程开始运行  
            workerStarted = true;  
        }  
    }  
} finally {  
    if (! workerStarted)  
        addWorkerFailed(w);  
}  
return workerStarted;  

}

scheduleAtFixedRate()周期性执行方法

1.该方法不堵塞,且该方法参数要求Runnable ,不可用Callable,因此无返回值。
2.initialDelay时间后开始执行任务,且每隔period时间执行一次,间隔时间从任务开始执行开始计算。
3.如果任务的执行业务时间超过了period间隔时间,则一次任务结束后立即开始下一次任务。

任务多了执行顺序?

首先创建一个 new ScheduledThreadPoolExecutor(1) 线程池,参数为1表示任务池中只有一个线程,但是此时我放入2个任务,也就是会排队执行,但是定时器不会排队,当schedule()方法执行完定时器就启动了,任务1开始运行,时间:1596525153927,执行4s,在时间为1596525157927时结束,4s时间内任务2定时器走完了(2s定时器),所以任务1结束可以立即执行任务2。以下是示例

public class ScheduledExecutorServiceDemo {
//任务线程
public static class MyCallable implements Callable{
String name;
long sleepTime;

    public MyCallable(String name, long sleepTime) {  
        super();  
        this.name = name;  
        this.sleepTime = sleepTime;  
    }

    @Override  
    public Object call() throws Exception {  
        System.out.println(name+"开始运行,时间:"+System.currentTimeMillis());  
        //休眠时间  
        Thread.sleep(sleepTime);  
        System.out.println(name+"结束运行,时间:"+System.currentTimeMillis());  
        return name+"结束运行!";  
    }  
}

public static void main(String\[\] args) {  
    ScheduledExecutorService scheduledes = null;  
    try {  
        //创建单线程任务池  
        scheduledes = new ScheduledThreadPoolExecutor(1);  
        System.out.println("开始执行定时任务!"+System.currentTimeMillis());  
        //开始执行第一个任务,等待时间已经开始计时  
        ScheduledFuture FutureA = scheduledes.schedule(new MyCallable("任务1",4000),2, TimeUnit.SECONDS);  
        //System.out.println(FutureA.get());  
        //开始执行第二个任务,等待时间已经开始计时  
        ScheduledFuture FutureB = scheduledes.schedule(new MyCallable("任务2",0),2, TimeUnit.SECONDS);  
        //System.out.println(FutureB.get());  
        System.out.println("结束执行定时任务!"+System.currentTimeMillis());

    } catch (Exception e) {  
        e.printStackTrace();  
    }  
}  

}

开始执行定时任务!1596525151924
结束执行定时任务!1596525151927
任务1开始运行,时间:1596525153927
任务1结束运行,时间:1596525157927
任务2开始运行,时间:1596525157927
任务2结束运行,时间:1596525157927