SynchronousQueue核心源码分析
阅读原文时间:2023年07月13日阅读:1

  SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。SynchronousQueue支持公平性和非公平性2种策略来访问队列。默认是采用非公平性策略访问队列。公平性策略底层使用了类似队列的数据结构,而非公平策略底层使用了类似栈的数据结构。SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

  下面是SynchronousQueue类的类图,下文中将详细分析这种阻塞队列的实现细节:

SynchronousQueue重要的成员变量

//表示运行当前程序的平台,所拥有的CPU数量
static final int NCPUS = Runtime.getRuntime().availableProcessors();

//为什么需要自旋这个操作?
//因为线程 挂起 唤醒站在cpu角度去看的话,是非常耗费资源的,涉及到用户态和内核态的切换…
//自旋的好处,自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了
//如果自旋期间未被匹配到,自旋次数达到某个指标后,还是会将当前线程挂起的…
//NCPUS:当一个平台只有一个CPU时,你觉得还需要自旋么?
//答:肯定不需要自旋了,因为一个cpu同一时刻只能执行一个线程,自旋没有意义了…而且你还站着cpu 其它线程没办法执行..这个
//栈的状态更不会改变了.. 当只有一个cpu时 会直接选择 LockSupport.park() 挂起等待者线程。

//表示指定超时时间的话,当前线程最大自旋次数。
//只有一个cpu 自旋次数为0
//当cpu大于1时,说明当前平台是多核平台,那么指定超时时间的请求的最大自旋次数是 32 次。
//32是一个经验值。
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;

//表示未指定超时限制的话,线程等待匹配时,自旋次数。
//是指定超时限制的请求的自旋次数的16倍.
static final int maxUntimedSpins = maxTimedSpins * 16;

//如果请求是指定超时限制的话,如果超时nanos参数是< 1000 纳秒时,
//禁止挂起。挂起再唤醒的成本太高了..还不如选择自旋空转呢…
static final long spinForTimeoutThreshold = 1000L;

SynchronousQueue使用了一个非常关键的方法来转移数据(从生产者线程转移到消费者线程),下面是这个方法

abstract static class Transferer {
/**
* @param e 可以为null,null时表示这个请求是一个 REQUEST 类型的请求
* 如果不是null,说明这个请求是一个 DATA 类型的请求。
*
* @param timed 如果为true 表示指定了超时时间 ,如果为false 表示不支持超时,表示当前请求一直等待到匹配为止,或者被中断。
* @param nanos 超时时间限制 单位 纳秒
*
* @return E 如果当前请求是一个 REQUEST类型的请求,返回值如果不为null 表示 匹配成功,如果返回null,表示REQUEST类型的请求超时 或 被中断。
* 如果当前请求是一个 DATA 类型的请求,返回值如果不为null 表示 匹配成功,返回当前线程put的数据。
* 如果返回值为null 表示,DATA类型的请求超时 或者 被中断..都会返回Null。
*
*/
abstract E transfer(E e, boolean timed, long nanos);
}

1、TransferStack是SynchronousQueue的一个内部类,可以使用它实现非公平的访问队列。下面是其重要的成员变量以及一些公共方法

/** 表示Node类型为 请求类型(消费者) */
static final int REQUEST = 0;
/** 表示Node类型为 数据类型(生产者) */
static final int DATA = 1;
/** 表示Node类型为 匹配中类型
* 假设栈顶元素为 REQUEST-NODE,当前请求类型为 DATA的话,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】。
* 假设栈顶元素为 DATA-NODE,当前请求类型为 REQUEST的话,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】。
*/
static final int FULFILLING = 2;

//判断当前模式是否为 匹配中状态
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

static final class SNode {
volatile SNode next; // next node in stack
//与当前node匹配的节点
//null:还没有任何匹配 等于自己:表示当前为取消状态 等于别的Snode:当前为匹配状态
volatile SNode match; // the node matched to this
//假设当前node对应的线程 自旋期间未被匹配成功,那么node对应的线程需要挂起,挂起前 waiter 保存对应的线程引用,
//方便 匹配成功后,被唤醒。
volatile Thread waiter; // to control park/unpark
//数据域,data不为空 表示当前Node对应的请求类型为 DATA类型。 反之则表示Node为 REQUEST类型。
Object item; // data; or null for REQUESTs
int mode;
// Note: item and mode fields don't need to be volatile
// since they are always written before, and read after,
// other volatile/atomic operations.

SNode(Object item) {  
    this.item = item;  
}  

//CAS方式设置Node对象的next字段  
boolean casNext(SNode cmp, SNode val) {  
    //优化:cmp == next  为什么要判断?  
    //因为cas指令 在平台执行时,同一时刻只能有一个cas指令被执行。  
    //有了java层面的这一次判断,可以提升一部分性能。 cmp == next 不相等,就没必要走 cas指令。  
    return cmp == next &&  
        UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);  
}  

/\*\*  
 \* 尝试匹配  
 \* 调用tryMatch的对象是 栈顶节点的下一个节点,与栈顶匹配的节点。  
 \*  
 \* @return ture 匹配成功。 否则匹配失败..  
 \*/  
boolean tryMatch(SNode s) {  
    //条件一:match == null 成立,说明当前Node尚未与任何节点发生过匹配...  
    //条件二 成立:使用CAS方式 设置match字段,表示当前Node已经被匹配了  
    if (match == null &&  
        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {  
        //当前Node如果自旋结束,那么会使用LockSupport.park 方法挂起,挂起之前会将Node对应的Thread 保留到 waiter字段。  
        Thread w = waiter;  
        //条件成立:说明Node对应的Thread已经挂起了...  
        if (w != null) {    // waiters need at most one unpark  
            waiter = null;  
            LockSupport.unpark(w);  
        }  
        return true;  
    }  
    return match == s;  
}  

/\*\*  
 \* Tries to cancel a wait by matching node to itself.  
 \*/  
void tryCancel() {  
    //match字段 保留当前Node对象本身,表示这个Node是取消状态,取消状态的Node,最终会被强制移除出栈。  
    UNSAFE.compareAndSwapObject(this, matchOffset, null, this);  
}  

//如果match保留的是当前Node本身,那表示当前Node是取消状态,反之 则 非取消状态。  
boolean isCancelled() {  
    return match == this;  
}  

// Unsafe mechanics  
private static final sun.misc.Unsafe UNSAFE;  
private static final long matchOffset;  
private static final long nextOffset;  

static {  
    try {  
        UNSAFE = sun.misc.Unsafe.getUnsafe();  
        Class<?> k = SNode.class;  
        matchOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("match"));  
        nextOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("next"));  
    } catch (Exception e) {  
        throw new Error(e);  
    }  
}  

}

//表示栈顶指针
volatile SNode head;

//设置栈顶元素,同样使用到了优化
boolean casHead(SNode h, SNode nh) {
return h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh);
}

/**
* Creates or resets fields of a node. Called only from transfer
* where the node to push on stack is lazily created and
* reused when possible to help reduce intervals between reads
* and CASes of head and to avoid surges of garbage when CASes
* to push nodes fail due to contention.
*
* @param s SNode引用,当这个引用指向空时,snode方法会创建一个SNode对象 并且赋值给这个引用
* @param e SNode对象的item字段
* @param next 指向当前栈帧的下一个栈帧
* @param mode REQUEST/DATA/FULFILLING
*/
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;

2、TransferStack转移的核心方法transfer

E transfer(E e, boolean timed, long nanos) {
//包装当前线程的Node
SNode s = null; // constructed/reused as needed
//e == null 条件成立:当前线程是一个REQUEST线程。
//否则 e!=null 说明 当前线程是一个DATA线程,提交数据的线程。
int mode = (e == null) ? REQUEST : DATA;

//自旋  
for (;;) {  
    //h表示栈顶指针  
    SNode h = head;  
    /\*  
     \* If apparently empty or already containing nodes of same  
     \*    mode, try to push node on stack and wait for a match,  
     \*    returning it, or null if cancelled.  
     \*/  
    //CASE1:当前栈内为空 或者 栈顶Node模式与当前请求模式一致,尝试着把当前node入栈并且等待一个匹配,最后会返回对应的数据,  
    // 如果这个节点在这过程中被取消了,则返回null  
    if (h == null || h.mode == mode) {  // empty or same-mode  
        //条件一:成立,说明当前请求是指定了 超时限制的  
        //条件二:nanos <= 0 , nanos == 0. 表示这个请求 不支持 “阻塞等待”。 queue.offer();  
        if (timed && nanos <= 0) {      // can't wait  
            //条件成立:说明栈顶处于取消状态,协助栈顶出栈,再一次自旋尝试。  
            if (h != null && h.isCancelled())  
                casHead(h, h.next);     // pop cancelled node  
            elss  
                return null;  
        }  

        //什么时候执行else if 呢?  
        //当前栈顶为空 或者 模式与当前请求一致,且当前请求允许阻塞等待。  
        //snode(s, e, h, mode),将当前的信息包装为一个Snode节点  
        //casHead(h, s = snode(s, e, h, mode))  入栈操作。  
        else if (casHead(h, s = snode(s, e, h, mode))) {  
            //执行到这里,说明 当前请求入栈成功。  
            //入栈成功之后要做什么呢?  
            //在栈内等待一个好消息,等待被匹配!  

            //awaitFulfill 等待被匹配的逻辑...  
            //1.正常情况:返回匹配的节点  
            //2.取消情况:返回当前节点  s节点进去,返回s节点...  
            SNode m = awaitFulfill(s, timed, nanos);  

            //条件成立:说明当前Node状态是 取消状态...  
            if (m == s) {               // wait was cancelled  
                //将取消状态的节点 出栈...  
                clean(s);  
                //取消状态 最终返回null  
                return null;  
            }  
            //执行到这里 说明当前Node已经被匹配了...  

            //条件一:成立,说明栈顶是有Node  
            //条件二:成立,说明 Fulfill 和 当前Node 还未出栈,需要协助出栈。  
            if ((h = head) != null && h.next == s)  
                casHead(h, s.next);     // help s's fulfiller  
            //当前NODE模式为REQUEST类型:返回匹配节点的m.item 数据域  
            //当前NODE模式为DATA类型:返回Node.item 数据域,当前请求提交的 数据e  
            return (E) ((mode == REQUEST) ? m.item : s.item);  
        }  
    }  
    /\*  
     \* If apparently containing node of complementary mode,  
     \*    try to push a fulfilling node on to stack, match  
     \*    with corresponding waiting node, pop both from  
     \*    stack, and return matched item. The matching or  
     \*    unlinking might not actually be necessary because of  
     \*    other threads performing action 3:  
     \*  
     \*/  
    //什么时候来到这??  
    //栈顶Node的模式与当前请求的模式不一致,会执行else if 的条件。  
    //栈顶是 (DATA  Reqeust)    (Request   DATA)   (FULFILLING  REQUEST/DATA)  

    //CASE2:当前栈顶模式与请求模式不一致,且栈顶不是FULFILLING  
    else if (!isFulfilling(h.mode)) { // try to fulfill  
        //条件成立:说明当前栈顶状态为 取消状态,当前线程协助它出栈。  
        if (h.isCancelled())            // already cancelled  
            casHead(h, h.next);         // pop and retry  
            // 条件成立:说明当前节点压栈成功,入栈一个 FULFILLING | mode  NODE  
        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {                 
            //自旋,fulfill 节点 和 fulfill.next 节点进行匹配工作...  
            for (;;) { // loop until matched or waiters disappear  
                //m 与当前s 匹配节点。  
                SNode m = s.next;       // m is s's match  
                // 如果m为null,说明除了s节点外的节点都被其它线程先一步匹配掉了  
                // 就清空栈并跳出内部循环,到外部循环再重新入栈判断  
                if (m == null) {        // all waiters are gone  
                    casHead(s, null);   // pop fulfill node  
                    s = null;           // use new node next time  
                    //回到外层大的 自旋中,再重新选择路径执行,此时有可能 插入一个节点。  
                    break;              // restart main loop  
                }  
                //什么时候会执行到这里呢?  
                //fulfilling 匹配节点不为null,进行真正的匹配工作。  

                //获取 匹配节点的 下一个节点。  
                SNode mn = m.next;  
                //尝试匹配,匹配成功,则将fulfilling 和 m 一起出栈  
                if (m.tryMatch(s)) {  
                    //结对出栈  
                    casHead(s, mn);     // pop both s and m  
                    //当前NODE模式为REQUEST类型:返回匹配节点的m.item 数据域  
                    //当前NODE模式为DATA类型:返回Node.item 数据域,当前请求提交的 数据e  
                    return (E) ((mode == REQUEST) ? m.item : s.item);  
                } else                  // lost match  
                    // 尝试匹配失败,说明m已经先一步被其它线程匹配了  
                    // 就协助清除它,进入下一个自旋,再一次进行尝试  
                    s.casNext(m, mn);   // help unlink  
            }  
        }  
    }  
    /\*  
        If top of stack already holds another fulfilling node,  
     \*    help it out by doing its match and/or pop  
     \*    operations, and then continue. The code for helping  
     \*    is essentially the same as for fulfilling, except  
     \*    that it doesn't return the item.  
     \*/  
    //CASE3:什么时候会执行?  
    //栈顶模式为 FULFILLING模式,表示栈顶和栈顶下面的栈帧正在发生匹配...  
    //当前请求需要做 协助 工作。  
    else {                            // help a fulfiller  
        //h 表示的是 fulfilling节点,m fulfilling匹配的节点。  
        SNode m = h.next;               // m is h's match  
        //m == null 什么时候可能成立呢?  
        //当s.next节点 超时或者被外部线程中断唤醒后,会执行 clean 操作 将 自己清理出栈,此时  
        //站在匹配者线程 来看,真有可能拿到一个null。  
        if (m == null)                  // waiter is gone  
            casHead(h, null);           // pop fulfilling node  
            //大部分情况:走else分支。  
        else {  
            //获取栈顶匹配节点的 下一个节点  
            SNode mn = m.next;  
            //条件成立:说明 m 和 栈顶 匹配成功  
            if (m.tryMatch(h))          // help match  
                casHead(h, mn);         // pop both h and m  
            else                        // lost match  
                // 尝试匹配失败,说明m已经先一步被其它线程匹配了  
                // 就协助清除它  
                h.casNext(m, mn);       // help unlink  
        }  
    }  
}  

}

3、TransferStack中其他重要的方法

/**
* Spins/blocks until node s is matched by a fulfill operation.
* @param s 当前请求Node
* @param timed 当前请求是否支持 超时限制
* @param nanos 如果请求支持超时限制,nanos 表示超时等待时长。
* @return matched node, or s if cancelled
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//deadline 表示等待截止时间..
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//允许自旋检查的次数..
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);

//自旋检查逻辑:1.是否匹配  2.是否超时  3.是否被中断..  
for (;;) {  
    //条件成立:说明当前线程收到中断信号,需要设置Node状态为取消状态。  
    if (w.isInterrupted())  
        //Node对象的 match 指向 当前Node 说明该Node状态就是 取消状态。  
        s.tryCancel();  

    //m 表示与当前Node匹配的节点。  
    //1.正常情况:有一个请求 与 当前Node 匹配成功,这个时候 s.match 指向 匹配节点。  
    //2.取消情况:当前match 指向 当前Node...  
    SNode m = s.match;  

    //m 表示与当前Node匹配的节点。  
    //1.正常情况:有一个请求 与 当前Node 匹配成功,这个时候 s.match 指向 匹配节点。  
    //2.取消情况:当前match 指向 当前Node...  
    if (m != null)  
        return m;  

    //条件成立:说明指定了超时限制..  
    if (timed) {  
        //nanos 表示距离超时 还有多少纳秒..  
        nanos = deadline - System.nanoTime();  
        //条件成立:说明已经超时了...  
        if (nanos <= 0L) {  
            //设置当前Node状态为 取消状态.. match-->当前Node  
            s.tryCancel();  
            continue;  
        }  
    }  
    //条件成立:说明当前线程还可以进行自旋检查...  
    if (spins > 0)  
        spins = shouldSpin(s) ? (spins-1) : 0;  
    //spins == 0 ,已经不允许再进行自旋检查了  
    else if (s.waiter == null)  
        s.waiter = w; // establish waiter so can park next iter  
        // 条件成立:说明当前Node对应的请求  未指定超时限制。  
    else if (!timed)  
        //使用不指定超时限制的park方法 挂起当前线程,直到 当前线程被外部线程 使用unpark唤醒。  
        LockSupport.park(this);  
   //什么时候执行到这里? timed == true 设置了 超时限制..  
   //条件成立:nanos > 1000 纳秒的值,只有这种情况下,才允许挂起当前线程..否则 说明 超时给的太少了...挂起和唤醒的成本 远大于 空转自旋...  
    else if (nanos > spinForTimeoutThreshold)  
        LockSupport.parkNanos(this, nanos);  
}  

}

/**
* Returns true if node s is at head or there is an active
* fulfiller.
*/
boolean shouldSpin(SNode s) {
//获取栈顶
SNode h = head;
//条件一 h == s :条件成立 说明当前s 就是栈顶,允许自旋检查…
//条件二 h == null : 什么时候成立? 当前s节点 自旋检查期间,又来了一个 与当前s 节点匹配的请求,双双出栈了…条件会成立。
//条件三 isFulfilling(h.mode) : 前提 当前 s 不是 栈顶元素。并且当前栈顶正在匹配中,这种状态 栈顶下面的元素,都允许自旋检查。
return (h == s || h == null || isFulfilling(h.mode));
}

/**
* Unlinks s from the stack.
*/
void clean(SNode s) {
//清空数据域
s.item = null; // forget item
s.waiter = null; // forget thread

/\*  
 \* At worst we may need to traverse entire stack to unlink  
 \* s. If there are multiple concurrent calls to clean, we  
 \* might not see s if another thread has already removed  
 \* it. But we can stop when we see any node known to  
 \* follow s. We use s.next unless it too is cancelled, in  
 \* which case we try the node one past. We don't check any  
 \* further because we don't want to doubly traverse just to  
 \* find sentinel.  
 \*/  

//检查取消节点的截止位置  
SNode past = s.next;  

if (past != null && past.isCancelled())  
    past = past.next;  

// Absorb cancelled nodes at head  
SNode p;  
//从栈顶开始向下检查,将栈顶开始向下连续的 取消状态的节点 全部清理出去,直到碰到past为止。  
while ((p = head) != null && p != past && p.isCancelled())  
    casHead(p, p.next);  

// Unsplice embedded nodes  
while (p != null && p != past) {  
    SNode n = p.next;  
    if (n != null && n.isCancelled())  
        p.casNext(n, n.next);  
    else  
        p = n;  
}  

}

1、TransferQueue是SynchronousQueue的一个内部类,可以使用它实现公平的访问队列。下面是其重要的成员变量以及一些公共方法

/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // next node in queue
//数据域
volatile Object item; // CAS'ed to or from null
//当Node对应的线程 未匹配到节点时,对应的线程 最终会挂起,挂起之前会保留 线程引用到waiter ,
//方法 其它Node匹配当前节点时 唤醒 当前线程..
volatile Thread waiter; // to control park/unpark
//true 当前Node是一个DATA类型 false表示当前Node是一个REQUEST类型。
final boolean isData;

QNode(Object item, boolean isData) {  
    this.item = item;  
    this.isData = isData;  
}  
//修改当前节点next引用  
boolean casNext(QNode cmp, QNode val) {  
    return next == cmp &&  
        UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);  
}  
//修改当前节点数据域item  
boolean casItem(Object cmp, Object val) {  
    return item == cmp &&  
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);  
}  

/\*\*  
 \* Tries to cancel by CAS'ing ref to this as item.  
 \* 尝试取消当前node  
 \* 取消状态的Node,它的item域,指向自己Node。  
 \*/  
void tryCancel(Object cmp) {  
    UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);  
}  

//判断当前Node是否为取消状态  
boolean isCancelled() {  
    return item == this;  
}  

/\*\*  
 \* Returns true if this node is known to be off the queue  
 \* because its next pointer has been forgotten due to  
 \* an advanceHead operation.  
 \* 判断当前节点是否 “不在” 队列内,当next指向自己时,说明节点已经出队。  
 \*/  
boolean isOffList() {  
    return next == this;  
}  

// Unsafe mechanics  
private static final sun.misc.Unsafe UNSAFE;  
private static final long itemOffset;  
private static final long nextOffset;  

static {  
    try {  
        UNSAFE = sun.misc.Unsafe.getUnsafe();  
        Class<?> k = QNode.class;  
        itemOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("item"));  
        nextOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("next"));  
    } catch (Exception e) {  
        throw new Error(e);  
    }  
}  

}

/** Head of queue */
transient volatile QNode head;
/** Tail of queue */
transient volatile QNode tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it was cancelled.
* 表示被清理节点的前驱节点。因为入队操作是 两步完成的,
* 第一步:t.next = newNode
* 第二步:tail = newNode
* 所以,队尾节点出队,是一种非常特殊的情况,需要特殊处理,回头讲!
*/
transient volatile QNode cleanMe;

TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}

/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
* 设置头指针指向新的节点,蕴含操作:老的头节点出队。
*/
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // forget old next
}

/**
* Tries to cas nt as new tail.
* 更新队尾节点 为新的队尾。
* @param t 老的队尾
* @param nt 新的队尾
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
/**
* Tries to CAS cleanMe slot.
*/
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);

2、TransferQueue转移的核心方法transfer

E transfer(E e, boolean timed, long nanos) {
//s 指向当前请求 对应Node
QNode s = null; // constructed/reused as needed
//isData == true 表示 当前请求是一个写数据操作(DATA) 否则isData == false 表示当前请求是一个 REQUEST操作。
boolean isData = (e != null);

for (;;) {  
    QNode t = tail;  
    QNode h = head;  
    if (t == null || h == null)         // saw uninitialized value  
        continue;                       // spin  

    /\*  
     \* 1. If queue apparently empty or holding same-mode nodes,  
     \*    try to add node to queue of waiters, wait to be  
     \*    fulfilled (or cancelled) and return matching item.  
     \*/  
    //CASE1:入队  
    //条件一:成立,说明head和tail同时指向dummy节点,当前队列实际情况 就是 空队列。此时当前请求需要做入队操作,因为没有任何节点 可以去匹配。  
    //条件二:队列不是空,队尾节点与当前请求类型是一致的情况。说明也是无法完成匹配操作的情况,此时当前节点只能入队...  
    if (h == t || t.isData == isData) { // empty or same-mode  
        //获取当前队尾t 的next节点 tn - t.next  
        QNode tn = t.next;  

        if (t != tail)                  // inconsistent read  
            continue;  
        //条件成立:说明已经有线程 入队了,且只完成了 入队的 第一步:设置t.next = newNode, 第二步可能尚未完成..  
        if (tn != null) {               // lagging tail  
            //协助更新tail 指向新的 尾结点。  
            advanceTail(t, tn);  
            continue;  
        }  
        //条件成立:说明当前调用transfer方法的 上层方法 可能是 offer() 无参的这种方法进来的,这种方法不支持 阻塞等待...  
        if (timed && nanos <= 0)        // can't wait  
            return null;  
        //条件成立:说明当前请求尚未 创建对应的node  
        if (s == null)  
            //创建node过程...  
            s = new QNode(e, isData);  
        //条件 不成立:!t.casNext(null, s)  说明当前t仍然是tail,当前线程对应的Node入队的第一步 完成!  
        if (!t.casNext(null, s))        // failed to link in  
            continue;  
        //更新队尾 为请求节点。  
        advanceTail(t, s);              // swing tail and wait  

        //当前节点 等待匹配....  
        //当前请求为DATA模式时:e 请求带来的数据  
        //x == this 当前SNode对应的线程 取消状态  
        //x == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。  

        //当前请求为REQUEST模式时:e == null  
        //x == this 当前SNode对应的线程 取消状态  
        //x != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。  
        Object x = awaitFulfill(s, e, timed, nanos);  
        //说明当前Node状态为 取消状态,需要做 出队逻辑。  
        if (x == s) {                   // wait was cancelled  
            clean(t, s);  
            return null;  
        }  

        //执行到这里说明 当前Node 匹配成功了...  
        //1.当前线程在awaitFulfill方法内,已经挂起了...此时运行到这里时是被 匹配节点的线程使用LockSupport.unpark() 唤醒的..  
        //被唤醒:当前请求对应的节点,肯定已经出队了,因为匹配者线程 是先让当前Node出队的,再唤醒当前Node对应线程的。  

        //2.当前线程在awaitFulfill方法内,处于自旋状态...此时匹配节点 匹配后,它检查发现了,然后返回到上层transfer方法的。  
        //自旋状态返回时:当前请求对应的节点,不一定就出队了...  

        //被唤醒时:s.isOffList() 条件会成立。  !s.isOffList() 不会成立。  
        //条件成立:说明当前Node仍然在队列内,需要做 匹配成功后 出队逻辑。  
        if (!s.isOffList()) {           // not already unlinked  
            //其实这里面做的事情,就是防止当前Node是自旋检查状态时发现 被匹配了,然后当前线程 需要将  
            //当前线程对应的Node做出队逻辑.  

            //t 当前s节点的前驱节点,更新dummy节点为 s节点。表示head.next节点已经出队了...  
            advanceHead(t, s);          // unlink if head  
            //x != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。  
            //因为s节点已经出队了,所以需要把它的item域 给设置为它自己,表示它是个取消出队状态。  
            if (x != null)              // and forget fields  
                s.item = s;  
            s.waiter = null;  
        }  
        //x != null 成立,说明当前请求是REQUEST类型,返回匹配到的数据x  
        //x != null 不成立,说明当前请求是DATA类型,返回DATA请求时的e。  
        return (x != null) ? (E)x : e;  

    }  
    /\*  
     \* 2. If queue apparently contains waiting items, and this  
     \*    call is of complementary mode, try to fulfill by CAS'ing  
     \*    item field of waiting node and dequeuing it, and then  
     \*    returning matching item.  
     \*/  
    //CASE2:队尾节点 与 当前请求节点 互补 (队尾->DATA,请求类型->REQUEST)  (队尾->REQUEST, 请求类型->DATA)  
    else {                            // complementary-mode  
        //h.next节点 其实是真正的队头,请求节点 与队尾模式不同,需要与队头 发生匹配。因为TransferQueue是一个 公平模式  
        QNode m = h.next;               // node to fulfill  

        //条件一:t != tail 什么时候成立呢? 肯定是并发导致的,其它线程已经修改过tail了,有其它线程入队过了..当前线程看到的是过期数据,需要重新循环  
        //条件二:m == null 什么时候成立呢? 肯定是其它请求先当前请求一步,匹配走了head.next节点。  
        //条件三:条件成立,说明已经有其它请求匹配走head.next了。。。当前线程看到的是过期数据。。。重新循环...  
        if (t != tail || m == null || h != head)  
            continue;                   // inconsistent read  

        Object x = m.item;  

        //条件一:isData == (x != null)  
        //isData 表示当前请求是什么类型  isData == true:当前请求是DATA类型  isData == false:当前请求是REQUEST类型。  
        //1.假设isData == true   DATA类型  
        //m其实表示的是 REQUEST 类型的NODE,它的数据域是 null  => x==null  
        //true == (null != null)  => true == false => false  

        //2.假设isData == false REQUEST类型  
        //m其实表示的是 DATA 类型的NODE,它的数据域是 提交是的e ,并且e != null。  
        //false == (obj != null) => false == true => false  

        //总结:正常情况下,条件一不会成立。  

        //条件二:条件成立,说明m节点已经是 取消状态了...不能完成匹配,当前请求需要continue,再重新选择路径执行了..  

        //条件三:!m.casItem(x, e),前提条件 m 非取消状态。  
        //1.假设当前请求为REQUEST类型   e == null  
        //m 是 DATA类型了...  
        //相当于将匹配的DATA Node的数据域清空了,相当于REQUEST 拿走了 它的数据。  

        //2.假设当前请求为DATA类型    e != null  
        //m 是 REQUEST类型了...  
        //相当于将匹配的REQUEST Node的数据域 填充了,填充了 当前DATA 的 数据。相当于传递给REQUEST请求数据了...  
        if (isData == (x != null) ||    // m already fulfilled  
            x == m ||                   // m cancelled  
            !m.casItem(x, e)) {         // lost CAS  
            //将头结点进行更新,进入下一次cas中  
            advanceHead(h, m);          // dequeue and retry  
            continue;  
        }  

        //执行到这里,说明匹配已经完成了,匹配完成后,需要做什么?  
        //1.将真正的头节点 出队。让这个真正的头结点成为dummy节点  
        advanceHead(h, m);              // successfully fulfilled  

        LockSupport.unpark(m.waiter);  
        return (x != null) ? (E)x : e;  
    }  
}  

}

3、TransferQueue中其他重要的方法

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
//允许自旋检查的次数..
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
//自旋:1.检查状态等待匹配 2.挂起线程 3.检查状态 是否被中断 或者 超时..
for (;;) {
//条件成立:说明线程等待过程中,收到了中断信号,属于中断唤醒..
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
//item有几种情况呢?
//当SNode模式为DATA模式时:
//1.item != null 且 item != this 表示请求要传递的数据 put(E e)
//2.item == this 当前SNode对应的线程 取消状态
//3.item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。

    //当SNode模式为REQUEST模式时:  
    //1.item == null 时,正常状态,当前请求仍然未匹配到对应的DATA请求。  
    //2.item == this 当前SNode对应的线程 取消状态  
    //3.item != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。  

    //条件成立:  
    //当前请求为DATA模式时:e 请求带来的数据  
    //item == this 当前SNode对应的线程 取消状态  
    //item == null 表示已经有匹配节点了,并且匹配节点拿走了item数据。  

    //当前请求为REQUEST模式时:e == null  
    //item == this 当前SNode对应的线程 取消状态  
    //item != null 且 item != this  表示当前REQUEST类型的Node已经匹配到一个DATA类型的Node了。  
    if (x != e)  
        return x;  

    //条件成立:说明请求指定了超时限制..  
    if (timed) {  
        nanos = deadline - System.nanoTime();  
        if (nanos <= 0L) {  
            s.tryCancel(e);  
            continue;  
        }  
    }  
    if (spins > 0)  
        --spins;  
    else if (s.waiter == null)  
        s.waiter = w;  
    else if (!timed)  
        LockSupport.park(this);  
    else if (nanos > spinForTimeoutThreshold)  
        LockSupport.parkNanos(this, nanos);  
}  

}

/**
* Gets rid of cancelled node s with original predecessor pred.
*/
void clean(QNode pred, QNode s) {
//将清理节点的waiter置为null
s.waiter = null; // forget thread
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this,
* if we cannot delete s, we save its predecessor as
* "cleanMe", deleting the previously saved version
* first. At least one of node s or the node previously
* saved can always be deleted, so this always terminates.
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced
casCleanMe(dp, null);
if (dp == pred)
return; // s is already saved node
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}