Java中的几种阻塞队列
阅读原文时间:2021年04月20日阅读:1

Java中的几种阻塞队列

 发表于 2017-05-09

Java中的BlockingQueue接口是一个线程安全的存取队列,适用于生产者消费者的应用场景中,支持两个附加操作:

  • 生产者线程会一直不断的往阻塞队列中放入数据,直到队列满了为止。队列满了后,生产者线程阻塞等待消费者线程取出数据。
  • 消费者线程会一直不断的从阻塞队列中取出数据,直到队列空了为止。队列空了后,消费者线程阻塞等待生产者线程放入数据。

BlockingQueue接口

BlockingQueue提供四种不同的处理方法。

 

抛出异常

返回特殊值

一直阻塞

超时退出

插入方法

add(o)

offer(o)

put(o)

offer(o, timeout, timeunit)

移除方法

remove(o)

poll()

take(o)

poll(o, timeout, timeunit)

检查方法

element()

peek()

  1. 抛出异常
    • add: 插入数据时,如果阻塞队列满,那么抛出异常IllegalStateException,否则插入成功返回true。当使用有界(capacity-restricted queue)阻塞队列时,建议使用offer方法。
      • IllegalStateException - if the element cannot be added at this time due to capacity restrictions
      • ClassCastException - if the class of the specified element prevents it from being added to this queue
      • NullPointerException - if the specified element is null
      • IllegalArgumentException - if some property of the specified element prevents it from being added to this queue
    • remove: 删除数据时,如果队列中有此数据,删除成功返回true,否则返回false。如果包含一个或者多个object,那么只移除一个就返回true注意:remove(o)BlockingQueue接口的方法,remove()Queue接口的方法。
    • element: 如果队列为空,那么抛出异常NoSuchElementException。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove()element同样是Queue接口的方法。
  2. 返回特殊值
    • offer: 插入数据时,如果阻塞队列没满,那么插入成功返回true,否则返回false。当使用有界(capacity-restricted queue)阻塞队列时,建议使用offer方法,不建议会抛出异常的add方法。
    • poll: 此方法是Queue接口的。如果队列不为空,查询、移除并返回队列头部元素。如果队列为空,那么返回null
    • peek: 此方法是Queue接口的。如果队列为空,返回null,这点不同于poll。如果队列不为空,查询返回队列头部的数据,但是不移除数据,这点不同于remove()
  3. 一直阻塞
    • put: 插入数据时,如果队列已满,那么阻塞等待队列可用,等待期间如果被中断,那么抛出InterruptedException
    • take: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞等待队列可用,等待期间如果被中断,那么抛出InterruptedException
  4. 超时退出
    • offer: 插入数据时,如果队列已满,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出InterruptedException。如果插入成功,那么返回true,如果在达到指定时间后仍然队列不可用,那么返回false
    • poll: 查询、删除并返回队列头部元素,如果队列为空,那么阻塞指定时间等待队列可用,等待期间如果被中断,那么抛出InterruptedException。如果删除成功,那么返回队列头部元素,如果在达到指定时间后仍然队列不可用,那么返回null

Queue队列不能插入null,否则会抛出NullPointerException

Java里的阻塞队列

JDK7提供了7个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

ArrayBlockingQueue是基于数组(array-based)的先进先出(FIFO)有界(bounded)阻塞队列。

  • 创建队列时,必须要指定队列容量(capacity),即数组大小。

  • 创建队列时,可以传入Collection来初始化队列元素。

  • 队列一旦被创建,那么队列容量不能被改变。

  • 队列支持公平模式和非公平模式,默认非公平模式。

  • 队列中只有一把锁,写锁和读锁未分离,并发控制采用了经典的two-condition(notEmptynotFull)算法。

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    /** The queued items */

    final Object[] items;

    /** items index for next take, poll, peek or remove */

    int takeIndex;

    /** items index for next put, offer, or add */

    int putIndex;

    /** Number of elements in the queue */

    int count;

    /*

    * Concurrency control uses the classic two-condition algorithm

    * found in any textbook.

    */

    /** Main lock guarding all access */

    final ReentrantLock lock;

    /** Condition for waiting takes */

    private final Condition notEmpty;

    /** Condition for waiting puts */

    private final Condition notFull;

LinkedBlockingQueue

LinkedBlockingQueue是基于链表(linked nodes)的先进先出(FIFO)的可选界(optionally-bounded)的阻塞队列。

  • 创建队列时,为了避免额外开销,可以指定队列容量(capacity);如果不指定队列容量,那么默认队列容量为Integer.MAX_VALUE
  • 创建队列时,可以可以传入Collection来初始化队列元素,此时不能指定队列容量,默认为Integer.MAX_VALUE
  • 队列中的count即当前队列元素个数,采用AtomicInteger,避免puttake的竞争。
  • ArrayBlockingQueue不同的是,LinkedBlockingQueue队列中有两把锁,读锁和写锁是分离的。
  • 在使用LinkedBlockingQueue时,若队列大小为默认值,且生产速度大于消费速度时,可能会内存溢出。
  • LinkedBlockingQueue理论上来说比ArrayBlockingQueue有更高的吞吐量,但是在大多数的实际应用场景中,却没有很好的表现。

PriorityBlockingQueue

PriorityBlockingQueue是基于数组(array based)的支持优先级的无界(unbounded)的阻塞队列。此队列的数据结构是堆。

  • 创建队列时,如果指定初始化容量(initialCapacity),那么默认初始化容量DEFAULT_INITIAL_CAPACITY为11。
  • 创建队列时,可以指定队列初始化容量(initialCapacity),不是队列容量(capacity)。
  • PriorityBlockingQueue的无界(unbounded)相对于LinkedBlockingQueue的可选界(optionally-bounded)来说,无界是指不能在创建队列时,不能指定队列的最大容量(capacity),并不是说PriorityBlockingQueue本身无界。LinkedBlockingQueue默认(注意,这里指的是默认容量,即,你可以指定大于Integer.MAX_VALUE的值)的最大容量是Integer.MAX_VALUE,而PriorityBlockingQueue的最大容量是MAX_ARRAY_SIZE=Integer.MAX_VALUE-8
  • PriorityBlockingQueue无界的另一个意思就是生产者线程不会因为队列满了就阻塞,因为队列是无界的,没有容量满了这一说。offer(E e, long timeout, TimeUnit unit)的后两个参数没有任何作用查看源代码发现,其方法的实现直接是调用了offer(e)。但是当队列为空时,take仍然会阻塞。
  • offer(e)永远不会返回falseoffer(E e, long timeout, TimeUnit unit)永远不会返回false或者阻塞。
  • PriorityBlockingQueue通过数组来实现队列,在原有数组满了的情况下,通过复制数组来扩展队列容量,如果新扩展的数组容量大小超过MAX_ARRAY_SIZE,那么抛出OutOfMemoryError异常。
  • 默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。

DelayQueue

DelayQueue是基于PriorityQueue实现的支持延时获取元素的阻塞队列。

  • DelayQueue中存放的对象必须实现Delayed接口。
  • 如果没有到期元素,那么就没有headpoll方法返回null。
  • 当一个元素的getDelay(TimeUnit.NANOSECONDS)返回值小于等于0时,该元素过期。
  • 虽然不能用takepoll移除未过期的元素,但是这些未过期的元素仍然和过期元素一样同等对待。例如,size方法返回的数量就是过期元素和未过期元素的之和。

DelayQueue的适用场景:

  • 关闭空闲链接。服务器中有很多空闲链接,在一定时间后,关闭他们。
  • 删除过期缓存。在一定时间后,删除某些缓存的对象。
  • 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
  • 生成订单后的60秒后,给用户发送短信通知。
  • 下单15分钟后,如果用户不付款就关闭订单。

通过上面几条场景例子,可以看出来,DelayQueue适用于在一定时间后,做某些业务处理

SynchronousQueue

SynchronousQueue是一个没有数据缓冲的BlockingQueue

  • 一个线程的插入必须等待另一个线程的删除操作才能完成,反之亦然。
  • SynchronousQueue适合传递性设计(handoff designs),即一个线程中运行的对象,需要将某些信息、任务或者事件等传递给另一个线程中运行的对象的场景。
  • SynchronousQueue支持公平和非公平模式。
  • 不能在同步队列上进行peek,因为仅在试图要移除元素时,该元素才存在。
  • 不能迭代队列,因为其中没有元素可用于迭代。
  • Executors.newCachedThreadPool()中就使用了SynchronousQueue队列。

LinkedTransferQueue

LinkedTransferQueue是基于链表(linked nodes)的无界(unbounded)阻塞队列。

  • 无界队列(Integer.MAX_VALUE),进出队列采用FIFO(先进先出)原则。
  • 生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费。主要用于线程间消息的传递,与SynchronousQueue很类似,但是比起SynchronousQueue更好用。
  • LinkedTransferQueue既可以使用BlockingQueueput方法进行常规的添加元素操作,也可以使用transfer方法进行阻塞添加。
  • 相比SynchronousQueue灵活之处在于,队列长度非0,阻塞插入和非阻塞插入的元素可以共存。
  • 如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。

LinkedBlockingDeque

LinkedBlockingDueue是基于链表(linked nodes)的可选界(optionally-bounded)的双向阻塞队列。

  • 该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除)。
  • 该阻塞队列是支持线程安全。
  • 在初始化LinkedBlockingDeque时可以设置容量防止其过渡膨胀。
  • 双向阻塞队列可以运用在“工作窃取”(work stealing)模式中。

参考文档

  1. http://tutorials.jenkov.com/java-util-concurrent/blockingqueue.html
  2. http://www.infoq.com/cn/articles/java-blocking-queue
  3. What are the differences between SynchronousQueue and LinkedBlockingQueue
  4. when to prefer LinkedBlockingQueue over ArrayBlockingQueue
  5. JDK1.8源代码

转载地址:https://xbest.github.io/2017/05/09/java-BlockingQueue/