• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

ReentrantLock 介绍与使用

Java技术 winrains 来源:arthinking 8个月前 (03-21) 73次浏览

导读

本文为您介绍JUC中ReentrantLock的各种API和使用案例,阅读完本文,您将了解到:

  1. 如何使用tryLock避免顺序死锁问题
  2. 如何通过tryLock来重试获取锁
  3. 如何使用可中断锁,以及可中断锁的实现原理;
  4. ReentrantLock和synchronized的区别,以及如何选择?

ReentrantLock是J.U.C包下提供的独占锁锁,根据其名称可知,该锁是可重入的。主要提供以下功能更:

  • 等待可中断:该类提供了lockInterruptibly()方法实现了等待可中断机制,这种机制使得如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回而无需获取锁;
  • 公平锁:在并发获取同一个锁的场景,线程必须按照申请获取锁的顺序进行获取。一般是在ReentrantLock构造函数中传递true实例化一个公平锁;
    • Lock lock = new ReentrantLock(true);
  • 支持多条件的锁:synchronized关键字配合Object.wiat()和Object.notify()或者Object.notifyAll()方法,可以实现线程之间的同步。类似的,Lock中也提供了这样的机制。是通过Condition类提供的方法实现的。

如果线程重入了一个锁,那么ReentrantLock的锁计数器会加1,每个解锁请求,锁计数器减1。当锁计数器为0的时候,表示资源被解锁了。

synchronized通过操作Mark Word实现同步,锁标识存储于Mark Word中;ReentrantLock通过AQS(抽象同步队列)实现同步,锁标识存储于AQS的state属性中。

1、关键方法

  • lock(): 调用该方法会使锁计数器加1,如果共享资源最初是空闲的,则将锁定并授予线程;
  • unlock(): 调用该方法使锁计数器减1,当计数达到0的时候,将释放资源;
  • tryLock(): 如果资源没有被任何其他线程占用,那么该方法返回true,并且锁计数器加1。如果资源不是空闲的,则该方法返回false。这个时候线程不会阻塞,而是直接退出返回结果;
  • lockInterruptible(): 该方法使得资源空闲时允许该线程在获取资源时被其他线程中断。也就是说:如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回,不会继续等待获取锁;
  • getHoldCount(): 获取资源上持有的锁的计数器;
  • isHeldByCurrentThread: 如果资源锁有当前线程持有,则此方法返回true。

请始终在finally块中调用unlock语句,以确保即使在方法body中引发了异常,也可以释放锁。

比较难衡量的是究竟要休眠多久,这得看另一个已经进入临界区的线程究竟需要执行多久。如果频繁的休眠,会导致频繁切换用户态和内核态,比较占用资源。

2、使用案例

2.1、通过tryLock避免锁顺序死锁问题

对于具有锁顺序的场景,我们可以通过使用tryLock避免死锁问题。下面是一个具有锁顺序的例子:

package com.itzhai.concurrency;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by arthinking on 1/3/2020.
 */
public class RetryTest {

    public static void main(String[] args) {

        BusinessObject ob1 = new BusinessObject();
        ob1.setBoName("业务实体1");
        ob1.setLock(new ReentrantLock());

        BusinessObject ob2 = new BusinessObject();
        ob2.setBoName("业务实体2");
        ob2.setLock(new ReentrantLock());

        Thread thread1 = new Thread() {
            public void run (){
                try {
                    doService(ob1, ob2, 10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        thread1.setName("线程1");
        thread1.start();

        Thread thread2 = new Thread() {
            public void run (){
                try {
                    doService(ob2, ob1, 10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        thread2.setName("线程2");
        thread2.start();
    }

    public static boolean doService(BusinessObject bo1,
                                    BusinessObject bo2,
                                    long timeout,
                                    TimeUnit unit) throws InterruptedException {
        boolean isSuccess = false;
        String threadName = Thread.currentThread().getName();
        long stopTime = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            // 获取第一个锁
            System.out.println (threadName + " 请求获取 " + bo1.getBoName());
            if (!bo1.getLock().tryLock()) {
                System.out.println (threadName + " 锁定失败 " + bo1.getBoName());
                if (isTimeout(threadName, stopTime)) return false;
                continue;
            }

            try {
                System.out.println (threadName + " 锁定成功 " + bo1.getBoName());
                Thread.sleep(new Random().nextInt(100));

                // 获取第二个锁
                System.out.println (threadName + " 请求获取 " + bo2.getBoName());
                if (!bo2.getLock().tryLock()) {
                    System.out.println (threadName + " 锁定失败 " + bo2.getBoName());
                    if (isTimeout(threadName, stopTime)) return false;
                    continue;
                }
                try {
                    System.out.println(threadName + " 锁定成功 " + bo2.getBoName());
                    System.out.println("+++" + threadName + " 执行完成");
                    isSuccess = true;
                    return true;
                } finally {
                    if (bo2.getLock().isHeldByCurrentThread()) {
                        bo2.getLock().unlock();
                        System.out.println(threadName + " 释放锁 " + bo2.getBoName());
                    }
                }
            } catch (Exception e) {
                System.out.println(e);
            } finally {
                if (bo1.getLock().isHeldByCurrentThread()) {
                    bo1.getLock().unlock();
                    System.out.println(threadName + " 释放锁 " + bo1.getBoName());
                }
                if (!isSuccess) {
                    // 如果当前线程业务没有执行完成,则在释放锁之后,尝试休眠一下,以便让其他线程有处理机会
                    System.out.println("休眠一下再试...");
                    Thread.sleep(new Random().nextInt(100));
                }
            }
        }
    }

    private static boolean isTimeout(String threadName, long stopTime) throws InterruptedException {
        if (stopTime < System.nanoTime()) {
            System.out.println(threadName + "重试超时...");
            return true;
        }
        return false;
    }

}

class BusinessObject {

    private ReentrantLock lock;

    private String boName;

    public ReentrantLock getLock() {
        return lock;
    }

    public void setLock(ReentrantLock lock) {
        this.lock = lock;
    }

    public String getBoName() {
        return boName;
    }

    public void setBoName(String boName) {
        this.boName = boName;
    }
}

可以通过使用循环重试或者定时重试让tryLock失败的时候,重新循环的去重试。很重要的一点是:您需要设置一个超时时间,避免一直在重试

可以看到,这个逻辑还是稍微有点复杂的,一定要在释放锁之后,休眠片刻,以便让其他线程有机会重试获取当前线程刚刚释放的锁。

2.2、使用带有时间限制的tryLock重试获取锁

如果一个资源锁,多个线程都会尝试获取,如果每个线程占有锁的时间都不长,那么通过这种方式还是比较好的,可以避免线程阻塞导致的用户态内核态切换。这也就是自旋锁的特点。当然,如果每个线程都占用锁很长时间,那么这种方式就不太可取了。

如果使用的是synchronized关键字,其实在轻量级锁阶段就会有自旋等待的操作,并且synchronized实现了自适应的自旋等待,优化的还是比较好的。不同的点是:tryLock在自旋超时后就放弃了,而synchronized则会进一步升级为重量级锁,继续等待线程调度去尝试获取锁,这一点上是比较消耗性能的。

如果你的业务运行获取搜失败,或者执行失败,那么通过tryLock还是不错的。像前端发起的请求,一次执行失败了,那可以让前端重新发起,这种场景就比较适合tryLock了。

下面是使用的方法:

if (lock2.tryLock(3, TimeUnit.SECONDS)) {
  try {
    System.out.println(Thread.currentThread().getName() + "获取到了lock2");
  } finally {
    lock2.unlock();
    System.out.println(Thread.currentThread().getName() + "释放了lock2");
  }
} else {
  System.out.println(Thread.currentThread().getName() + "获取lock2失败");
}

2.3、可中断锁

如果在一个线程中使用lock.lockInterruptibly(),只有该线程执行了interrupt()方法之后,lockInterruptibly才起作用。

可以查看源码:

public final void acquireInterruptibly(int arg)
  throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (!tryAcquire(arg))
    doAcquireInterruptibly(arg);
}

也就是说,会先去查看中断状态位,如果中断状态为为true,那么就不会尝试获取锁了,直接跑异常。这个中断状态为正是interrupt()方法设置的。

tryLock()底层代码也会判断中断状态,决定是否要继续尝试获取锁:

> public final boolean tryAcquireNanos(int arg, long nanosTimeout)
>   throws InterruptedException {
>   if (Thread.interrupted())
>     throw new InterruptedException();
>   return tryAcquire(arg) ||
>     doAcquireNanos(arg, nanosTimeout);
> }
>

也就是说lock.lockInterruptibly相当于不限超时时间的tryLock

要了解中断锁的使用,就必须知道Thread.interrupt()意味着什么。

Thread.interrupt()

该方法将会设置线程的中断状态位。

判断线程是否被中断,可以使用Thread.currentThread().isInterrupted()方法。我们可以在程序中检查这个状态位,来做一些逻辑处理。

判断是否被中断,如果中断则清楚中断位:Thread.currentThread().interrupted()。

如果一个线程处于阻塞状态:如调用了thread.sleep,thread.join,thread.wait,condition.await,以及可中断通道上的I/O操作方法后,这些阻塞方法会定时去判断中断状态位,如果中断状态位为true,则会在调用处抛出InterruptedException异常,并且在抛出异常后立即清除中断位。这些方法声明处都会抛出InterruptedException,表示这些方法是可中断的,会对interrupt调用做出响应,异常都是由可中断方法自己抛出,并不是由interrupt方法直接引起的。

synchronized在获取锁的过程中不能被中断,因为锁定的位置根本无法抛出异常。

下面一个例子是演示通过lock.lockInterruptibly获取锁:

  • 1号获取到锁的线程进行sleep阻塞;
  • 后续2,3,4,5号线程在等待锁的过程中,被interrupt打断了,导致抛出InterruptedException异常;
  • 6号线程在1号线程释放锁之后,成功获取到了所;
  • 但是6号线程在休眠过程中很快又被打断然后释放锁了;
  • 最终7号线程获取到锁,接着又被打断,释放锁。
package com.itzhai.concurrency;

import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by arthinking on 1/3/2020.
 */
public class AcquireLockRunnable implements Runnable{

    public static void main(String[] args) throws InterruptedException {

        ReentrantLock lock = new ReentrantLock();

        // 1号线程启动并获取锁,然后休眠2000毫秒
        Thread firstThread = new Thread(new AcquireLockRunnable(lock, 1), "线程(1)");
        firstThread.start();
        Thread.sleep(1000);

        // 依次启动其他线程
        Thread[] others = new Thread[6];
        for (int i = 0; i < 6; i++) {
            others[i] = new Thread(new AcquireLockRunnable(lock, i + 2), "线程(" + (i + 2) + ")");
            others[i].start();
        }
        print("开始给所有线程发送中断...");
        for (int i = 0; i < 6; i++) {
            Thread.sleep(500 * i / 2);
            print("Interrupt " + others[i].getName());
            others[i].interrupt();
        }
    }


    private int id;
    private ReentrantLock lock;

    private AcquireLockRunnable(ReentrantLock lock, int id) {
        this.lock = lock;
        this.id = id;
    }

    public void run() {
        print("开启线程 " + id + " 尝试获取锁...");
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            print("获取锁失败, 原因: " + e);
            return;
        }
        print("获取到锁(" + id + ")");
        try {
            try {
                if (id == 1) {
                    Thread.sleep(3000);
                } else {
                    Thread.sleep(2500);
                }
            } catch (InterruptedException e) {
                print("线程 " + id + " 休眠过程中被打断");
            }
        } finally {
            lock.unlock();
            print("释放锁(" + id + ")");
        }
    }

    static void print(String p) {
        System.out.println(Thread.currentThread().getName() + ": " + p);
    }
}

输出结构:

线程(1): 开启线程 1 尝试获取锁...
线程(1): 获取到锁(1)
线程(2): 开启线程 2 尝试获取锁...
线程(3): 开启线程 3 尝试获取锁...
线程(4): 开启线程 4 尝试获取锁...
线程(5): 开启线程 5 尝试获取锁...
线程(6): 开启线程 6 尝试获取锁...
main: 开始给所有线程发送中断...
线程(7): 开启线程 7 尝试获取锁...
main: Interrupt 线程(2)
线程(2): 获取锁失败, 原因: java.lang.InterruptedException
main: Interrupt 线程(3)
线程(3): 获取锁失败, 原因: java.lang.InterruptedException
main: Interrupt 线程(4)
线程(4): 获取锁失败, 原因: java.lang.InterruptedException
main: Interrupt 线程(5)
线程(5): 获取锁失败, 原因: java.lang.InterruptedException
线程(1): 释放锁(1)
线程(6): 获取到锁(6)
main: Interrupt 线程(6)
线程(6): 线程 6 休眠过程中被打断
线程(6): 释放锁(6)
线程(7): 获取到锁(7)
main: Interrupt 线程(7)
线程(7): 线程 7 休眠过程中被打断
线程(7): 释放锁(7)

Process finished with exit code 0

2.4、条件变量

我们知道,synchronized关键字配合Object.wiat()和Object.notify()或者Object.notifyAll()方法,可以实现线程之间的同步。

类似的,Lock中也提供了这样的机制。是通过Condition类提供的方法实现的:

  • Condition.await()方法相当于Object.wait();
  • Condition.await(long time, TimeUnit unit)方法相当于Object.wait(long timeout);
  • Condition.signal()方法相当于Object.notify()方法;
  • Condition.signalAll()方法相当于Object.notifyAll()方法;

对比:

与synchronized代码块的wait方式相比,Lock中的condition.wait更加灵活;

condition.wait()可以将不同条件的线程放入不同的等待队列,而synchronized中所有线程会放入同一个等待队列,在大量线程唤醒的时候会造成资源浪费

Lock对象同步只需要依赖Lock对象,而synchronized必须配合一个特定的对象使用。

下面是一个消费者生产者的例子,其中有一个共同的锁ReentrantLock,具有两个条件:

  • 队列未满的条件,当符合条件的时候,通知生产者可以生产消息了;
  • 队列非空的条件,当符合条件的时候,通知消费者可以消费消息了。

以下是完整的代码:

package com.itzhai.concurrency;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by arthinking on 1/3/2020.
 */
public class ProducerConsumerSolutionUsingLock {

    public static void main(String[] args) {

        // 生产者线程和消费者线程需要操作的对象
        ProducerConsumerImpl sharedObject = new ProducerConsumerImpl();

        // 创建生产者线程和消费者线程
        Producer p = new Producer(sharedObject);
        Consumer c = new Consumer(sharedObject);

        // 启动生产者线程和消费者线程
        c.start();
        p.start();

        while (Thread.activeCount() > 2) {
            Thread.yield();
        }

        System.out.println("总共消费消息条数: " + sharedObject.getConsumeCount());
    }
}


class ProducerConsumerImpl {
    // 队列大小
    private static final int CAPACITY = 10;
    private final Queue queue = new LinkedList<>();
    private final Random theRandom = new Random();

    private int consumeCount = 0;

    // 锁和条件变量
    private final Lock lock = new ReentrantLock();
    // 队列未满的条件
    private final Condition queueNotFull = lock.newCondition();
    // 队列非空的条件
    private final Condition queueNotEmpty = lock.newCondition();

    /**
     * 生产者生产消息
     * @throws InterruptedException
     */
    public void produce() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == CAPACITY) {
                System.out.println(Thread.currentThread().getName()
                        + ", 生产者准备生产消息, 但是队列满了, 等待...");
                queueNotFull.await();
            }

            int number = theRandom.nextInt();
            boolean isAdded = queue.offer(number);
            if (isAdded) {
                System.out.printf("%s 添加 %d 到队列 %n", Thread
                        .currentThread().getName(), number);

                // 通知消费者队列中已经有消息了
                System.out.println(Thread.currentThread().getName()
                        + ", 通知消费者队列中已经有消息了");
                queueNotEmpty.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 消费者消费消息
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                System.out.println(Thread.currentThread().getName()
                        + ", 消费者准备消费消息, 但是队列空了, 等待...");
                queueNotEmpty.await();
            }

            Object value = queue.poll();
            if (value != null) {
                consumeCount ++;
                System.out.printf("%s 从消息队列消费 %d %n", Thread
                        .currentThread().getName(), value);

                // signal producer thread that, buffer may be empty now
                System.out.println(Thread.currentThread().getName()
                        + ", 通知生产者现在消息队列有空间了, 可以来生产消息了");
                queueNotFull.signalAll();
            }

        } finally {
            lock.unlock();
        }
    }

    public int getConsumeCount() {
        return consumeCount;
    }

}

class Producer extends Thread {
    ProducerConsumerImpl pc;

    public Producer(ProducerConsumerImpl sharedObject) {
        super("PRODUCER");
        this.pc = sharedObject;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                pc.produce();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread {
    ProducerConsumerImpl pc;

    public Consumer(ProducerConsumerImpl sharedObject) {
        super("CONSUMER");
        this.pc = sharedObject;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                pc.consume();
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

3、ReentrantLock与synchronized的异同

3.1、synchronized

优点

  • JDK进行了优化,会有锁升级的过程,在没有竞争场景下通过偏向锁或者轻量级锁提高性能。未来JVM会更多支持原生锁的优化;
  • Java语法内存,使用简单,无需手动获取锁和解锁。通过底层指令实现的管程自动达到同步的目的;

缺点

  • 可控性比较低,不够灵活,一个锁只能绑定一个条件。锁对象的wait()和notify()或notifyAll()方法可以实现一个隐含的条件,如果要关联多个条件,就不得额外添加锁了;
  • 非公平锁,在锁被释放的时候,任何等待的锁都有机会获得锁,有的线程会长时间得不到执行,但非公平锁优势是吞吐量会大一点;
  • 程序无法获取到是否已经获取到了锁。

3.2、ReentrantLock

优点

  • 支持公平锁:
  • 支持多条件的锁:ReentrantLock能够将wait/notify/notifyAll对象化;
  • 能够获取到锁定状态:
  • 实现了可中断的锁;
  • 支持带超时的获取锁尝试,避免了死锁;

缺点

  • 需要手动锁定解锁,使用比较繁琐,容易出错;
  • 随着JDK对synchronized锁的不断优化,ReentrantLock性能未必会比synchronized锁高;

3.3、如何选择

这两个锁如何选择呢?

我们可以看到,ReentrantLock更加灵活,提供给了包括定时的锁等待可中断的锁等待公平性,以及实现非块结构的加锁

但是ReentrantLock的危险性比synchronized要高,内置锁会随着JDK版本不断做优化。

仅当内置锁不能满足需求的时候,才可以考虑使用ReentrantLock。

References

Thread的中断机制(interrupt)

Java ReentrantLock Interruption Example

Java Lock and Condition Example using Producer Consumer Solution

作者:arthinking

来源:https://www.itzhai.com/cpj/introduction-and-use-of-reentrantlock.html


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)