Java Multithreading Topic 6: Queue and List
Original post date: 2023-07-08

How does CopyOnWriteArrayList use copy-on-write to implement a concurrency-safe List?

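Copy-on-write (COW) is an optimization strategy in programming. Several callers can share one resource (for example, a piece of data in memory). When one of them needs to modify it, the system gives that caller a private copy to modify. The other callers keep seeing the original, which the change does not affect.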

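In code terms, several threads share a variable. A thread that wants to modify it first copies it, applies the change to the copy, and then publishes the copy by replacing the shared reference. The shared original is never modified in place.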

The java.util.concurrent package provides two copy-on-write containers: CopyOnWriteArrayList and CopyOnWriteArraySet.

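Adding an element to a CopyOnWriteArrayList works as follows:

1. The writer acquires a lock, so only one modification runs at a time and the add is atomic.
2. While holding the lock, it copies the current array into a new array one slot longer and writes the new element into the copy.
3. It assigns the new array to the list's array field.
4. It releases the lock.

Because the array field is declared volatile, other threads see the new array as soon as the assignment completes.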

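Reading with get() takes no lock. It simply indexes into whatever array the array field currently references, so readers never wait for writers.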
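The add/get protocol described above can be sketched as a small class. This is a minimal illustration, not the JDK source. The class name SimpleCowList is hypothetical, and it supports only add, get and size. The lock allows one writer at a time, and the volatile field makes each published copy visible to readers.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified copy-on-write list (not the JDK class).
class SimpleCowList<E> {
    private final ReentrantLock lock = new ReentrantLock();
    // volatile: once a writer assigns a new array, readers see it
    private volatile Object[] array = new Object[0];

    public boolean add(E e) {
        lock.lock();                       // one writer at a time
        try {
            Object[] old = array;
            // copy the current array into a new one with one extra slot
            Object[] copy = Arrays.copyOf(old, old.length + 1);
            copy[old.length] = e;          // modify the copy, never the shared array
            array = copy;                  // publish the copy (volatile write)
            return true;
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E get(int index) {
        return (E) array[index];           // no lock: read the current snapshot
    }

    public int size() {
        return array.length;
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleCowList<Integer> list = new SimpleCowList<>();
        Thread[] writers = new Thread[4];
        for (int t = 0; t < writers.length; t++) {
            writers[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) list.add(i);
            });
            writers[t].start();
        }
        for (Thread w : writers) w.join();
        System.out.println(list.size());   // 4000: no lost updates
    }
}
```

Without the lock, two writers could copy the same old array, and one of the two additions would be lost. The lock serializes writers while reads stay lock-free.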

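Pros and cons: the mechanism guarantees eventual consistency when several threads operate on the data, and reads are cheap because they never lock. It has two drawbacks:

- Wasted memory: every write copies the whole array, so the old and new arrays briefly coexist, and write-heavy use produces a lot of garbage.
- No real-time consistency: a reader still working on the old array, such as an iterator created before a write, does not see the change.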
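The lack of real-time consistency is easiest to see with an iterator. The iterator works on the array that existed when it was created. Here is a small demonstration using the real CopyOnWriteArrayList (the class name CowSnapshotDemo is hypothetical):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CowSnapshotDemo {
    // Counts the elements an iterator returns when the list is modified
    // after the iterator was created.
    static int countSeenByIterator() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b", "c"));
        Iterator<String> it = list.iterator(); // iterator holds the current array
        list.add("d");                          // the writer installs a new array
        int seen = 0;
        while (it.hasNext()) {                  // no ConcurrentModificationException
            it.next();
            seen++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(countSeenByIterator()); // 3: the iterator never sees "d"
    }
}
```

Doing the same with an ArrayList would throw ConcurrentModificationException on the next call to next(). The copy-on-write iterator simply keeps reading its old snapshot.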

How does the random number generator Random use CAS to guarantee that each new seed is unique across threads?

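The seed in Random determines the numbers it generates and is updated after every generation. The seed is held in an AtomicLong, but being atomic is not enough by itself. In next(int bits), Random:

1. reads the old seed,
2. computes the new seed from it,
3. installs the new seed with compareAndSet, looping until the CAS succeeds.

If another thread changed the seed in the meantime, the CAS fails and the thread recomputes from the fresh value. Two threads therefore never both advance from the same old seed, so they never produce the same new seed.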
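The CAS loop can be sketched as follows. CasLcg is a hypothetical class. Its multiplier, addend, 48-bit mask and initial scrambling are the ones documented for java.util.Random, so for the same seed its nextInt() values should match those of new Random(seed). The main method checks that 4000 concurrent calls advance the seed exactly as 4000 sequential calls would.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical re-creation of the seed update in java.util.Random.next(int).
class CasLcg {
    // 48-bit linear congruential generator constants documented for java.util.Random
    private static final long MULTIPLIER = 0x5DEECE66DL;
    private static final long ADDEND = 0xBL;
    private static final long MASK = (1L << 48) - 1;

    private final AtomicLong seed;

    CasLcg(long initialSeed) {
        // same initial scrambling as new java.util.Random(initialSeed)
        this.seed = new AtomicLong((initialSeed ^ MULTIPLIER) & MASK);
    }

    int next(int bits) {
        long oldSeed, nextSeed;
        do {
            oldSeed = seed.get();
            nextSeed = (oldSeed * MULTIPLIER + ADDEND) & MASK;
            // If another thread replaced the seed after get(), the CAS fails and
            // we recompute from the fresh value, so no old seed is advanced twice.
        } while (!seed.compareAndSet(oldSeed, nextSeed));
        return (int) (nextSeed >>> (48 - bits));
    }

    int nextInt() {
        return next(32);
    }

    public static void main(String[] args) throws InterruptedException {
        CasLcg shared = new CasLcg(7L);
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) shared.nextInt();
            });
            threads[t].start();
        }
        for (Thread t : threads) t.join();

        CasLcg sequential = new CasLcg(7L);
        for (int i = 0; i < 4000; i++) sequential.nextInt();
        // each successful CAS advanced the seed exactly one step, so both agree
        System.out.println(shared.nextInt() == sequential.nextInt()); // true
    }
}
```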

Explain how ConcurrentLinkedQueue, the non-blocking, unbounded, linked-list-based queue, works.

An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.

This implementation employs an efficient non-blocking algorithm based on one described in Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott.

Iterators are weakly consistent, returning elements reflecting the state of the queue at some point at or since the creation of the iterator. They do not throw java.util.ConcurrentModificationException, and may proceed concurrently with other operations. Elements contained in the queue since the creation of the iterator will be returned exactly once.

Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.

This class and its iterator implement all of the optional methods of the Queue and Iterator interfaces.

Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a ConcurrentLinkedQueue happen-before actions subsequent to the access or removal of that element from the ConcurrentLinkedQueue in another thread.
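This usage sketch exercises several of the properties quoted above: offer/poll never block, poll returns null when the queue is empty, and null elements are rejected. Several producers enqueue concurrently, then two consumers drain the queue. ClqDemo and the thread counts are arbitrary choices for illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class ClqDemo {
    // Producers offer concurrently; after they finish, two consumers drain the
    // queue with poll(). Returns the total number of elements polled.
    static int produceThenConsume(int producers, int perProducer) throws InterruptedException {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread[] ps = new Thread[producers];
        for (int t = 0; t < producers; t++) {
            ps[t] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) queue.offer(i); // never blocks
            });
            ps[t].start();
        }
        for (Thread p : ps) p.join();

        AtomicInteger polled = new AtomicInteger();
        Thread[] cs = new Thread[2];
        for (int t = 0; t < cs.length; t++) {
            cs[t] = new Thread(() -> {
                // poll() returns null instead of blocking when the queue is empty
                while (queue.poll() != null) polled.incrementAndGet();
            });
            cs[t].start();
        }
        for (Thread c : cs) c.join();
        return polled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceThenConsume(4, 1000)); // 4000
        try {
            new ConcurrentLinkedQueue<Integer>().offer(null);
        } catch (NullPointerException e) {
            System.out.println("null elements are rejected");
        }
    }
}
```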

The inner class Node (JDK 8 source) provides the CAS helpers used to manipulate nodes:

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

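    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics: resolve the field offsets used by the CAS calls above.
    // sun.misc.Unsafe.getUnsafe() only succeeds for classes loaded by the boot loader.
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}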
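Ordinary application code cannot call sun.misc.Unsafe.getUnsafe(). As a sketch, the same three node operations can be expressed with the public AtomicReferenceFieldUpdater API. The class name CasNode is hypothetical, and this is not how the JDK itself writes Node.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical public-API counterpart of ConcurrentLinkedQueue.Node.
class CasNode<E> {
    volatile E item;
    volatile CasNode<E> next;

    // Updaters perform CAS on the volatile fields named below.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<CasNode, Object> ITEM =
            AtomicReferenceFieldUpdater.newUpdater(CasNode.class, Object.class, "item");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<CasNode, CasNode> NEXT =
            AtomicReferenceFieldUpdater.newUpdater(CasNode.class, CasNode.class, "next");

    CasNode(E item) {
        this.item = item; // plain volatile write (the JDK uses a relaxed write here)
    }

    boolean casItem(E cmp, E val) {
        return ITEM.compareAndSet(this, cmp, val);
    }

    void lazySetNext(CasNode<E> val) {
        NEXT.lazySet(this, val); // ordered write, comparable to putOrderedObject
    }

    boolean casNext(CasNode<E> cmp, CasNode<E> val) {
        return NEXT.compareAndSet(this, cmp, val);
    }
}
```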

Source code of offer():

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

How does ConcurrentLinkedQueue use a CAS-based non-blocking algorithm to keep enqueue and dequeue thread-safe across threads?

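Through casNext and casItem, each retried in a loop until it succeeds. Neither operation ever blocks.

- offer() walks from tail to the node whose next is null and appends with casNext(null, newNode). If the CAS fails, another thread appended first, so the loop re-reads next and retries from the new position.
- poll() claims the element at the head with casItem(item, null). A thread that loses this race moves on to the next node and tries again.
- tail is updated lazily: offer() only moves it after hopping two nodes, and a failed casTail is acceptable. Because tail may lag behind the real last node, the loops must follow next pointers.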
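The core of the Michael and Scott algorithm can be sketched with AtomicReference. MsQueue is a hypothetical simplification. It keeps a dummy head node, appends with a CAS on the last node's next, helps a lagging tail forward, and removes by a CAS on head. It omits the JDK's optimizations: two-hop lazy head/tail updates, self-linking of removed nodes, and casItem-based claiming.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified Michael-Scott queue (not the JDK implementation).
class MsQueue<E> {
    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    // head always points at a dummy node; the first element is head.next
    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    MsQueue() {
        Node<E> dummy = new Node<>(null);
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    void offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<>(e);
        while (true) {
            Node<E> last = tail.get();
            Node<E> next = last.next.get();
            if (next == null) {
                // append; fails if another thread linked a node after `last` first
                if (last.next.compareAndSet(null, node)) {
                    tail.compareAndSet(last, node); // failure is fine: someone helped
                    return;
                }
            } else {
                tail.compareAndSet(last, next);     // tail is lagging: help, then retry
            }
        }
    }

    E poll() {
        while (true) {
            Node<E> first = head.get();
            Node<E> last = tail.get();
            Node<E> next = first.next.get();
            if (next == null) return null;          // only the dummy: queue is empty
            if (first == last) {
                tail.compareAndSet(last, next);     // tail is lagging: help, then retry
            } else if (head.compareAndSet(first, next)) {
                return next.item;                   // `next` becomes the new dummy
            }
            // a failed CAS means another thread won the race; loop and retry
        }
    }
}
```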

How LinkedBlockingQueue, the linked-list-based blocking queue, works

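Blocking is built on ReentrantLock and its Condition objects. put() and take() acquire a lock with lockInterruptibly() and wait on a Condition with await() while the queue is full or empty. Both mechanisms rest on AQS (AbstractQueuedSynchronizer): waiting threads are queued and parked, and a thread woken from await() reacquires the lock through AQS's acquireQueued() method.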
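The blocking behaviour can be observed through the public API. In this sketch (LbqBlockingDemo and its methods are hypothetical names), a consumer blocked in take() is woken by put(), and a timed offer() on a full queue gives up and returns false.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class LbqBlockingDemo {
    // A consumer blocks in take() on an empty queue until the producer puts.
    static String handOff(String value) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicReference<String> received = new AtomicReference<>();
        Thread consumer = new Thread(() -> {
            try {
                received.set(queue.take()); // waits on notEmpty while empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread.sleep(50);   // give the consumer time to block (not required for correctness)
        queue.put(value);   // signals notEmpty and wakes the consumer
        consumer.join();
        return received.get();
    }

    // A timed offer() on a full bounded queue waits on notFull, then gives up.
    static boolean timedOfferOnFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1); // capacity 1
        queue.put("a");                                             // now full
        return queue.offer("b", 50, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(handOff("hello"));        // hello
        System.out.println(timedOfferOnFullQueue()); // false
    }
}
```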

How does LinkedBlockingQueue use two exclusive ReentrantLocks and their condition variables to keep enqueue and dequeue thread-safe across threads?

A variant of the "two lock queue" algorithm. The putLock gates entry to put (and offer), and has an associated condition for waiting puts. Similarly for the takeLock. The "count" field that they both rely on is maintained as an atomic to avoid needing to get both locks in most cases. Also, to minimize need for puts to get takeLock and vice-versa, cascading notifies are used. When a put notices that it has enabled at least one take, it signals taker. That taker in turn signals others if more items have been entered since the signal. And symmetrically for takes signalling puts. Operations such as remove(Object) and iterators acquire both locks. Visibility between writers and readers is provided as follows:

Whenever an element is enqueued, the putLock is acquired and count updated. A subsequent reader guarantees visibility to the enqueued Node by either acquiring the putLock (via fullyLock) or by acquiring the takeLock, and then reading n = count.get(); this gives visibility to the first n items.

To implement weakly consistent iterators, it appears we need to keep all Nodes GC-reachable from a predecessor dequeued Node. That would cause two problems:

  • allow a rogue Iterator to cause unbounded memory retention

  • cause cross-generational linking of old Nodes to new Nodes if a Node was tenured while live, which generational GCs have a hard time dealing with, causing repeated major collections.

However, only non-deleted Nodes need to be reachable from dequeued Nodes, and reachability does not necessarily have to be of the kind understood by the GC. We use the trick of linking a Node that has just been dequeued to itself. Such a self-link implicitly means to advance to head.next.

When a node is dequeued, its next field is pointed at the node itself. The dead node then no longer keeps later nodes reachable, which avoids the GC problems described above.

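The data is stored in a singly linked list. The queue can be bounded or unbounded, and the no-argument constructor sets the capacity to Integer.MAX_VALUE. As the class diagram shows, LinkedBlockingQueue uses two locks: takeLock for the consuming (take) side and putLock for the producing (put) side.

  • head is used to dequeue elements, through take(), poll() and peek()

  • tail is used to enqueue elements, through put() and offer()

    private final ReentrantLock takeLock = new ReentrantLock();  /* lock for take/poll/peek */
    private final Condition notEmpty = takeLock.newCondition();  /* takers wait here while empty */
    private final ReentrantLock putLock = new ReentrantLock();   /* lock for put/offer */
    private final Condition notFull = putLock.newCondition();    /* putters wait here while full */
  • To take at the head, a thread must hold takeLock. While the queue is empty, it calls notEmpty.await(). After removing an element, it calls notEmpty.signal() if more elements remain. If the queue was full before this take, it also acquires putLock and calls notFull.signal() to wake a waiting putter.

  • To put at the tail, a thread must hold putLock. While the queue is full, it calls notFull.await(). After adding an element, it calls notFull.signal() if there is still room. If the queue was empty before this put, it also acquires takeLock and calls notEmpty.signal() to wake a waiting taker.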

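In other words, a producer and a consumer can run at the same time instead of contending for one lock. Under heavy concurrency, this design generally achieves higher throughput than a single-lock queue.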
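The two-lock scheme can be sketched as follows, including the cascading signals and the self-linking of the old head. TwoLockQueue is a hypothetical, simplified class modelled on the description above, not the JDK source. It supports only put, take and size. It omits timed operations, remove(Object) and iterators, which in the JDK acquire both locks.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified two-lock blocking queue (not the JDK source).
class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    // shared by both sides, so puts and takes rarely need each other's lock
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head;  // dummy node, guarded by takeLock
    private Node<E> last;  // last node, guarded by putLock

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<>(e);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await();
            last = last.next = node;                 // enqueue at the tail
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal();  // cascade: room for another put
        } finally {
            putLock.unlock();
        }
        if (c == 0) signalNotEmpty();                // was empty: wake a taker
    }

    E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h;                              // self-link the old dummy (helps GC)
            head = first;                            // first becomes the new dummy
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();            // cascade: more left for takers
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signalNotFull();          // was full: wake a putter
        return x;
    }

    int size() {
        return count.get();
    }

    private void signalNotEmpty() {
        takeLock.lock();
        try { notEmpty.signal(); } finally { takeLock.unlock(); }
    }

    private void signalNotFull() {
        putLock.lock();
        try { notFull.signal(); } finally { putLock.unlock(); }
    }
}
```

A taker sees the node written by a putter because the putter updates count after linking the node, and the taker reads count before following head.next. This mirrors the visibility argument in the JDK comment quoted above.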

ArrayBlockingQueue

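ArrayBlockingQueue stores its data in an array and is a bounded queue, so its capacity must be specified at construction. Its bookkeeping works as follows:

- count records the current number of elements.
- takeIndex and putIndex record the array positions of the next dequeue and the next enqueue. Both wrap around within [0, items.length - 1].
- 0 <= count <= items.length always holds.

The blocking methods put and take share a single Lock instance but wait on different Condition instances: put calls notFull.await() when the queue is full, and take calls notEmpty.await() when it is empty.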

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
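The excerpt above shows only the put side. A minimal sketch of the whole structure, with a take() that mirrors put(), might look like this. RingBlockingQueue is a hypothetical name, and unlike the JDK class it has no iterators, fairness option or timed methods.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified ArrayBlockingQueue: one lock, two conditions,
// and a circular array indexed by takeIndex/putIndex.
class RingBlockingQueue<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    RingBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        lock.lockInterruptibly();
        try {
            while (count == items.length) notFull.await();
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;   // wrap around
            count++;
            notEmpty.signal();                              // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) notEmpty.await();
            E x = (E) items[takeIndex];
            items[takeIndex] = null;                        // drop the reference for GC
            if (++takeIndex == items.length) takeIndex = 0; // wrap around
            count--;
            notFull.signal();                               // wake one waiting putter
            return x;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try { return count; } finally { lock.unlock(); }
    }
}
```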

Differences between ArrayBlockingQueue and LinkedBlockingQueue

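  1. LinkedBlockingQueue is based on a linked list, and its capacity may be omitted at construction (it defaults to Integer.MAX_VALUE). ArrayBlockingQueue is based on an array and requires a capacity.
  2. LinkedBlockingQueue creates a new Node object on every put, and dequeued Nodes must later be garbage-collected, which can affect GC performance. ArrayBlockingQueue reuses a fixed-length array circularly, so no objects are created or reclaimed per element.
  3. When removing an element from the middle with remove(Object), LinkedBlockingQueue only unlinks a node and moves no data. ArrayBlockingQueue has to shift the following elements in the array, which costs more.
  4. LinkedBlockingQueue separates puts from takes with two locks, whereas ArrayBlockingQueue uses one lock with two conditions. Under high concurrency, LinkedBlockingQueue therefore usually performs better.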
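Point 1 can be checked directly through remainingCapacity(). QueueCapacityDemo is a hypothetical helper:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueCapacityDemo {
    // No capacity given: LinkedBlockingQueue falls back to Integer.MAX_VALUE.
    static int linkedDefaultCapacity() {
        BlockingQueue<Integer> q = new LinkedBlockingQueue<>();
        return q.remainingCapacity();
    }

    // ArrayBlockingQueue has no no-argument constructor; a capacity is required.
    static int arrayCapacity(int capacity) {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<>(capacity);
        return q.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(linkedDefaultCapacity()); // 2147483647
        System.out.println(arrayCapacity(3));        // 3
    }
}
```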