• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

图解BlockingQueue阻塞队列

Java技术 winrains 来源:arthinking 6个月前 (03-31) 39次浏览

本文重点介绍各种阻塞队列的实现、对比和使用场景。阅读完本文,你将对各种阻塞队列的实现原理都有一定的了解,以及了解他们的使用场景。

1、阻塞队列 BlockingQueue

1.1、BlockingQueue的基本原理

我们先来解释一下阻塞队列:

如上图

  • 生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
  • 消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

我们查阅BlockingQueue总结了以下阻塞队列的方法:

如果队列满了(插入)或者空(移除)了 抛异常 阻塞 超时 立刻返回
插入 add(E e) put(E e) offer(E e, long timeout, TimeUnit unit) offer(E e)
移除 remove(Object o) take() poll(long timeout, TimeUnit unit)

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。

以上支持阻塞和超时的方法都是能够响应中断的。

1.2、BlockingQueue的实现

作为一个五年级的小学生,到这里就开始听不懂了,暗中嘚瑟,还好学的没那么快。

下图展示了主要的BlockingQueue的实现类:

其实我们在前面的文章:ReentrantLock介绍与使用#2.4、条件变量 章节已经使用ReentrantLock的Condition实现了一个阻塞队列,底层是使用LinkedList进行存储的。

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

2、ArrayBlockingQueue

相信经过上面各个同步类的分析,大家已经对AQS比较熟悉了,下面我将不再画具体的内部结构图了。

ArrayBlockingQueue使用的数据结构是数组

Object[capacity]

容量大小有构造函数的capacity参数决定。

2.1、put方法

public void put(E e) throws InterruptedException {
  checkNotNull(e);
  // 获取ReentrantLock锁
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    // 如果队列满了,则进入条件队列进行等待
    while (count == items.length)
      notFull.await();
    // 队列不满,或者被取数线程唤醒了,那么会继续执行
    // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程
    enqueue(e);
  } finally {
    // 释放ReentrantLock锁
    lock.unlock();
  }
}
  1. 只有获取到了ReentrantLock锁之后,才可以操作队列;
  2. 队列满了会阻塞进入条件队列等待;
  3. 队列不满则添加数据,并且唤醒等待时间最长的取数线程。

2.2、take方法

获取小顶堆最小的元素,获取之后会重新构造小顶堆。

public E take() throws InterruptedException {
  // 获取ReentrantLock锁
  final ReentrantLock lock = this.lock;
  lock.lockInterruptibly();
  try {
    // 如果队列空了,则进入条件队列进行等待
    while (count == 0)
      notEmpty.await();
    // 队列不空,或者被存数线程唤醒了,那么会继续执行
    // 这里会从阻塞队列取一个数据,然后唤醒等待时间最长的存数线程
    return dequeue();
  } finally {
    // 释放ReentrantLock锁
    lock.unlock();
  }
}
  1. 只有获取到了ReentrantLock锁之后,才可以操作队列;
  2. 队列空了会阻塞进入条件队列等待;
  3. 队列不满则取数据,并且唤醒等待时间最长的存数线程。

注意:ArrayList中的数据取数和存数都是依次遍历一个一个取或者存,直到队尾之后,从头开始继续。代码如下:

private void enqueue(E x) {
  final Object[] items = this.items;
  items[putIndex] = x;
  if (++putIndex == items.length)
    putIndex = 0;
  count++;
  notEmpty.signal();
}

private E dequeue() {
  final Object[] items = this.items;
  @SuppressWarnings("unchecked")
  E x = (E) items[takeIndex];
  items[takeIndex] = null;
  if (++takeIndex == items.length)
    takeIndex = 0;
  count--;
  if (itrs != null)
    itrs.elementDequeued();
  notFull.signal();
  return x;
}

如下图:

这里put和take使用了同一个ReentrantLock,不能并发执行。

有没有办法能够做到让put和take能够并发执行呢?接下来我们就来看看LinkedBlockingQueue。

3、LinkedBlockingQueue

LinkedBlockingQueue的put方法和take方法分别使用了不同的ReentrantLock,put和take可以并发执行,但是不能并发执行put或者take操作。

LinkedBlockingQueue底层使用的数据结构是单向链表

transient Node head;

private transient Node last;

容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE

3.1、put方法

public void put(E e) throws InterruptedException {
  if (e == null) throw new NullPointerException();
  int c = -1;
  Node<E> node = new Node<E>(e);
  final ReentrantLock putLock = this.putLock;
  // 使用AtomicInteger保证原子性
  final AtomicInteger count = this.count;
  // 获取put锁
  putLock.lockInterruptibly();
  try {
    // 如果队列满了,则进入put条件队列等待
    while (count.get() == capacity) {
      notFull.await();
    }
    // 队列不满,或者被取数线程唤醒了,那么会继续执行
    // 这里会往阻塞队列末尾添加一个数据
    enqueue(node);
    c = count.getAndIncrement();
    // 如果队列不满,则唤醒等待时间最长的put线程
    if (c + 1 < capacity)
      notFull.signal();
  } finally {
    // 释放put锁
    putLock.unlock();
  }
  // 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
  if (c == 0)
    signalNotEmpty();
}

3.2、take方法

public E take() throws InterruptedException {
  E x;
  int c = -1;
  final AtomicInteger count = this.count;
  final ReentrantLock takeLock = this.takeLock;
  // 获取take锁
  takeLock.lockInterruptibly();
  try {
    // 如果队列空了,则进入take条件队列等待
    while (count.get() == 0) {
      notEmpty.await();
    }
    // 获取到第一个节点,非哑节点
    x = dequeue();
    // 阻塞队列数量减1
    c = count.getAndDecrement();
    // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
    if (c > 1)
      notEmpty.signal();
  } finally {
    // 释放take锁
    takeLock.unlock();
  }
  // 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
  if (c == capacity)
    signalNotFull();
  return x;
}

take和put操作如下图所示:

  1. 队列第一个节点为哑节点,占位用的;
  2. put操作一直往链表后面追加节点;
  3. take操作从链表头取节点;

ArrayBlockingQueue与LinkedBlockingQueue对比

队列 是否阻塞 是否有界 线程安全 适用场景
ArrayBlockingQueue 一把ReentrantLock锁 生产消费模型,平衡处理速度
LinkedBlockingQueue 可配置 两把ReentrantLock锁 生产消费模型,平衡处理速度

ArrayBlockingQueue

  • 数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;

LinkedBlockingQueue

  • 数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;
  • 两把锁,并发性能较好;
  • 可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。

4、LinkedBlockingDeque

与LinkedBlockingQueue类似,只不过底层的数据结构是双向链表,并且增加了可以从队列两端插入和移除元素的方法,支持FIFOFILO。相关方法定义:

  • putFirst(E e)
  • putLast(E e)
  • E getFirst()
  • E getLast()
  • E takeFirst()
  • E takeLast()

LinkedBlockingQueue与LinkedBlockingDeque对比

LinkedBlockingQueue:

  • FIFO;
  • 读写分开两个ReentrantLock;

LinkedBlockingDeque:

  • FIFO & FILO;
  • 全局一把ReentrantLock;

5、PriorityBlockingQueue

是一个无界队列

存储结构:

private transient Object[] queue;

内部会构造为一颗平衡的二叉小顶堆,根据构造函数中传入的Comparator进行排序或者没有传的情况下使用自然的排序方法,数组的第一个元素为最小的元素。

全局一把ReentrantLock锁。

5.1、put方法

无界队列,一定可以添加成功,无需阻塞。容量不够则扩容,put完会重新构建小顶堆。关键代码如下:

public boolean offer(E e) {
  if (e == null)
    throw new NullPointerException();
  final ReentrantLock lock = this.lock;
  // 尝试获取锁
  lock.lock();
  int n, cap;
  Object[] array;
  // 如果数组空间不够,尝试扩容:通常会扩大约50%
  while ((n = size) >= (cap = (array = queue).length))
    tryGrow(array, cap);
  try {
    // 往小顶堆插入元素
    Comparator<? super E> cmp = comparator;
    if (cmp == null)
      siftUpComparable(n, e, array);
    else
      siftUpUsingComparator(n, e, array, cmp);
    // 元素个数+1
    size = n + 1;
    // 唤醒等待最久的取数线程
    notEmpty.signal();
  } finally {
    // 释放锁
    lock.unlock();
  }
  return true;
}

跟前面的各种阻塞队列实现思路基本一致,这里比较有意思的是数组的扩容和往小顶堆插入元素的处理逻辑,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。

5.2、take方法

队列为空的时候进入条件等待,take完元素之后,立刻重新构建小顶堆。

public E take() throws InterruptedException {
  final ReentrantLock lock = this.lock;
  // 获取锁
  lock.lockInterruptibly();
  E result;
  try {
    // 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。
    while ( (result = dequeue()) == null)
      notEmpty.await();
  } finally {
    // 释放锁
    lock.unlock();
  }
  return result;
}

这里比较有趣的是dequeue()方法,涉及到取最小元素,然后重新排序,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。

6、SynchronousQueue

通过使用SynchronousQueue,我们可以在线程之间安全的传递变量,A线程把需要传递的变量放入SynchronousQueue,B线程读取。该队列特点如下:

  • 容量永远为0;
  • put操作阻塞,直到另一个线程取走了队列中的元素;
  • take操作阻塞,直到另一个线程put一个元素到队列中;
  • 任何线程只能取得其他线程put进去的元素。

与其他阻塞队列不同的是,SynchronousQueue不依赖与AQS实现,而是直接使用CAS操作实现的,这导致代码中有大量的判断是否数据被并发改写了,并做相应的处理。

我们不推荐使用的无界线程池Executors.newCachedThreadPool()底层就是用到了SynchronousQueue来实现的。

SynchronousQueue具有公平模式和非公平模式的区别,两者的实现不太一样,接下来就介绍一下。

6.1、公平模式

公平模式下,底层的数据结构是一个单向链表,对应实现类为:TransferQueue。

底层数据结构与LinkedBlockingQueue类似,只不过阻塞的条件不同

  • LinkedBlockingQueue在队列满的时候put线程会阻塞,在队列空的时候,take线程会阻塞;
  • SynchronousQueue put进去的元素没有被take的时候,put线程阻塞,take线程获取不到元素的时候,take线程阻塞;

如下图,刚开始有三个线程执行了put操作,都阻塞等待了:

然后有一个新的线程4执行了take操作,这里是FIFO队列,匹配上了线程1,于是线程1取了线程1节点的数据,然后同时唤醒了线程1,头节点向前推进:

这种FIFO的模式真是公平的体现。

大致执行流程就是这样子,比使用了AQS的简单,取而代之的是使用CAS,通过大量的检验节点是否变更和处理,以达到更高put和take的性能,不过代码就自然会变得很复杂了,感兴趣的朋友可以前往查看源码,这里就不做详细的解读了。

6.2、非公平模式

非公平模式,底层的数据结构是一个栈。代码也是比较复杂的,这里我直接用图来描述下其执行原理。

如下图,线程1、线程2、线程3依次执行put操作,入栈情况如下图,结果三个线程都阻塞了:

这个时候线程4执行take操作,会入栈,与栈顶的栈帧进行匹配:

匹配成功之后,唤醒匹配上的线程3,然后从栈中移除线程3和线程4

可以发现非公平模式下是LIFO的队列。

7、DelayQueue

延迟队列,提供给了在指定时间内才能获取队列元素的功能。

底层是通过PriorityQueue实现的:

  • put元素,触发Delayed接口的compareTo方法重新排序PriorityQueue小顶堆,让最小的元素(最快到期)排在最前面;一定会put成功,容量不够则扩容;
  • take元素,判断第一个元素是否到期,到期了则把原始poll出来(同时会重新构造小顶堆),否则会执行awaitNanos(delay),等待头节点元素到期之后,再重新获取元素。

8、各种阻塞锁对比

阻塞锁 数据结构 是否有界 线程安全 适用场景
ArrayBlockingQueue 数组 有界 一把ReentrantLock锁控制put和take。 生产消费模型,平衡处理速度
LinkedBlockingQueue 单向链表 可配置 两把ReentrantLock锁,put和take可以并发执行。 生产消费模型,平衡处理速度
LinkedBlockingDeque 双向链表 可配置 一把ReentrantLock锁控制put和take。 生产消费模型,平衡处理速度
优先级队列 PriorityBlockingQueue 二叉小顶堆 无界,会自动扩容 一把ReentrantLock锁控制put和take,队列为空的时候take进入条件等待; 短信队列中的验证码短信优先发送
同步队列 SynchronousQueue 单向链表或者栈 容量为1 CAS,put和take都会阻塞,直到配对成功为止; 线程之间传递数据
延迟队列 DelayQueue PriorityQueue,二叉小顶堆 无界,会自动扩容 一把ReentrantLock锁控制put和take,一次只能一个线程take,其他线程进入条件等待; 关闭超时空连接,任务超时处理…

9、使用案例

9.1、案例一:生产者消费者

下面是一个使用案例,使用到了阻塞队列和CountDownLatch。这个程序模拟:

  • 一个生产者线程,往阻塞队列中依次存入20条消息;
  • 一个消费者线程,一直尝试从阻塞线程中取出消息进行消费。

您可以使用上一节中 4.2.1~4.2.4中的任何一个阻塞队列替换掉代码中的阻塞队列,尝试看看效果:

public class BlockingQueueTest {

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    // 这里可以替换为你想试用的阻塞队列
    BlockingQueue<String> queue = new PriorityBlockingQueue<>();
    executor.submit(new Producer(queue));
    executor.submit(new Cunsumer(queue));
    executor.shutdown();
  }
}

/**
 * 生产者线程,往阻塞队列中依次存入20条消息
 */
class Producer implements Runnable{

  private BlockingQueue<String> queue;

  Producer(BlockingQueue<String> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      for (int i = 0; i < 20; i++){
        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
        String threadName = Thread.currentThread().getName();
        String msg = threadName + " : " + (i + 1);
        queue.put(msg);
        System.out.println("producer: " + msg);
      }
      System.out.println("producer finished...");
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

/**
 * 消费者线程,一直尝试从阻塞线程中取出消息进行消费
 */
class Cunsumer implements Runnable{

  private BlockingQueue<String> queue;

  Cunsumer(BlockingQueue<String> queue) {
    this.queue = queue;
  }

  @Override
  public void run() {
    try {
      while (true){
        TimeUnit.MILLISECONDS.sleep(new Random().nextInt(5000));
        String msg = queue.take();
        System.out.println("consumer " + Thread.currentThread().getName() + ", msg : " + msg);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

9.2、案例二:优先级阻塞队列

下面是一个PriorityBlockingQueue的使用例子,可以发现每次put一个元素之后会自动排序,小顶堆第一个元素总是最小的那个,每次take出来的元素也是最小的,take出来之后也会再次自动排序:

public class PriorityBlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {

        PriorityBlockingQueue<UserInfo> queue = new PriorityBlockingQueue<>();
        queue.put(new UserInfo(10, "User1"));
        System.out.println("priorityBlockingQueue: " + queue);

        queue.put(new UserInfo(5, "User2"));
        System.out.println("priorityBlockingQueue: " + queue);

        queue.put(new UserInfo(2,"User3"));
        System.out.println("priorityBlockingQueue: " + queue);

        queue.put(new UserInfo(4,"User4"));
        System.out.println("priorityBlockingQueue: " + queue);
        
        System.out.println("take data: " + queue.take());
        System.out.println("priorityBlockingQueue: " + queue);

        System.out.println("take data: " + queue.take());
        System.out.println("priorityBlockingQueue: " + queue);
    }
    
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class UserInfo implements Comparable<UserInfo>{

    private int id;

    private String name;

    @Override
    public String toString() {
        return this.id + "";
    }
    @Override
    public int compareTo(UserInfo person) {
        return this.id > person.getId() ? 1 : ( this.id < person.getId() ? -1 :0);
    }
}

输出结果:

priorityBlockingQueue: [10]
priorityBlockingQueue: [5, 10]
priorityBlockingQueue: [2, 10, 5]
priorityBlockingQueue: [2, 4, 5, 10]
take data: 2
priorityBlockingQueue: [4, 10, 5]
take data: 4
priorityBlockingQueue: [5, 10]

9.3、案例三:SynchronousQueue的使用

下面使用SynchronousQueue的使用案例。大家可以替换成公平模式或者非公平模式,来查看程序输出结果,看看是FIFO还是LIFO:

ExecutorService executor = Executors.newFixedThreadPool(10);
// 构造函数参数设置公平模式或者非公平模式
SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);

Runnable producer = () -> {
  Integer producedElement = ThreadLocalRandom
    .current()
    .nextInt();
  try {
    System.out.println(Thread.currentThread().getName() + " put " + producedElement);
    queue.put(producedElement);
    System.out.println(Thread.currentThread().getName() + " put finished");
  } catch (InterruptedException ex) {
    ex.printStackTrace();
  }
};

Runnable consumer = () -> {
  try {
    TimeUnit.SECONDS.sleep(1);
    Integer consumedElement = queue.take();
    System.out.println(Thread.currentThread().getName() + " take " + consumedElement);
  } catch (InterruptedException ex) {
    ex.printStackTrace();
  }
};

executor.execute(producer);
executor.execute(producer);
executor.execute(producer);
executor.execute(consumer);

executor.shutdown();
System.out.println(queue.size());

References

A Guide to Java SynchronousQueue

作者:arthinking

来源:https://www.itzhai.com/cpj/graphical-blocking-queue.html


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)