J.U.C并发框架源码阅读(十六)FutureTask
阅读原文时间:2021年11月29日阅读:2

基于版本jdk1.7.0_80

java.util.concurrent.FutureTask

代码如下

/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*
*/

/*
*
*
*
*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;

/**
* A cancellable asynchronous computation. This class provides a base
* implementation of {@link Future}, with methods to start and cancel
* a computation, query to see if the computation is complete, and
* retrieve the result of the computation. The result can only be
* retrieved when the computation has completed; the {@code get}
* methods will block if the computation has not yet completed. Once
* the computation has completed, the computation cannot be restarted
* or cancelled (unless the computation is invoked using
* {@link #runAndReset}).
*
*

A {@code FutureTask} can be used to wrap a {@link Callable} or
* {@link Runnable} object. Because {@code FutureTask} implements
* {@code Runnable}, a {@code FutureTask} can be submitted to an
* {@link Executor} for execution.
*
*

In addition to serving as a standalone class, this class provides
* {@code protected} functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param The result type returned by this FutureTask's {@code get} methods
*/
public class FutureTask implements RunnableFuture {
/*
* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races. Sync control in the current design relies
* on a "state" field updated via CAS to track completion, along
* with a simple Treiber stack to hold waiting threads.
*
* Style note: As usual, we bypass overhead of using
* AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
*/

/\*\*  
 \* The run state of this task, initially NEW.  The run state  
 \* transitions to a terminal state only in methods set,  
 \* setException, and cancel.  During completion, state may take on  
 \* transient values of COMPLETING (while outcome is being set) or  
 \* INTERRUPTING (only while interrupting the runner to satisfy a  
 \* cancel(true)). Transitions from these intermediate to final  
 \* states use cheaper ordered/lazy writes because values are unique  
 \* and cannot be further modified.  
 \*  
 \* Possible state transitions:  
 \* NEW -> COMPLETING -> NORMAL  
 \* NEW -> COMPLETING -> EXCEPTIONAL  
 \* NEW -> CANCELLED  
 \* NEW -> INTERRUPTING -> INTERRUPTED  
 \*/  
private volatile int state;  
private static final int NEW          = 0;  
private static final int COMPLETING   = 1;  
private static final int NORMAL       = 2;  
private static final int EXCEPTIONAL  = 3;  
private static final int CANCELLED    = 4;  
private static final int INTERRUPTING = 5;  
private static final int INTERRUPTED  = 6;

/\*\* The underlying callable; nulled out after running \*/  
private Callable<V> callable;  
/\*\* The result to return or exception to throw from get() \*/  
private Object outcome; // non-volatile, protected by state reads/writes  
/\*\* The thread running the callable; CASed during run() \*/  
private volatile Thread runner;  
/\*\* Treiber stack of waiting threads \*/  
private volatile WaitNode waiters;

/\*\*  
 \* Returns result or throws exception for completed task.  
 \*  
 \* @param s completed state value  
 \*/  
@SuppressWarnings("unchecked")  
private V report(int s) throws ExecutionException {  
    Object x = outcome;  
    if (s == NORMAL)  
        return (V)x;  
    if (s >= CANCELLED)  
        throw new CancellationException();  
    throw new ExecutionException((Throwable)x);  
}

/\*\*  
 \* Creates a {@code FutureTask} that will, upon running, execute the  
 \* given {@code Callable}.  
 \*  
 \* @param  callable the callable task  
 \* @throws NullPointerException if the callable is null  
 \*/  
public FutureTask(Callable<V> callable) {  
    if (callable == null)  
        throw new NullPointerException();  
    this.callable = callable;  
    this.state = NEW;       // ensure visibility of callable  
}

/\*\*  
 \* Creates a {@code FutureTask} that will, upon running, execute the  
 \* given {@code Runnable}, and arrange that {@code get} will return the  
 \* given result on successful completion.  
 \*  
 \* @param runnable the runnable task  
 \* @param result the result to return on successful completion. If  
 \* you don't need a particular result, consider using  
 \* constructions of the form:  
 \* {@code Future<?> f = new FutureTask<Void>(runnable, null)}  
 \* @throws NullPointerException if the runnable is null  
 \*/  
public FutureTask(Runnable runnable, V result) {  
    this.callable = Executors.callable(runnable, result);  
    this.state = NEW;       // ensure visibility of callable  
}

public boolean isCancelled() {  
    return state >= CANCELLED;  
}

public boolean isDone() {  
    return state != NEW;  
}

public boolean cancel(boolean mayInterruptIfRunning) {  
    if (state != NEW)  
        return false;  
    if (mayInterruptIfRunning) {  
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))  
            return false;  
        Thread t = runner;  
        if (t != null)  
            t.interrupt();  
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state  
    }  
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))  
        return false;  
    finishCompletion();  
    return true;  
}

/\*\*  
 \* @throws CancellationException {@inheritDoc}  
 \*/  
public V get() throws InterruptedException, ExecutionException {  
    int s = state;  
    if (s <= COMPLETING)  
        s = awaitDone(false, 0L);  
    return report(s);  
}

/\*\*  
 \* @throws CancellationException {@inheritDoc}  
 \*/  
public V get(long timeout, TimeUnit unit)  
    throws InterruptedException, ExecutionException, TimeoutException {  
    if (unit == null)  
        throw new NullPointerException();  
    int s = state;  
    if (s <= COMPLETING &&  
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)  
        throw new TimeoutException();  
    return report(s);  
}

/\*\*  
 \* Protected method invoked when this task transitions to state  
 \* {@code isDone} (whether normally or via cancellation). The  
 \* default implementation does nothing.  Subclasses may override  
 \* this method to invoke completion callbacks or perform  
 \* bookkeeping. Note that you can query status inside the  
 \* implementation of this method to determine whether this task  
 \* has been cancelled.  
 \*/  
protected void done() { }

/\*\*  
 \* Sets the result of this future to the given value unless  
 \* this future has already been set or has been cancelled.  
 \*  
 \* <p>This method is invoked internally by the {@link #run} method  
 \* upon successful completion of the computation.  
 \*  
 \* @param v the value  
 \*/  
protected void set(V v) {  
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  
        outcome = v;  
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  
        finishCompletion();  
    }  
}

/\*\*  
 \* Causes this future to report an {@link ExecutionException}  
 \* with the given throwable as its cause, unless this future has  
 \* already been set or has been cancelled.  
 \*  
 \* <p>This method is invoked internally by the {@link #run} method  
 \* upon failure of the computation.  
 \*  
 \* @param t the cause of failure  
 \*/  
protected void setException(Throwable t) {  
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  
        outcome = t;  
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state  
        finishCompletion();  
    }  
}

public void run() {  
    if (state != NEW ||  
        !UNSAFE.compareAndSwapObject(this, runnerOffset,  
                                     null, Thread.currentThread()))  
        return;  
    try {  
        Callable<V> c = callable;  
        if (c != null && state == NEW) {  
            V result;  
            boolean ran;  
            try {  
                result = c.call();  
                ran = true;  
            } catch (Throwable ex) {  
                result = null;  
                ran = false;  
                setException(ex);  
            }  
            if (ran)  
                set(result);  
        }  
    } finally {  
        // runner must be non-null until state is settled to  
        // prevent concurrent calls to run()  
        runner = null;  
        // state must be re-read after nulling runner to prevent  
        // leaked interrupts  
        int s = state;  
        if (s >= INTERRUPTING)  
            handlePossibleCancellationInterrupt(s);  
    }  
}

/\*\*  
 \* Executes the computation without setting its result, and then  
 \* resets this future to initial state, failing to do so if the  
 \* computation encounters an exception or is cancelled.  This is  
 \* designed for use with tasks that intrinsically execute more  
 \* than once.  
 \*  
 \* @return true if successfully run and reset  
 \*/  
protected boolean runAndReset() {  
    if (state != NEW ||  
        !UNSAFE.compareAndSwapObject(this, runnerOffset,  
                                     null, Thread.currentThread()))  
        return false;  
    boolean ran = false;  
    int s = state;  
    try {  
        Callable<V> c = callable;  
        if (c != null && s == NEW) {  
            try {  
                c.call(); // don't set result  
                ran = true;  
            } catch (Throwable ex) {  
                setException(ex);  
            }  
        }  
    } finally {  
        // runner must be non-null until state is settled to  
        // prevent concurrent calls to run()  
        runner = null;  
        // state must be re-read after nulling runner to prevent  
        // leaked interrupts  
        s = state;  
        if (s >= INTERRUPTING)  
            handlePossibleCancellationInterrupt(s);  
    }  
    return ran && s == NEW;  
}

/\*\*  
 \* Ensures that any interrupt from a possible cancel(true) is only  
 \* delivered to a task while in run or runAndReset.  
 \*/  
private void handlePossibleCancellationInterrupt(int s) {  
    // It is possible for our interrupter to stall before getting a  
    // chance to interrupt us.  Let's spin-wait patiently.  
    if (s == INTERRUPTING)  
        while (state == INTERRUPTING)  
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from  
    // cancel(true).  However, it is permissible to use interrupts  
    // as an independent mechanism for a task to communicate with  
    // its caller, and there is no way to clear only the  
    // cancellation interrupt.  
    //  
    // Thread.interrupted();  
}

/\*\*  
 \* Simple linked list nodes to record waiting threads in a Treiber  
 \* stack.  See other classes such as Phaser and SynchronousQueue  
 \* for more detailed explanation.  
 \*/  
static final class WaitNode {  
    volatile Thread thread;  
    volatile WaitNode next;  
    WaitNode() { thread = Thread.currentThread(); }  
}

/\*\*  
 \* Removes and signals all waiting threads, invokes done(), and  
 \* nulls out callable.  
 \*/  
private void finishCompletion() {  
    // assert state > COMPLETING;  
    for (WaitNode q; (q = waiters) != null;) {  
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {  
            for (;;) {  
                Thread t = q.thread;  
                if (t != null) {  
                    q.thread = null;  
                    LockSupport.unpark(t);  
                }  
                WaitNode next = q.next;  
                if (next == null)  
                    break;  
                q.next = null; // unlink to help gc  
                q = next;  
            }  
            break;  
        }  
    }

    done();

    callable = null;        // to reduce footprint  
}

/\*\*  
 \* Awaits completion or aborts on interrupt or timeout.  
 \*  
 \* @param timed true if use timed waits  
 \* @param nanos time to wait, if timed  
 \* @return state upon completion  
 \*/  
private int awaitDone(boolean timed, long nanos)  
    throws InterruptedException {  
    final long deadline = timed ? System.nanoTime() + nanos : 0L;  
    WaitNode q = null;  
    boolean queued = false;  
    for (;;) {  
        if (Thread.interrupted()) {  
            removeWaiter(q);  
            throw new InterruptedException();  
        }

        int s = state;  
        if (s > COMPLETING) {  
            if (q != null)  
                q.thread = null;  
            return s;  
        }  
        else if (s == COMPLETING) // cannot time out yet  
            Thread.yield();  
        else if (q == null)  
            q = new WaitNode();  
        else if (!queued)  
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,  
                                                 q.next = waiters, q);  
        else if (timed) {  
            nanos = deadline - System.nanoTime();  
            if (nanos <= 0L) {  
                removeWaiter(q);  
                return state;  
            }  
            LockSupport.parkNanos(this, nanos);  
        }  
        else  
            LockSupport.park(this);  
    }  
}

/\*\*  
 \* Tries to unlink a timed-out or interrupted wait node to avoid  
 \* accumulating garbage.  Internal nodes are simply unspliced  
 \* without CAS since it is harmless if they are traversed anyway  
 \* by releasers.  To avoid effects of unsplicing from already  
 \* removed nodes, the list is retraversed in case of an apparent  
 \* race.  This is slow when there are a lot of nodes, but we don't  
 \* expect lists to be long enough to outweigh higher-overhead  
 \* schemes.  
 \*/  
private void removeWaiter(WaitNode node) {  
    if (node != null) {  
        node.thread = null;  
        retry:  
        for (;;) {          // restart on removeWaiter race  
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {  
                s = q.next;  
                if (q.thread != null)  
                    pred = q;  
                else if (pred != null) {  
                    pred.next = s;  
                    if (pred.thread == null) // check for race  
                        continue retry;  
                }  
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,  
                                                      q, s))  
                    continue retry;  
            }  
            break;  
        }  
    }  
}

// Unsafe mechanics  
private static final sun.misc.Unsafe UNSAFE;  
private static final long stateOffset;  
private static final long runnerOffset;  
private static final long waitersOffset;  
static {  
    try {  
        UNSAFE = sun.misc.Unsafe.getUnsafe();  
        Class<?> k = FutureTask.class;  
        stateOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("state"));  
        runnerOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("runner"));  
        waitersOffset = UNSAFE.objectFieldOffset  
            (k.getDeclaredField("waiters"));  
    } catch (Exception e) {  
        throw new Error(e);  
    }  
}

}

0. FutureTask简介

可取消的异步计算,可以用于包装Runnable或者Callable对象,可以查询计算完成状态,如果计算未完成则阻塞查询线程至完成为止,可以只是取消未完成的计算,也可以向运行中的计算发送中断信号。

1. FutureTask接口分析

2. FutureTask的state变量

FutureTask内部维护了一个volatile类型的int变量state,用于存储FutureTask的状态,其可能的取值如下

private static final int NEW = 0;//新建,实际上计算任务可能正在执行
private static final int COMPLETING = 1;//执行中,实际上计算任务已经执行完毕(可能正常,也可能是发生异常)
private static final int NORMAL = 2;//正常结束
private static final int EXCEPTIONAL = 3;//异常结束
private static final int CANCELLED = 4;//已取消
private static final int INTERRUPTING = 5;//中断中
private static final int INTERRUPTED = 6;//已中断

可能的状态转移流程为

* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED

3. FutureTask的构造方法

/\*\*  
 \* Creates a {@code FutureTask} that will, upon running, execute the  
 \* given {@code Runnable}, and arrange that {@code get} will return the  
 \* given result on successful completion.  
 \*  
 \* @param runnable the runnable task  
 \* @param result the result to return on successful completion. If  
 \* you don't need a particular result, consider using  
 \* constructions of the form:  
 \* {@code Future<?> f = new FutureTask<Void>(runnable, null)}  
 \* @throws NullPointerException if the runnable is null  
 \*/  
public FutureTask(Runnable runnable, V result) {  
    this.callable = Executors.callable(runnable, result);  
    this.state = NEW;       // ensure visibility of callable  
}

初始化时,将state设置为NEW

4. FutureTask.run方法

public void run() {  
    if (state != NEW ||//当前状态必须为NEW,工作线程必须为null,然后将工作线程用cas操作设置为当前线程  
        !UNSAFE.compareAndSwapObject(this, runnerOffset,  
                                     null, Thread.currentThread()))  
        return;  
    try {  
        Callable<V> c = callable;  
        if (c != null && state == NEW) {//这个时候已经可以保证FutureTask是由当前线程独占了,只要判断当前线程没有已经执行过这个FutureTask即可  
            V result;  
            boolean ran;  
            try {  
                result = c.call();//当前线程执行计算任务  
                ran = true;//成功跑完,标记一下  
            } catch (Throwable ex) {//计算过程中抛出异常  
                result = null;  
                ran = false;//标记任务未完成  
                setException(ex);//标记任务异常  
            }  
            if (ran)  
                set(result);//标记任务正常完成  
        }  
    } finally {  
        // runner must be non-null until state is settled to  
        // prevent concurrent calls to run()  
        runner = null;//先设置state,再设置runner为null,防止并发调用call出问题  
        // state must be re-read after nulling runner to prevent  
        // leaked interrupts  
        int s = state;  
        if (s >= INTERRUPTING)//处理可能的中断  
            handlePossibleCancellationInterrupt(s);  
    }  
}

/\*\*  
 \* Sets the result of this future to the given value unless  
 \* this future has already been set or has been cancelled.  
 \*  
 \* <p>This method is invoked internally by the {@link #run} method  
 \* upon successful completion of the computation.  
 \*  
 \* @param v the value  
 \*/  
protected void set(V v) {//设置FutureTask为正常结束  
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  
        outcome = v;  
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state  
        finishCompletion();  
    }  
}

/\*\*  
 \* Causes this future to report an {@link ExecutionException}  
 \* with the given throwable as its cause, unless this future has  
 \* already been set or has been cancelled.  
 \*  
 \* <p>This method is invoked internally by the {@link #run} method  
 \* upon failure of the computation.  
 \*  
 \* @param t the cause of failure  
 \*/  
protected void setException(Throwable t) {//设置FutureTask为异常结束  
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {  
        outcome = t;  
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state  
        finishCompletion();  
    }  
}

5. FutureTask.runAndReset方法

/\*\*  
 \* Executes the computation without setting its result, and then  
 \* resets this future to initial state, failing to do so if the  
 \* computation encounters an exception or is cancelled.  This is  
 \* designed for use with tasks that intrinsically execute more  
 \* than once.  
 \*  
 \* @return true if successfully run and reset  
 \*/  
protected boolean runAndReset() {  
    if (state != NEW ||  
        !UNSAFE.compareAndSwapObject(this, runnerOffset,  
                                     null, Thread.currentThread()))  
        return false;  
    boolean ran = false;  
    int s = state;  
    try {  
        Callable<V> c = callable;  
        if (c != null && s == NEW) {  
            try {  
                c.call(); // don't set result//不设置结果  
                ran = true;//如果任务计算过程中抛出异常或者被中断,ran变量为false  
            } catch (Throwable ex) {  
                setException(ex);  
            }  
        }  
    } finally {  
        // runner must be non-null until state is settled to  
        // prevent concurrent calls to run()  
        runner = null;  
        // state must be re-read after nulling runner to prevent  
        // leaked interrupts  
        s = state;  
        if (s >= INTERRUPTING)  
            handlePossibleCancellationInterrupt(s);  
    }  
    return ran && s == NEW;//如果任务是正常结束,重置FutureTask状态  
}

与FutureTask.run有所不同,首先如果计算任务执行成功,FutureTask的状态会被重置,其次是runAndReset方法不会设置计算任务的结果。

runAndReset方法主要用于定时任务的场景,比方说上一篇介绍的ScheduledThreadPoolExecutor中,ScheduledFutureTask.run方法中,对于周期执行的任务,就是调用的runAndReset方法。

5. FutureTask.cancel方法

public boolean cancel(boolean mayInterruptIfRunning) {  
    if (state != NEW)  
        return false;  
    if (mayInterruptIfRunning) {  
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))//设置FutureTask的状态为INTERRUPTING  
            return false;  
        Thread t = runner;  
        if (t != null)  
            t.interrupt();//发送中断信号  
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state//设置FutureTask的状态为INTERRUPTED  
    }  
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))//设置FutureTask的状态为CANCELLED  
        return false;  
    finishCompletion();  
    return true;  
}

逻辑很简单,只cancel状态为NEW的FutureTask

6. FutureTask.get方法

/\*\*  
 \* @throws CancellationException {@inheritDoc}  
 \*/  
public V get() throws InterruptedException, ExecutionException {  
    int s = state;  
    if (s <= COMPLETING)//任务未完成,排队等待结果  
        s = awaitDone(false, 0L);  
    return report(s);  
}

/\*\*  
 \* Awaits completion or aborts on interrupt or timeout.  
 \*  
 \* @param timed true if use timed waits  
 \* @param nanos time to wait, if timed  
 \* @return state upon completion  
 \*/  
private int awaitDone(boolean timed, long nanos)  
    throws InterruptedException {  
    final long deadline = timed ? System.nanoTime() + nanos : 0L;//计算等待结果线程的唤醒时间  
    WaitNode q = null;  
    boolean queued = false;  
    for (;;) {//死循环中等待结果  
        if (Thread.interrupted()) {//等待线程被中断  
            removeWaiter(q);//从等待队列中移除  
            throw new InterruptedException();  
        }

        int s = state;  
        if (s > COMPLETING) {//计算任务结束  
            if (q != null)//函数返回  
                q.thread = null;  
            return s;  
        }  
        else if (s == COMPLETING) // cannot time out yet  
            Thread.yield();//计算任务尚未结束,退让,减少开销  
        else if (q == null)  
            q = new WaitNode();//创建新的等待节点  
        else if (!queued)//如果当前等待节点还未入栈  
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,  
                                                 q.next = waiters, q);//用CAS操作将当前等待节点入栈  
        else if (timed) {  
            nanos = deadline - System.nanoTime();  
            if (nanos <= 0L) {  
                removeWaiter(q);  
                return state;  
            }  
            LockSupport.parkNanos(this, nanos);//入栈成功,等待  
        }  
        else  
            LockSupport.park(this);//入栈成功,等待  
    }  
}

这里用到了Treiber stack算法,让调用get方法的线程排队等待

7. FutureTask.finishCompletion方法

/\*\*  
 \* Removes and signals all waiting threads, invokes done(), and  
 \* nulls out callable.  
 \*/  
private void finishCompletion() {  
    // assert state > COMPLETING;  
    for (WaitNode q; (q = waiters) != null;) {  
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {  
            for (;;) {  
                Thread t = q.thread;  
                if (t != null) {  
                    q.thread = null;  
                    LockSupport.unpark(t);//唤醒等待线程  
                }  
                WaitNode next = q.next;//查找下一个等待线程  
                if (next == null)  
                    break;  
                q.next = null; // unlink to help gc  
                q = next;  
            }  
            break;  
        }  
    }

    done();

    callable = null;        // to reduce footprint  
}

FutureTask的任务如果执行结束,就会调用finishCompletion方法,这个方法会唤醒所有因为调用get方法而等待的线程,于是这些线程可以拿着FutureTask的执行结果离开了。