SynchrounousQueue
是一个比较特殊的无界阻塞队列并支持非公平和公平模式,严格意义上来说不算一个队列,因为它不像其他阻塞队列一样能有容量,它仅有一个指向栈顶的地址,栈中的节点由线程自己保存。任意的线程都会等待直到获得数据(消费)或者交付完成(生产)才会返回。
SynchronousQueue
和普通的阻塞队列的差异类似于下图所示(非公平模式):
阻塞队列通常是存储生产者的生产结果然后消费者去消费,阻塞队列就类似于一个中转站。
SynchronousQueue
则存储生产结果,只告诉消费者生产者的位置,然后让其自己去与之交流(反过来一样),就没有中转的一个过程而是直接交付的。
SynchronousQueue
将数据交付的任务交给生产者或消费者自行处理,实现的非常看不懂。
那么既然是Queue
,就可以通过 offer
和 take
方法来了解
offer:
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
take:
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
offer
和 take
中都调用了 transferer.transfer(...)
transferer
是一个接口 SynchrounousQueue
有两个实现类:
TransferQueue: 用于公平交付
TransferStack:用于不公平交付
这两个的作用可以通过 SynchrounousQueue
的构造方法得知:
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue
}
如果启用公平交付则创建 TransferQueue
否则使用 TransferStack
首先先分析 TransferStack.transfer
通过类名得知,是使用堆栈实现的,是一个LIFO的序列。
每个请求的线程都会被包装成一个 SNode
,具有以下属性:
class SNode {
volatile SNode next; // 链接的下一个SNode
volatile SNode match; // 与该线程匹配的另外一个线程SNode节点
volatile Thread waiter; // 当前请求的线程
Object item;
int mode; //该节点的类型(模式)
}
很显然,是一个链表结构,使用一个mode
来标识该节点的类型,具有以下值:
// 代表一个消费者
int REQUEST = 0;
// 代表一个生产者
int DATA = 1;
// 代表已经和另外一个节点匹配
int FULFILLING = 2;
通过源码注释得知,整个TransferStack.transfer
可以分为以下几步:
如果当前的栈是空的,或者栈顶与请求的节点模式相同,那么就将该节点作为栈顶并等待下一个与之相匹配的请求节点,最后返回匹配节点的数据(take或offer)或者null(被取消/中断)
如果栈不为空,请求节点与栈顶节点相匹配(一个是REQUEST一个是DATA)那么当前节点模式变为FULFILLING,然后将其压入栈中和互补的节点进行匹配,完成交付后同时弹出栈并返回交易的数据,如果匹配失败则与其他节点解除关系等带回收。
如果栈顶已经存在一个FULFILLING的节点,说明正在交付,那么就帮助这个栈顶节点快速完成交易。
下面用图来描述先生产后消费的例子
当栈为空将其封装为SNode
节点后入栈,自旋等待其他节点与自己匹配
这是TransferStack.transfer
的第一个部分,用来处理栈为空或者是多个生产者/消费者的情况,使得都自旋等待匹配。
if (h == null || h.mode == mode) { // empty or same-mode
// 生产者/消费者不愿意等待则直接返回
if (timed && nanos <= 0L) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
// 创建一个节点并将该节点作为栈顶
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋等待下一个与之匹配的节点
SNode m = awaitFulfill(s, timed, nanos);
// 如果该节点已经取消等待
if (m == s) {
//清理该节点
clean(s);
return null;
}
// 如果有节点与自己匹配那么就返回交换的元素
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// 如果当前模式为DATA就表示是 消费者等待生产者生产
// 如果当前模式为REQUEST就表示是 生产者等待消费者消费
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
栈不为空,消费者压入栈顶,消费者与生产者进行匹配,消费者改变头节点(也就是本身)的状态为FuLFILLING
https://blog-1253313758.cos.ap-guangzhou.myqcloud.com/202207271911008.png"width=50% height=50%/>
这是TransferStack.transfer
的第二个部分,新节点和已经存在的节点进行匹配
else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
// m 为与当前插入的节点相匹配的节点,也就是之前head指向的节点
SNode m = s.next; // m is s's match
// 如果m取消了则清空整个栈,这里不用担心后面的节点,因为clean会将已经取消的节点弹出。m为空就代表没有节点和该节点匹配了
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
// 尝试匹配
if (m.tryMatch(s)) {
// 匹配成功并弹出
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 匹配失败
s.casNext(m, mn); // help unlink
}
}
}
匹配完成之后将这两个节点弹出并返回交换的元素
使其他节点可以辅助其他节点完成任务
else {
// 栈顶节点正在进行匹配,帮助栈顶节点完成匹配
SNode m = h.next; // m is h's match
//
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
先消费者后生产者也是同理
有几个关键方法需要额外注意的:
casHead
是一个通过CAS更新头节点的方法,J9 之后就不使用 Unsafe
了,改为使用handle
boolean casHead(SNode h, SNode nh) {
return h == head &&
SHEAD.compareAndSet(this, h, nh);
}
awaitFulfill
使使节点自旋等待的一个方法,如果该节点位于头节点或者是在等待其他节点与自己匹配都会自旋。自旋期间会监测自己的中断状态、匹配状态。如果超过了自旋次数或者该节点不允许等待都会通过LockSupport.park
来使线程阻塞。
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = shouldSpin(s)
? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
: 0;
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match;
if (m != null)
return m;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0) {
Thread.onSpinWait();
spins = shouldSpin(s) ? (spins - 1) : 0;
}
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this);
else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanos);
}
}
clean
将一个节点移除,用来清理已经中断(取消)的节点。
最坏的情况下需要遍历整个栈来移除s
。可能会存在多个线程并发的删除其他节点。
该方法从头部开始寻找所有被取消的节点然后将其删除
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// Absorb cancelled nodes at head
SNode p;
// 从头部开始遇到被取消的节点就出栈
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// Unsplice embedded nodes
// 继续寻找
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
tryMatch
使两个线程相互匹配,并使在等待的线程解锁
boolean tryMatch(SNode s) {
if (match == null &&
// 修改被匹配节点的match状态,使得该节点能够了解到自己已经和其他节点匹配了
SMATCH.compareAndSet(this, null, s)) {
// 如果被匹配的节点已经阻塞了,那么将该节点的线程unpark
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
// 匹配成功
return true;
}
return match == s;
}
从这里也能看出两个线程是直接交付的,没有中间商。速度相对来说就会快一点。
下面是该方法的完成代码:
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
// 栈顶节点为空或者与栈顶节点模式匹配
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0L) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
// 创建一个节点并将该节点作为栈顶
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 自旋等待下一个与之匹配的节点
SNode m = awaitFulfill(s, timed, nanos);
// 该节点已经取消等待
if (m == s) {
//清理该节点
clean(s);
return null;
}
// 已经匹配,
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
// 如果当前匹配模式是 REQUEST 那么就说明在消费,需要返回被匹配的节点
// 如果不是当前模式就是 DATA 说明在生产,需要返回S(是一个生产者)它的item不为
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
// m 为与当前插入的节点相匹配的节点,也就是之前head指向的节点
SNode m = s.next; // m is s's match
// 如果m取消了则清空整个栈,这里不用担心后面的节点,因为clean会将已经取消的节点弹出。m为空就代表没有节点和该节点匹配了
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
// 尝试匹配
if (m.tryMatch(s)) {
// 匹配成功并弹出
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
// 匹配失败
s.casNext(m, mn); // help unlink
}
}
} else {
// 栈顶节点正在进行匹配,帮助栈顶节点完成匹配
SNode m = h.next; // m is h's match
//
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
SynchronousQueue
通过TransferQueue
来实现
QNode
是TransferQueue
用来包装每一个请求的。
final class QNode {
volatile QNode next; ![](https://article.cdnof.com/2307/d4fcdcaa-3f08-47ed-918a-210d9027caf2.png) // 该节点指向的下一个节点
volatile Object item; //
volatile Thread waiter; // 用来控制线程的状态 unpark 或 park
final boolean isData; // 用来标记该节点为生产者还是消费者
....
}
一些属性值
// 队列头
transient volatile QNode head;
// 队列尾
transient volatile QNode tail;
// 一个标志,用来辅助删除节点
transient volatile QNode cleanMe;
和TransferStack
一样,都是通过transfer
方法来实现交付的,逻辑相对TransferStack.transfer
更加简单一点,主要分为以下两种情况:
队列为空或者与头节点类型相同,将该节点添加到队列中,改变头的状态并自旋等待其他节点与自己匹配,匹配成功后返回交互的元素。
如果当前队列包含正在等待的节点并与头节点相匹配(消费者-生产者或者生产者-消费者),尝试去匹配头节点,匹配成功则出队,返回交互的元素。
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue; // spin
// 情况一:入队
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 尾节点被修改,其他节点操作还未完成。循环等待
if (t != tail) // inconsistent read
continue;
// 尾节点后还链接了其他的节点说明当前尾节点不是尾节点,需要更新
if (tn != null) { // lagging tail
advanceTail(t, tn);
continue;
}
// 如果当前节点不允许等待,则退出
if (timed && nanos <= 0L) // can't wait
return null;
// 创建节点
if (s == null)
s = new QNode(e, isData);
// 放到队尾,如果失败则重新尝试
if (!t.casNext(null, s)) // failed to link in
continue;
// 当前节点为尾节点,更新t指针
advanceTail(t, s); // swing tail and wait
// 自旋等待其他节点匹配
Object x = awaitFulfill(s, e, timed, nanos);
// 说明当前节点被取消
if (x == s) { // wait was cancelled
// 清除该节点
clean(t, s);
return null;
}
// 该节点可能未从队列中移除,需要处理s为队尾的情况
if (!s.isOffList()) { // not already unlinked
// 更新头节点
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
// 情况二:出队
} else { // complementary-mode
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read Object x = m.item;
// 在并发的情况下m已经和其他节点匹配或者是m已经取消了
// 当前节点为生产者(类型相同)
if (isData == (x != null) || // m already fulfilled
// m已经取消
x == m || // m cancelled
// 更新m的值,标识m已经和自己匹配,如果更新失败则m已经匹配或者是取消了
!m.casItem(x, e)) { // lost CAS
// 如果m不能和自己匹配
advanceHead(h, m); // dequeue and retry
continue;
}
// 成功匹配并出队
advanceHead(h, m); // successfully fulfilled
// 停止阻塞被匹配的节点
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}
如果消费者或生产者被取消了,那么需要将其从队列中删除,完成这个操作的是clean
分为三种情况:
如果该节点不是尾节点则直接删除
如果该节点是尾节点,但是cleanMe为空,则将cleanMe置为当前节点的前继节点(意思就是一会再删,应该该节点现在不能被删除)
如果cleanMe不为空,则根据cleanMe删除需要删除的节点。如果当前节点的前继节点不为cleanMe则对应步骤2,否则置为空。
尾节点如果直接删除会导致一些并发问题
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
// 从头开始,如果有被取消的节点则跳过该节点
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
// 确保尾节点的读一致性
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next
// 尾节点不一致,重新开始
if (t != tail)
continue;
// 又有新节点插入,更新尾节点
if (tn != null) {
advanceTail(t, tn);
continue;
}
// s 不为尾节点直接删除s
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn))
return;
}
// 这里说明s目前还不能删除,只能删除上一次需要删除的节点。
// cleanMe保存着上一次需要删除节点的前继节点
QNode dp = cleanMe;
if (dp != null) { // Try unlinking previous cancelled node
// d 为待删除节点
QNode d = dp.next;
QNode dn;
// 检查d的状态
// d 已经为空则说明已经被删除,所以要删除d,d不能被删除
if (d == null || // d is gone or
// d 不能离开队列
d == dp || // d is off list or
// d 节点不能被取消
!d.isCancelled() || // d not cancelled or
// d 不能为尾节点
(d != t && // d not tail and
// d 有后继节点
(dn = d.next) != null && // has successor
dn != d && // that is on list
// 删除d
dp.casNext(d, dn))) // d unspliced
// 清除cleanMe的状态
casCleanMe(dp, null);
// cleanMe 已经保存了 s 的状态,但是s不满足删除的条件
if (dp == pred)
return; // s is already saved node
// 将cleanMe
} else if (casCleanMe(null, pred))
return; // Postpone cleaning s
}
}
如果队列中没有出现能与之相匹配的节点,则该节点就自旋等待,完成这个操作的是awaitFulfill
,和TransferStack
中的差不太多
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread ();
// 自旋次数
int spins = (head.next == s)
? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
: 0;
for (;;) {
// 当前线程是否阻塞
if (w.isInterrupted())
s.tryCancel (e);
// 匹配项,用于检测是否被匹配
Object x = s.item;
if (x != e)
return x;
// 是否超时
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 用来控制自旋次数
if (spins > 0) {
--spins;
Thread.onSpinWait();
}
else if (s.waiter == null)
s.waiter = w;
// 该线程不允许等待,则阻塞
else if (!timed)
LockSupport.park (this);
// 过一定时间阻塞
else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanos);
}
}
SynchrounousQueue
和常见的阻塞队列比起来处理方式不一样,也比较难理解。
对于不公平和不公平分为了两种实现方式,利用了FIFO(公平)和LIFO(不公平)的特性来实现。
相比于可存储的队列,SynchronousQueue
导致其他线程就必须自旋等待交付,所以如果交付占用了大量时间就导致其他线程就需要等待特别久的时间,但好处是不需要存储而且是直接交付的。
SynchronousQueue还有一些理解不到位的地方,以上仅供参考。
手机扫一扫
移动阅读更方便
你可能感兴趣的文章