ScheduledThreadPoolExecutor解析一(调度任务,任务队列):
[url]http://donald-draper.iteye.com/blog/2367332[/url]
ScheduledThreadPoolExecutor解析二(任务调度):
[url]http://donald-draper.iteye.com/blog/2367593[/url]
前面一篇文章,我们看了任务的调度,先回顾一下:
从调度线程池执行器的构造来看,核心线程池数量是必须设置的,线程工厂和拒绝策略可选,默认最大线程池数量为 Integer.MAX_VALUE,保活时间为0,即不存在空闲的任务线程,
任务队列为DelayedWorkQueue。
scheduleAtFixedRate方法首先根据任务command和任务执行系统时间,
及任务间隔时间period,构造调度任务,简单包装调度任务,延时执行调度任务,
延时执行调度任务。在延时执行调度任务时,
添加任务到延时DelayedWorkQueue,同时添加一个空任务工作线程,空任务工作线程执行时,
如果任务为null,则从任务队列中取任务。调度任务的执行,如果任务为ScheduledFutureTask,
在运行的时候,从新计算任务下一次执行的系统时间,重置任务线程状态为READY,添加任务到队列。
scheduleWithFixedDelay与scheduleAtFixedRate不同点在构造ScheduledFutureTask时间间隔为-delay。时间间隔p为正,以固定的频率调度任务即scheduleAtFixedRate,每隔p时间执行一次任务,无论上一次任务是否执行完,具体任务能否执行,调度线程池无法保证,这要看是否有工作线程可用;当时间间隔p为负,以固定的间隔时间调度任务,即scheduleWithFixedDelay,
当前任务执行完后,等待p时间,再执行下一个任务。
今天来看一下线程池的关闭。
public void shutdown() { //委托给父类线程池执行器 super.shutdown(); }
//ThreadPoolExecutor
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查线程访问权限 checkShutdownAccess(); //更新线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲工作线程 interruptIdleWorkers(); //线程池关闭hook onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //尝试结束线程池,这个前面以说,这里不再说 tryTerminate();}
关键在这一点
//线程池关闭hook
onShutdown(); // hook for ScheduledThreadPoolExecutor
/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. Invoked within super.shutdown. */ @Override void onShutdown() { //获取任务队列 BlockingQueue<Runnable> q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) { //如果在线程池关闭时,可以在任务执行时,取消间歇性任务和延时任务, //则遍历任务,并以不可中断方式取消任务。 for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); q.clear(); } else { // Traverse snapshot to avoid iterator exceptions //否则,遍历任务根据是间歇性任务还是延时任务, //获取相应的线程池关闭是否可取消正在执行的任务的策略 for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled 如果可取消或任务已经取消,则移除任务,成功,则以不可中断方式取消任务。 if (q.remove(t)) t.cancel(false); } } } } //这个在线程池相关的文章中以说,这里不再说,尝试关闭线程池 tryTerminate(); }
关闭操作,与线程池执行器的关闭基本相同,不同的是,在onShutdown方法,调度线程池执行器,重写了这个方法,这个方法主要是根据线程池关闭间歇性任务和延时任务的处理策略,确定是否以不可中断方式取消任务。
再来看立即关闭:
public List<Runnable> shutdownNow() { //委托给父类 return super.shutdownNow(); }
//执行Runnable任务public void execute(Runnable command) { schedule(command, 0, TimeUnit.NANOSECONDS); }//提交Runnable任务 // Override AbstractExecutorService methods /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS); } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, TimeUnit.NANOSECONDS); }//设置线程池关闭延时任务处理策略 /** * Sets the policy on whether to execute existing delayed * tasks even when this executor has been {@code shutdown}. * In this case, these tasks will only terminate upon * {@code shutdownNow}, or after setting the policy to * {@code false} when already shutdown. * This value is by default {@code true}. * * @param value if {@code true}, execute after shutdown, else don't. * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy */ public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { executeExistingDelayedTasksAfterShutdown = value; if (!value && isShutdown()) //如果在线程池关闭时,可以取消延时任务 onShutdown(); }//设置线程池关闭间歇性任务处理策略 /** * Sets the policy on whether to continue executing existing * periodic tasks even when this executor has been {@code shutdown}. * In this case, these tasks will only terminate upon * {@code shutdownNow} or after setting the policy to * {@code false} when already shutdown. * This value is by default {@code false}. * * @param value if {@code true}, continue after shutdown, else don't. * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy */ public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { continueExistingPeriodicTasksAfterShutdown = value; if (!value && isShutdown()) //如果在线程池关闭时,可以取消间歇性任务 onShutdown(); }
总结:
[color=blue]关闭操作,与线程池执行器的关闭基本相同,不同的是,在onShutdown方法,调度线程池执行器,重写了这个方法,这个方法主要是根据线程池关闭间歇性任务和延时任务的处理策略,确定是否以不可中断方式取消任务。[/color]
手机扫一扫
移动阅读更方便
你可能感兴趣的文章