• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

图解并发辅助工具类

Java技术 winrains 来源:arthinking 6个月前 (03-31) 38次浏览

去年问我怎么学Java的那个五年级小学生又来向我问问题了,说Java中线程同步有没有好用的工具类。幸亏没有问我什么算法,瑟瑟发抖。这个我倒是还挺在行的,了解到他喜欢熊出没,于是我就用熊作为主角,给他分享了几个Java并发框架中的辅助工具类。(呼~松了一口气)

看文本片文章,你将了解到:

  1. CountDownLatch的工作原理和使用场景;
  2. CyclicBarrier的工作原理和使用场景;
  3. Semaphore的工作原理和使用场景。

1、闭锁 CountDownLatch

一个同步工具类,允许一个或者多个线程一直等待,直到其他线程的操作都执行完成之后再继续往下执行。

使用场景:在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 这个时候就可以使用CountDownLatch。CountDownLatch最重要的方法是countDown()await(),前者主要是计数减一,后者是等待计数到0,如果没有到达0,就继续阻塞等待。

方法详细介绍:CountDownLatch的介绍和使用

为了方便理解,这不,我又发挥了以下我的动画绘制功底,写了一个动图:

如上图,左边三只小熊,可以当成三个线程,每一只撞到栏杆,计数器就减1,这相当于执行了countDown方法;

右边有两只暴走小熊在等待计数器变为0,可以当成两个线程,执行了await方法;

最终左边三只暴走小熊抵达了栏杆处,计数器变为0,唤醒了右边的暴走小熊,暴走小熊就开始动起来了。

1.1、执行原理

CountDownLatch是基于AQS共享模式的使用。

如下图,我们通过给CountDownLatch构造函数传入state的值。

countDown方法本质是释放共享锁,核心实现逻辑是:state>0 && state-1,如果state>0,则state减一,否则执行失败;

await方法本质是获取共享锁,核心实现是:getState()==0,如果state==0,则表示获取成功,否则线程阻塞进入等待队列;

当state减到0的时候,会唤醒等待队列中的所有线程,尝试继续获取共享锁,这个时候正常是所有线程都能获取成功的。

1.2、使用案例

三个线程共同拉取一块数据,每个线程拉取数据块的一部分,等到所有线程的数据都拉取过来之后,另一个处理线程再开始这个数据块。

下载线程:

class Downloader implements Runnable{

  private CountDownLatch latch;

  private String downloaderName;

  /**
   * 构造函数
   * @param downLatch 注意, 所有需要协作的线程需要使用同一个闭锁
   * @param downloaderName
     */
  public Downloader(CountDownLatch downLatch, String downloaderName){
    this.latch = downLatch;
    this.downloaderName = downloaderName;
  }

  public void run() {
    this.download();

    try {
      TimeUnit.SECONDS.sleep(new Random().nextInt(10));
    } catch (InterruptedException e) {
      System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
    }
    System.out.println(this.downloaderName + "下载完成...");
    this.latch.countDown();
  }

  private void download(){
    System.out.println(this.downloaderName + "正在下载文件...");
  }

}

数据处理线程:

class DataProcessor implements Runnable {

  private CountDownLatch latch;

  /**
   * 构造函数
   * @param latch 注意, 所有需要协作的线程需要使用同一个闭锁
     */
  public DataProcessor(CountDownLatch latch){
    this.latch = latch;
  }

  public void run() {
    System.out.println("等待下载完数据...");
    try {
      this.latch.await();
    } catch (InterruptedException e) {
      System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
    }
    System.out.println("数据下载完成, 开始处理数据...");
  }

}

运行代码:

ExecutorService executor = Executors.newFixedThreadPool(4);

CountDownLatch latch = new CountDownLatch(3);

Downloader d1 = new Downloader(latch, "下载线程1");
Downloader d2 = new Downloader(latch, "下载线程2");
Downloader d3 = new Downloader(latch, "下载线程3");

DataProcessor processor = new DataProcessor(latch);
executor.execute(d1);
executor.execute(d2);
executor.execute(d3);
executor.execute(processor);
executor.shutdown();

执行结果:

等待下载完数据...
下载线程1正在下载文件...
下载线程2正在下载文件...
下载线程3正在下载文件...
下载线程3下载完成...
下载线程2下载完成...
下载线程1下载完成...
数据下载完成, 开始处理数据...

1.3、其他说明

类似的,我们也可以使用Thread.join方法实现控制线程执行顺序,但是没有那么灵活。如果我们把任务都丢到线程池里面多线程执行,那么就不能手动的在一个线程里面调用另一个线程的join方法了。

2、栅栏 CyclicBarrier

屏障,或成为栅栏,我们在之前 一文带你彻底理解同步和锁的本质(干货) 一文有讨论到过,主要是达到这种目的:所有线程都准备就绪,就着手下一阶段的工作,否则不能进入下一阶段。

我们还是让上面的小熊来演示一下。

上面5只小熊,准备跑到起跑线,跑到起跑线等待,相当于执行了await方法,等到所有小熊准备就绪之后,然后一起开跑。这就很好的揭示了内存屏障的作用了。

2.1、执行原理

CyclicBarrier是基于ReentrantLock的Condition来实现的。

如下图,栅栏中有两个关键属性:

  • parties:栅栏计数器初始值
  • count:栅栏计数器

其中CyclicBarrier的await()方法封装了对ReentrantLock条件锁的使用,主要处理流程:

  • 获取ReentrantLock锁;
  • count减1,如果此时count为0,那么唤醒等待队列中所有线程,并结束这一轮处理,重置屏障,否则进入下一步;
  • 执行condition.await方法,把当前线程丢到条件队列;
  • 当count减少到0的时候,执行condition.signalAll方法把条件队列中的所有线程节点都移动到等待队列;
  • 最后唤醒同步队列中的线程节点,线程从condition.await阻塞处醒来继续执行:获取ReentrantLock锁,用当前线程节点替换旧的头节点,最终放ReentrantLock锁,继续让线程往下执行(每个线程依次获取、锁释放锁)。如下图:

await()能够响应中断。除此之外,await还提供了带有超时的实现await(long timeout, TimeUnit unit),以及reset()方法重新开启下一轮,具体大家可以看源码的实现。

2.2、使用案例

下面的案例模拟了赛跑,只有当所有运动员都在起跑线上准备好了,才允许他们开跑:

public class CyclicBarrierTest {

  public static void main(String[] args) {
    CyclicBarrier barrier = new CyclicBarrier(3);
    ExecutorService executor = Executors.newFixedThreadPool(3);
    executor.submit(new Runner(barrier, "1号选手"));
    executor.submit(new Runner(barrier, "2号选手"));
    executor.submit(new Runner(barrier, "3号选手"));
    barrier.reset();
    executor.submit(new Runner(barrier, "4号选手"));
    executor.submit(new Runner(barrier, "5号选手"));
    executor.submit(new Runner(barrier, "6号选手"));
    executor.shutdown();
  }
}

class Runner implements Runnable {

  /**
   * 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
   */
  private CyclicBarrier barrier;

  private String name;

  public Runner(CyclicBarrier barrier, String name) {
    super();
    this.barrier = barrier;
    this.name = name;
  }

  @Override
  public void run() {
    try {
      Thread.sleep(1000 * (new Random()).nextInt(8));
      System.out.println(name + " 准备好了...");
      // barrier的await方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
      barrier.await();
    } catch (InterruptedException | BrokenBarrierException e) {
      e.printStackTrace();
    }
    System.out.println(name + " 起跑!");
    for (int i = 0; i < 10; i++) {
      System.out.println(this.name + "正在跑步" + i);
    }
  }
}

2.3、其他说明

CountDownLatch是若干个线程等待另外n个线程完成某件事之后才能执行;而CyclicBarrier是若干个线程互相等待,只有等到所有线程都执行了await只会,这若干个线程才可以继续往下执行

3、信号量 Semaphore

信号量通过一组许可证来控制对共享资源的访问。

如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程优惠醒来尝试获取许可。

Semaphore提供给了若干个api对应不同的功能:

  • Semaphore(int permits):非公平模式创建;
  • Semaphore(int permits, boolean fair):可以指定是否公平模式创建;
  • acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;
  • acquire(int permits):跟上一个方法类型,尝试获取permits个许可;
  • acquireUninterruptibly():尝试获取一个许可,不可中断;
  • acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;
  • tryAcquire():尝试获取一个许可,获取不到则直接返回失败;
  • tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;
  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;
  • tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;
  • release():释放一个许可;
  • release(int permits):释放n个许可;

下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly()

这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。

3.1、执行原理

Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:

  • 执行acquire的时候,会尝试让state – acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
  • 执行release的时候,state + releases,把许可加回去。

3.2、使用案例

下面演示了使用semaphore实现限流的机制,模拟20个客户端线程尝试执行业务逻辑,同一时刻最多只有5个线程能够并发的执行。

// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
  final int NO = index;
  Runnable run = () -> {
    try {
      // 获取许可
      if(semp.tryAcquire()) {
        System.out.println("线程获得许可: " + NO);
        Thread.sleep((long) (Math.random() * 10000));
        // 访问完后,释放
        semp.release();
      } else {
        System.out.println("达到并发上限,请求失败,请稍后再试");
      }

    } catch (InterruptedException e) {
      System.out.println("执行异常");
    }
  };
  exec.execute(run);
}
// 退出线程池
exec.shutdown();

注意,这里使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。

作者:arthinking

来源:https://www.itzhai.com/cpj/graphical-several-fun-concurrent-helper-classes.html


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)