图解CompletableFuture源码
阅读原文时间:2022年03月30日阅读:3

前言

关于CompletableFuture源码解析部分,整体上感觉还是写比较难的,不过为了推广到团队还是要好好搞一下的,我还是希望大家看到这边文章能学到点什么,废话不多说开始吧。

属性部分

首先看属性部分,我觉得可以从全貌了解他的整体的数据结构,后续我们看到一些操作的时候,也不会产生疑问,算是一种先整体后部分的思想。

打开CompletableFuture源码以后我们首先看到是下面两个核心的关键属性result和stack,关于这两个属性也有核心的注释,result可能是返回的结果集,也可能是包装的AltResult,stack这个数据暴露出了CompletableFuture的整体的结构是一个栈。

    volatile Object result;       // Either the result or boxed AltResult    volatile Completion stack;    // Top of Treiber stack of dependent actions

接下来我们看下Completion的情况,Completion是一个抽象类,分别实现了Runnable、AsynchronousCompletionTask接口,继承了ForkJoinPoolTask类,而ForJoinPoolTask抽象类又实现了Future接口,因此Completion实际上就是一个Future。

img

在Completion类中还有一个非常重要的成员属性,结合我们上面看到的CompletableFuture的stack属性,整好能验证CompletableFuture是一个链表的一个数据结构,Completion中的next保存了栈中下一个元素的引用,而CompletableFuture中的stack永远指向栈顶,至于是不是栈我们可以看下后续方法是如何操作的。

  volatile Completion next;

关于Completion类其实是一个抽象类,还有很多的实现,如下图,后续我们看到具体的实现的时候再来细化实现类。

img

核心方法源码解析

首先我们来看两个测试用例,

&nbsp;&nbsp;&nbsp;&nbsp;@Test&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;test1()&nbsp;throws&nbsp;ExecutionException,&nbsp;InterruptedException&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<String>&nbsp;base&nbsp;=&nbsp;new&nbsp;CompletableFuture<>();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<String>&nbsp;future&nbsp;=&nbsp;base.thenApply(s&nbsp;->&nbsp;s&nbsp;+&nbsp;"&nbsp;2").thenApply(s&nbsp;->&nbsp;s&nbsp;+&nbsp;"&nbsp;3");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;base.complete("1");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;System.out.println(future.get());&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;@Test&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;test2()&nbsp;throws&nbsp;ExecutionException,&nbsp;InterruptedException&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<String>&nbsp;base&nbsp;=&nbsp;new&nbsp;CompletableFuture<>();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<String>&nbsp;future&nbsp;=&nbsp;base.thenApply(s&nbsp;->&nbsp;s&nbsp;+&nbsp;"&nbsp;2").thenApply(s&nbsp;->&nbsp;s&nbsp;+&nbsp;"&nbsp;3");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;future.complete("1");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;System.out.println(future.get());&nbsp;&nbsp;&nbsp;&nbsp;}

执行这两个测试用例以后,我们会发现最终的结果的是不一致的,这里base和future对象,分别调用complete()和get()方法的排列组合,最终导致结果就发生了变化,是不是很神奇,接下来我们就来看看thenApply相关源码部分。

thenApply

关于thenApply的使用,CompletableFuture提供了类似的三个方法,以Async结尾的表示异步执行,如果传入Executor则以指定线程池执行,否则默认使用的线程池是ForkJoinPool。

public&nbsp;<U>&nbsp;CompletableFuture<U>&nbsp;thenApply(&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;U>&nbsp;fn)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;uniApplyStage(null,&nbsp;fn);}public&nbsp;<U>&nbsp;CompletableFuture<U>&nbsp;thenApplyAsync(&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;U>&nbsp;fn)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;uniApplyStage(asyncPool,&nbsp;fn);}public&nbsp;<U>&nbsp;CompletableFuture<U>&nbsp;thenApplyAsync(&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;U>&nbsp;fn,&nbsp;Executor&nbsp;executor)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;uniApplyStage(screenExecutor(executor),&nbsp;fn);&nbsp;&nbsp;&nbsp;&nbsp;}

我们重点关注的thenApply的方法,整体的源码如下:

&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;<U>&nbsp;CompletableFuture<U>&nbsp;thenApply(&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;U>&nbsp;fn)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;uniApplyStage(null,&nbsp;fn);&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;private&nbsp;<V>&nbsp;CompletableFuture<V>&nbsp;uniApplyStage(&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Executor&nbsp;e,&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;V>&nbsp;f)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(f&nbsp;==&nbsp;null)&nbsp;throw&nbsp;new&nbsp;NullPointerException();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;1.创建一个新的CompletableFuture对象&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<V>&nbsp;d&nbsp;=&nbsp;&nbsp;new&nbsp;CompletableFuture<V>();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(e&nbsp;!=&nbsp;null&nbsp;||&nbsp;!d.uniApply(this,&nbsp;f,&nbsp;null))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;2.&nbsp;构建UniApply&nbsp;e代表线程池&nbsp;d&nbsp;代表新的CompletableFuture&nbsp;this&nbsp;代表当前&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;f&nbsp;代表方法&nbsp;这个时候&nbsp;UniApply&nbsp;内部的所有的引用都处于为null的状态&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;UniApply<T,V>&nbsp;c&nbsp;=&nbsp;new&nbsp;UniApply<T,V>(e,&nbsp;d,&nbsp;this,&nbsp;f);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;3.&nbsp;c其实就是Completion对象,被push到栈中&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;push(c);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;4.&nbsp;尝试执行c&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;c.tryFire(SYNC);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;5.&nbsp;这个d会一直返回到调用thenApply的地方,后续的链式调用会作用在这个d上面&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;d;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;@SuppressWarnings("serial")&nbsp;&nbsp;&nbsp;&nbsp;static&nbsp;final&nbsp;class&nbsp;UniApply<T,V>&nbsp;extends&nbsp;UniCompletion<T,V>&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;V>&nbsp;fn;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;UniApply(Executor&nbsp;executor,&nbsp;CompletableFuture<V>&nbsp;dep,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<T>&nbsp;src,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;V>&nbsp;fn)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;2.1&nbsp;向上执行&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;super(executor,&nbsp;dep,&nbsp;src);&nbsp;this.fn&nbsp;=&nbsp;fn;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;abstract&nbsp;static&nbsp;class&nbsp;UniCompletion<T,V>&nbsp;extends&nbsp;Completion&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Executor&nbsp;executor;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;executor&nbsp;to&nbsp;use&nbsp;(null&nbsp;if&nbsp;none)&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<V>&nbsp;dep;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;the&nbsp;dependent&nbsp;to&nbsp;complete&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<T>&nbsp;src;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;//&nbsp;source&nbsp;for&nbsp;action&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;UniCompletion(Executor&nbsp;executor,&nbsp;CompletableFuture<V>&nbsp;dep,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<T>&nbsp;src)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;2.2&nbsp;dep就是新创建的d&nbsp;&nbsp;src就是当前的this&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;this.executor&nbsp;=&nbsp;executor;&nbsp;this.dep&nbsp;=&nbsp;dep;&nbsp;this.src&nbsp;=&nbsp;src;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}

关于执行第2步的时候,构建的对象如下图, src和dep都是空的CompletableFuture,next为Null,这里我们会发现所有的都是继承Completion对象,最终所有都是构建都可以理解为Completion对象;

img

image.png

关于执行第3步的时候,构建的UniApply对象的内容完成压栈的操作,将CompletableFuture的stack属性指向Completion对象;

img

image.png

接下来看第4步操作,尝试执行Completion;

&nbsp;&nbsp;&nbsp;&nbsp;@SuppressWarnings("serial")&nbsp;&nbsp;&nbsp;&nbsp;static&nbsp;final&nbsp;class&nbsp;UniApply<T,V>&nbsp;extends&nbsp;UniCompletion<T,V>&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;V>&nbsp;fn;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;UniApply(Executor&nbsp;executor,&nbsp;CompletableFuture<V>&nbsp;dep,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<T>&nbsp;src,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;T,?&nbsp;extends&nbsp;V>&nbsp;fn)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;super(executor,&nbsp;dep,&nbsp;src);&nbsp;this.fn&nbsp;=&nbsp;fn;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;final&nbsp;CompletableFuture<V>&nbsp;tryFire(int&nbsp;mode)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;4.1&nbsp;d新创建的&nbsp;a(也是c中的src)&nbsp;就是原来的&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<V>&nbsp;d;&nbsp;CompletableFuture<T>&nbsp;a;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;4.2&nbsp;如果uniApply执行成功,则会进到下面的postFire调用&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;否则返回null&nbsp;如果返回null,就要等待以后的主动complete来再次触发&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;((d&nbsp;=&nbsp;dep)&nbsp;==&nbsp;null&nbsp;||&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;!d.uniApply(a&nbsp;=&nbsp;src,&nbsp;fn,&nbsp;mode&nbsp;>&nbsp;0&nbsp;?&nbsp;null&nbsp;:&nbsp;this))&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;null;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;4.5&nbsp;tryFire成功后,会把以下几个属性设为null,表面此Completion已经完成任务,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;变成dead状态&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;dep&nbsp;=&nbsp;null;&nbsp;src&nbsp;=&nbsp;null;&nbsp;fn&nbsp;=&nbsp;null;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;4.6&nbsp;出栈&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;d.postFire(a,&nbsp;mode);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;final&nbsp;<S>&nbsp;boolean&nbsp;uniApply(CompletableFuture<S>&nbsp;a,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Function<?&nbsp;super&nbsp;S,?&nbsp;extends&nbsp;T>&nbsp;f,&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;UniApply<S,T>&nbsp;c)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Object&nbsp;r;&nbsp;Throwable&nbsp;x;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;4.3&nbsp;如果a(也是c中的src)没有准备完成,那result是空,这里就会直接返回false&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(a&nbsp;==&nbsp;null&nbsp;||&nbsp;(r&nbsp;=&nbsp;a.result)&nbsp;==&nbsp;null&nbsp;||&nbsp;f&nbsp;==&nbsp;null)&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;false;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;tryComplete:&nbsp;if&nbsp;(result&nbsp;==&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(r&nbsp;instanceof&nbsp;AltResult)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;((x&nbsp;=&nbsp;((AltResult)r).ex)&nbsp;!=&nbsp;null)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;completeThrowable(x,&nbsp;r);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;break&nbsp;tryComplete;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;r&nbsp;=&nbsp;null;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(c&nbsp;!=&nbsp;null&nbsp;&&&nbsp;!c.claim())&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;false;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;@SuppressWarnings("unchecked")&nbsp;S&nbsp;s&nbsp;=&nbsp;(S)&nbsp;r;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;4.4&nbsp;如果r不为空,则会作为f的输入参数,f的输出则成为当前CompletableFuture的完成值&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;completeValue(f.apply(s));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;catch&nbsp;(Throwable&nbsp;ex)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;completeThrowable(ex);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;true;&nbsp;&nbsp;&nbsp;&nbsp;}

第5步返回d, 这个d会返回到调用thenApply的地方,后续的链式调用会作用在这个d上面,接下来我们可以看到base对象就是我们构建好的第一个链;

img

这里我们可以猜测后续的执行thenApply的方法,也就是执行完成test1的第二行代码,生成的结构如下图:

img

接下来我们验证一下,我们可以发现和我们猜想一致;

img

当我们的代码执行到test1的第3行的时候,也就是complete方法,该方法也就是为了解决我们执行tryFire执行失败后动作,源码如下:

&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;boolean&nbsp;complete(T&nbsp;value)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;boolean&nbsp;triggered&nbsp;=&nbsp;completeValue(value);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;postComplete();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;triggered;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;final&nbsp;void&nbsp;postComplete()&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;1.&nbsp;this表示当前的CompletableFuture,&nbsp;也就是我们base&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<?>&nbsp;f&nbsp;=&nbsp;this;&nbsp;Completion&nbsp;h;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;2.&nbsp;判断stack是否为空&nbsp;&nbsp;或者如果f的栈为空且不是this则重置&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;while&nbsp;((h&nbsp;=&nbsp;f.stack)&nbsp;!=&nbsp;null&nbsp;||&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;(f&nbsp;!=&nbsp;this&nbsp;&&&nbsp;(h&nbsp;=&nbsp;(f&nbsp;=&nbsp;this).stack)&nbsp;!=&nbsp;null))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<?>&nbsp;d;&nbsp;Completion&nbsp;t;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;3.&nbsp;CAS出栈&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(f.casStack(h,&nbsp;t&nbsp;=&nbsp;h.next))&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(t&nbsp;!=&nbsp;null)&nbsp;{&nbsp;4.出栈的h不是最后一个元素,最后一个元素直接执行7即可&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if&nbsp;(f&nbsp;!=&nbsp;this)&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;5.&nbsp;如果f不是this,将刚出栈的h,&nbsp;入this的栈顶&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;我猜测这个地方大家会有迷惑&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;pushStack(h);&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;continue;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;h.next&nbsp;=&nbsp;null;&nbsp;&nbsp;&nbsp;&nbsp;6.&nbsp;detach&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;f&nbsp;=&nbsp;(d&nbsp;=&nbsp;h.tryFire(NESTED))&nbsp;==&nbsp;null&nbsp;?&nbsp;this&nbsp;:&nbsp;d;&nbsp;&nbsp;7.调用tryFire&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}&nbsp;&nbsp;&nbsp;&nbsp;}

对于postComplete()方法可以理解为当任务完成之后,调用的一个后完成方法,主要用于触发其他依赖任务,也就是完成出栈的操作,关于第4、5步和的疑惑,这里我先说一下,这里的原因是每次调用产生的Completion并不在同一个stack中,接下来我们来看一个复杂的案例,可能大家就比较明白了;

复杂案例

&nbsp;&nbsp;&nbsp;&nbsp;@Test&nbsp;&nbsp;&nbsp;&nbsp;public&nbsp;void&nbsp;test3()&nbsp;throws&nbsp;ExecutionException,&nbsp;InterruptedException&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<String>&nbsp;base&nbsp;=&nbsp;new&nbsp;CompletableFuture<>();&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CompletableFuture<String>&nbsp;future&nbsp;=&nbsp;base.thenApply(s&nbsp;->&nbsp;{&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;log.info("2");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return&nbsp;s&nbsp;+&nbsp;"&nbsp;2";&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;});&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;base.thenAccept(s&nbsp;->&nbsp;log.info(s&nbsp;+&nbsp;"3-1")).thenAccept(aVoid&nbsp;->&nbsp;log.info("3-2"));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;base.thenAccept(s&nbsp;->&nbsp;log.info(s&nbsp;+&nbsp;"4-1")).thenAccept(aVoid&nbsp;->&nbsp;log.info("4-2"));&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;base.complete("1");&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;log.info("base&nbsp;result:&nbsp;{}",&nbsp;base.get());&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;log.info("future&nbsp;result:&nbsp;{}",&nbsp;future.get());&nbsp;&nbsp;&nbsp;&nbsp;}

首先看下输出,我们可以看到基本上是按照4-3-2-1的顺序输出的,证明CompletableFuture整体上是一个栈的结构,接下来我们就图解下这一过程;

img

当代码执行完第7行的时候我们得到的是这样的结构:

img

image.png

代码执行完第8行的时候,结构是这样的:

img

image.png

执行完第9行的时候,结构是这样的:

img

image.png

到这里就构成我们整个的调用链路,这个时候我们可以想明白为什么出栈的时候要判断下f != this了吧,因为内部又嵌套层栈的结构,构成了一个图状;

当代码执行到第10行的时候,就开始出栈,按照4-3-2-1的顺序输出,到这里这部分内容就讲解完成了。

参考以下内容:

深入理解JDK8新特性CompletableFuture

结束

欢迎大家点点关注,点点赞!

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器