Dubbo源码阅读分享系列文章,欢迎大家关注点赞
时间是一种调度模型, 是一种高效的、批量管理定时任务的调度模型。时间轮一般会实现成一个环形结构,类似一个时钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务;指针周期性地跳动,跳动到一个槽位,就执行该槽位的定时任务。
至于为什么使用时间轮我们可以对比下我们所熟知的数据结构来分析一下时间轮的优势,这样我们就明白为什么会出现时间轮。
添加/删除任务: 遍历每一个节点, 找到相应的位置插入, 因此时间复杂度为O(n);
执行任务: 取出最小定时任务为首节点, 因此时间复杂度为O(1);
有序队列的缺点在于添加/删除任务,我们可以通过树形结构来进行优化添加/删除,也就是红黑树。
添加/删除任务: 红黑树能将排序的的时间复杂度降到O(logN);
执行任务: 红黑树执行任务节点在最左侧节点, 因此按照查询时间复杂度为O(logN);
堆具有特点必须是完全二叉树,任一结点的值是其子树所有结点的最大值或最小值
添加/删除任务: 时间复杂度为O(logN);
执行任务: 最小节点为根节点, 时间复杂度为O(1);
对于时间轮的实现一般是环状+链表,这样子整体复杂度为:
添加/删除任务: 时间复杂度为O(logN);
执行任务: 最小节点为根节点, 时间复杂度为O(1);
整体上看看上去我们可以选择红黑树、最小堆、时间轮,但是如果是多线程情况,红黑树、最小堆执行操作需要锁住整个内容,而时间轮就不需要,类似分段式锁的概念,因此更优选择是时间轮。
下图是一个单层时间轮,假设下图时间轮的周期是1秒,时间轮中有10个槽位,则每个槽位代表的时间就是100ms,现在有A、B、C三个任务,分别是任务A(230ms后执行)、B(450ms之后运行)、C(1950ms之后运行)。我们可以看到任务A被放到了槽位2,任务B被放到了槽位4,任务C被放到了槽位9,当时间轮转动到对应的槽时,就会从槽中取出任务判断是否需要执行。这个里面涉及一个周期概念,任务C具有一个周期,当时间轮完成一次循环,下次执行到9的时候,任务C才会执行,目前Dubbo中采用单层时间轮机制。
对应多层时间轮就是具有多个时间轮,下图中具有两个时间轮,第一层时间轮还是保持和单层时间轮一样,第二层时间轮为一个周期为10秒的时间轮,还是按照上述案例,这个时候A、B任务还是被分配在第一层时间轮,对于C任务,当完成完成一个周期以后,第二层时间轮刻度会执行到1的位置,同时任务C也会被取出到第一层时间轮9的位置,当一层时间轮再次转动到9的位置的时候,则会触发任务C,这种将第二层的任务取出放入第一层中称为降层,它是为了保证任务被处理的时间精度。Kafka内部就是采用的这种多层时间轮机制。
心跳检查,Netty中的心跳检查就是采用时间轮形式;
超时处理,目前Dubbo中采用时间轮来处理超时调用;
分布式锁续期,目前在分布式锁Redisson通过时间轮定时给分布式锁续期;
定时任务,对于分布式定时任务的调度就是采用的时间轮设计;
消息中间件,延时队列消息的中间件一般采用时间轮实现;
Dubbo中时间轮的设计都位于org.apache.dubbo.common.timer包中,我们首先来看下核心接口的设计:
TimerTask封装了要执行的任务,所有的定时任务都需要继承TimerTask接口,TimerTask就是任务交接入口,该方法内部只有一个run方法,该方法接收一个Timeout类型。
Timeout与TimerTask一一对应,Timeout主要是为了获取定时任务的状态以及操作定时任务,Timeout与TimerTask两者的关系类似于线程池返回的Future对象与提交到线程池中的任务对象之间的关系。
Timer接口定义了定时器的基本行为,核心是newTimeout方法:提交一个定时任务并返回关联的Timeout对象。
HashedWheelTimeout是Timeout的唯一实现,它的作用有两个:
时间轮中双向链表的节点,其中封装了实际要执行的任务TimerTask;
定时任务TimerTask提交到 HashedWheelTimer 之后返回的,通过它可以查看定时任务的状态、对定时任务进行取消、从双向链表中移除等操作;
首先来看下HashedWheelTimeout核心字段,该核心字段的设计表明链表的结构是一个双向链表:
//初始化状态private static final int ST_INIT = 0;//被取消状态private static final int ST_CANCELLED = 1;//过期状态private static final int ST_EXPIRED = 2;//更新定时任务的状态private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");//时间轮对象private final HashedWheelTimer timer;//实际执行的任务private final TimerTask task;//定时任务执行的时间private final long deadline;//默认状态是初始化@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})private volatile int state = ST_INIT;//当前任务剩余的时钟周期数long remainingRounds;//当前定时任务在链表中的前驱节点和后继节点 设计为一个双向链表HashedWheelTimeout next;HashedWheelTimeout prev;//时间轮中的一个槽//每个槽维护一个双向链表,当时间轮指针转到当前槽时,就会从槽所负责的双向链表中取出任务进行处理HashedWheelBucket bucket;
HashedWheelTimeout核心方法介绍:
isCancelled、isExpired方法,主要用于检查当前HashedWheelTimeout状态;
cancel方法将当前HashedWheelTimeout的状态设置为取消状态,并将当前HashedWheelTimeout添加到 cancelledTimeouts队列中等待销毁;
expire方法表示,当前到期的任务会调用该方法将会将当前HashedWheelTimeout设置为过期状态状态,然后调用其中的TimerTask的run方法执行定时任务,不同类型的任务可以自己实现run方法;
remove方法会将当前HashedWheelTimeout从时间轮中删除;
@Override public boolean cancel() { //CAS变更状态 if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { return false; } //任务被取消时,时间轮会将它暂存到时间轮所维护的canceledTimeouts队列中. //当时间轮转动到槽进行任务处理之前和时间轮退出运行时都会调用cancel,而 //cancel会调用remove,从而清理该队列中被取消的定时任务 timer.cancelledTimeouts.add(this); return true;}void remove() { //获取当前任务属于哪个槽位 HashedWheelBucket bucket = this.bucket; if (bucket != null) { //从双向链表中移除节点 bucket.remove(this); } else { //当前时间轮所维护的定时任务的数量 timer.pendingTimeouts.decrementAndGet(); }}public void expire() { //CAS修改定时任务状态为已过期 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { //执行定时任务 task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } }}
HashedWheelBucket是时间轮中的一个槽,它内部维护了双向链表的首尾指针,双向链表中的每一个节点就是一个HashedWheelTimeout对象,同时关联了一个TimerTask定时任务。
private HashedWheelTimeout head;private HashedWheelTimeout tail;
HashedWheelBucket维护双向链表的头尾节点,可以遍历整个链表,因此具备了维护任务的能力,接下来我们来看一下HashedWheelBucket的核心方法。
addTimeout方法新增HashedWheelTimeout到链表尾部;
pollTimeout方法移除双向链表中的头结点,并将其返回;
remove方法用于移除双向链表的指定节点;
clearTimeouts方法循环调用pollTimeout方法处理整个双向链表,并返回所有未超时或者未被取消的任务,该方法会在时间轮停止的时候被调用;
expireTimeouts方法遍历双向链表中的全部 HashedWheelTimeout节点,分别处理以下三种情况,定时任务已到期,则会通过remove方法取出,并调用其expire方法执行任务逻辑。定时任务已被取消,则通过remove方法取出直接丢弃。定时任务还未到期,则会将remainingRounds(剩余时钟周期)减一;
void addTimeout(HashedWheelTimeout timeout) { //空判断一下 assert timeout.bucket == null;timeout.bucket = this;//如果头节点为空 说明整个链表为空 则设置头尾为当前节点if (head == null) { head = tail = timeout;} else { //添加到未节点 tail.next = timeout; timeout.prev = tail; tail = timeout;}}/** Expire all {@link HashedWheelTimeout}s for the given {@code deadline}./void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; //时间轮指针转到某个槽时从双向链表头节点开始遍历 while (timeout != null) { HashedWheelTimeout next = timeout.next; //当前任务到期 if (timeout.remainingRounds <= 0) { //移除 next = remove(timeout); if (timeout.deadline <= deadline) { //执行任务 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { //任务被取消 被移除 next = remove(timeout); } else { //时钟周期减一 timeout.remainingRounds--; } //判断下一个节点 timeout = next; }}void clearTimeouts(Set
HashedWheelTimer实现了Timer接口,它通过时间轮算法实现了一个定时器。可以通过newTimeout方法可以向时间轮中添加定时任务,该任务会先被暂存到timeouts队列中,等时间轮转动到某个槽时,会将该timeouts队列中的任务转移到某个槽所负责的双向链表中。从双向链表的头部开始迭代,对每个定时任务HashedWheelTimeout进行计算,属于当前时钟周期则取出运行,不属于则将其剩余的时钟周期数减一操作。此外还提供停止时间轮的stop方法,以及判断时间轮是否终止的方法。
//时间轮处理定时任务逻辑private final Worker worker = new Worker();//时间轮内部处理定时任务的线程private final Thread workerThread;private static final int WORKER_STATE_INIT = 0;private static final int WORKER_STATE_STARTED = 1;private static final int WORKER_STATE_SHUTDOWN = 2;//时间轮状态 0 - init, 1 - started, 2 - shut down@SuppressWarnings({"unused", "FieldMayBeFinal"})private volatile int workerState;//时间轮每个槽所代表的时间private final long tickDuration;//时间轮的环形队列,数组每个元素都是一个槽,一个槽负责维护一个双向链表,用于存储定时任务private final HashedWheelBucket[] wheel;//wheel.length - 1 private final int mask;//CountDownLatch保证线程已经启动private final CountDownLatch startTimeInitialized = new CountDownLatch(1);//外部向时间轮提交的定时任务private final Queue<HashedWheelTimeout> timeouts = new LinkedBlockingQueue<>();//用于暂存被取消的定时任务private final Queue<HashedWheelTimeout> cancelledTimeouts = new LinkedBlockingQueue<>();//时间轮剩余的待处理的定时任务数量private final AtomicLong pendingTimeouts = new AtomicLong(0);//最多允许多少个任务等待执行private final long maxPendingTimeouts;//当前时间轮的启动时间private volatile long startTime;
时间轮的初始化是在HashedWheelTimer的构造函数中完成的,主要就是创建HashedWheelBucket数组,以及创建workerThread工作线程,该线程就是负责处理时间轮中的定时任务的线程。
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) { //参数校验 if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } //圆环有多少时间间隔 将ticksPerWheel转化为一个大于等于该值的2^n的值 wheel = createWheel(ticksPerWheel); //快速计算槽的位置 mask = wheel.length - 1; //时间轮每个槽的时间间隔 this.tickDuration = unit.toNanos(tickDuration); //边界值检查 if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } //工作线程 workerThread = threadFactory.newThread(worker); //最多允许多少个任务等待执行 this.maxPendingTimeouts = maxPendingTimeouts; //限制timer实例个数,最大不超过64 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); }}private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel);}if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);}//计算创建多少个槽ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);//初始化时间轮数组HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];for (int i = 0; i < wheel.length; i++) { wheel[i] = new HashedWheelBucket();}return wheel;}
提交定时任务发生在初始化之后,由newTimeout方法完成任务提交,方法内部将待处理的任务数量加1,然后启动时间轮线程,这时worker的run方法就会被系统调度运行。然后将该定时任务封装成HashedWheelTimeout加入到timeouts队列中。start之后,时间轮就开始运行起来了,直到外界调用stop方法终止退出。
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } //任务数加1 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); //判断是否超过最大任务数 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } //启动时间轮 start(); //计算定时任务的deadline long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; //参数校验 if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } //创建一个HashedWheelTimeout对象 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); //被暂存到timeouts队列中 timeouts.add(timeout); return timeout;}public void start() { //判断时间轮状态 //1.如果是初始化, 则启动worker线程, 启动整个时间轮 //2. 如果已经启动则略过 //3. 如果是已经停止,则报错 switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } //等待worker线程初始化时间轮的启动时间 while (startTime == 0) { try { //countDownLatch来确保调度的线程已经被启动 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } }}
Worker实现了Runnable接口,也就是时间轮内部的工作线程,工作线程来处理放入时间轮中的定时任务。对于该方法核心就是run方法,
public void run() { //初始化startTime时间轮初始化以后的时候 startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } //唤醒阻塞线程 startTimeInitialized.countDown(); do { //判断是否到了处理槽的时间 如果没到sleep final long deadline = waitForNextTick(); if (deadline > 0) { //获取对应槽 int idx = (int) (tick & mask); //清理用户主动取消的定时任务 processCancelledTasks(); //获取当前指针对应的槽位 HashedWheelBucket bucket = wheel[idx]; //将timeouts队列中的定时任务转移到时间轮中对应的槽中 transferTimeoutsToBuckets(); //处理该槽位的双向链表中的定时任务 bucket.expireTimeouts(deadline); tick++; } //运行状态一直循环 } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); //执行到此处说明时间轮被停止了 //清除所有槽中的任务, 并加入到未处理任务列表 for (HashedWheelBucket bucket : wheel) { bucket.clearTimeouts(unprocessedTimeouts); } //将还没有加入到槽中的待处理定时任务队列中的任务取出, 如果是未取消的任务, //则加入到未处理任务队列中 for (; ; ) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } //最后再次清理cancelledTimeouts队列中用户主动取消的定时任务 processCancelledTasks(); }
时间轮指针转动,时间轮周期开始;
清理用户主动取消的定时任务,这些定时任务在用户取消时,会记录到 cancelledTimeouts 队列中。在每次指针转动的时候,时间轮都会清理该队列;
将缓存在timeouts队列中的定时任务转移到时间轮中对应的槽中;
根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务;
检测时间轮的状态。如果时间轮处于运行状态,则循环执行上述步骤,不断执行定时任务。如果时间轮处于停止状态,则执行下面的步骤获取到未被执行的定时任务并加入 unprocessedTimeouts 队列:遍历时间轮中每个槽位,并调用 clearTimeouts方法;对timeouts队列中未被加入槽中循环调用poll;
最后再次清理cancelledTimeouts队列中用户主动取消的定时任务;
在Dubbo中有关于时间轮的应用有两个核心的抽象类,一个是AbstractRetryTask,另外一个是AbstractTimerTask,关于AbstractRetryTask重试机制我们在注册中心的时候已经介绍完成,这里重点看下AbstractTimerTask实现。
关于AbstractTimerTask有是三个实现类,一个用来关闭连接,一个是心跳检查,一个是重连接,AbstractTimerTask会调用不同的实现,我们来看下HeartbeatTimerTask实现,
protected void doTask(Channel channel) { try { //获取最后一次读写时间 Long lastRead = lastRead(channel); Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat) || (lastWrite != null && now() - lastWrite > heartbeat)) { //最后一次读写时间超过心跳时间,就会发送心跳请求 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setEvent(HEARTBEAT_EVENT); //发送心跳信息 channel.send(req); if (logger.isDebugEnabled()) { logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms"); } } } catch (Throwable t) { logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t); } }
欢迎大家点点关注,点点赞!
手机扫一扫
移动阅读更方便
你可能感兴趣的文章