• 欢迎访问 winrains 的个人网站!
  • 本网站主要从互联网整理和收集了与Java、网络安全、Linux等技术相关的文章,供学习和研究使用。如有侵权,请留言告知,谢谢!

ConcurrentHashMap 原理浅析

Java技术 winrains 来源:米奇罗 6个月前 (04-05) 35次浏览

1. 前言

为什么要使用 ConcurrentHashMap

主要基于两个原因:

  1. 在并发编程中使用 HashMap 可能造成死循环(jdk1.7,jdk1.8 中会造成数据丢失)
  2. HashTable 效率非常低下

2. ConcurrentHashMap 结构

jdk 1.7 和 jdk 1.8 中,ConcurrentHashMap 的结构有着很大的变化,后面会讲解。

2.1 jdk 1.7 中结构

jdk1.7 ConcurrentHashMap结构

在 jdk 1.7 中,ConcurrentHashMap 是由 Segment 数据结构和 HashEntry 数组结构构成。采取分段锁来保证安全性。

Segment 是 ReentrantLock 重入锁,在 ConcurrentHashMap 中扮演锁的角色;HashEntry 则用于存储键值对数据。

一个 ConcurrentHashMap 里包含一个 Segment 数组,一个 Segment 里包含一个 HashEntry 数组,Segment 的结构和 HashMap 类似,是一个数组和链表结构。

2.2 jdk 1.8 中结构

jdk1.8 ConcurrentHashMap结构

JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。

3. 实现

3.1 JDK 1.7 中的实现

3.1.1 初始化

ConcurrentHashMap 的初始化是通过位运算来初始化 Segment 的大小的(ssize 表示),通过concurrentLevel 计算得出。

int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}

ssize 用位于运算来计算(ssize <<=1),所以 Segment 的大小取值都是以2的N次方,Segment 的大小 ssize 默认为16.

每一个 Segment 元素下的 HashEntry 的初始化也是按照位于运算来计算,用 cap 来表示

int cap = 1;
while (cap < c)
    cap <<= 1;

HashEntry 大小的计算也是2的N次方(cap <<=1), cap 的初始值为1,所以 HashEntry 最小的容量为2.

3.1.2 get 操作

Segment 的 get 操作实现非常简单和高效,先经过一次再散列,然后使用这个散列值通过散列运算定位到 Segment,再通过散列算法定位到元素。

public V get(Object key){
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key,hash);
}

get 操作的高效之处在于整个 get 过程都不需要加锁,除非读到空的值才会加锁重读。原因就是将使用的共享变量定义成 volatile 类型。

transient volatile int count;
volatile V value;

3.1.3 put 操作

对于 ConcurrentHashMap 的数据插入,这里要进行两次 Hash 去定位数据的存储位置

static class Segment<K,V> extends ReentrantLock implements Serializable {
    //省略
}

当执行put操作时,会经历两个步骤:

  1. 判断是否需要扩容
  2. 定位到添加元素的位置,将其放入 HashEntry 数组中

插入过程会进行第一次 key 的 hash 来定位 Segment 的位置,如果该 Segment 还没有初始化,即通过 CAS 操作进行赋值,然后进行第二次 hash 操作,找到相应的 HashEntry 的位置,这里会利用继承过来的锁的特性,在将数据插入指定的 HashEntry 位置时(尾插法),会通过继承 ReentrantLock 的 tryLock() 方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用 tryLock() 方法去获取锁,超过指定次数就挂起,等待唤醒。

3.1.4 size 操作

计算 ConcurrentHashMap 的元素大小是并发操作的,就是在你计算 size 的时候,他还在并发的插入数据,这就可能会导致你计算出来的 size 和你实际的 size 有相差。

ConcurrentHashMap 采取的解决方法是先尝试 2 次通过不锁住 Segment 的方式来统计各个 Segment 大小,统计过程中如果 count 发生变化,则再采用加锁的方式来统计所有 Segment 的大小。

try {
    for (; ; ) {
        if (retries++ == RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                ensureSegment(j).lock(); // force creation  
        }
        sum = 0L;
        size = 0;
        overflow = false;
        for (int j = 0; j < segments.length; ++j) {
            Segment<K, V> seg = segmentAt(segments, j);
            if (seg != null) {
                /* 在put、remove、clean方法里操作
                * 元素都会将变量modCount进行加一,
                * 统计也是依靠这个变量的前后变化来进行的 */
                sum += seg.modCount;
                int c = seg.count;
                if (c < 0 || (size += c) < 0) overflow = true;
            }
        }
        if (sum == last)
            break;
        last = sum;
    }
} finally {
    if (retries > RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            segmentAt(segments, j).unlock();
    }
}

3.2 JDK 1.8 中的实现

3.2.1 基本属性及概念

看一下基本属性

//node数组最大容量:2^30=1073741824
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始值,必须是2的幂数
private static final int DEFAULT_CAPACITY = 16;
//数组可能最大值,需要与toArray()相关方法关联
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//并发级别,遗留下来的,为兼容以前的版本
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转红黑树阀值,> 8 链表转换为红黑树
static final int TREEIFY_THRESHOLD = 8;
//树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;
private static int RESIZE_STAMP_BITS = 16;
//2^15-1,help resize的最大线程数
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//32-16=16,sizeCtl中记录size大小的偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//forwarding nodes的hash值
static final int MOVED = -1;
//树根节点的hash值
static final int TREEBIN = -2;
//ReservationNode的hash值
static final int RESERVED = -3;
//可用处理器数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放node的数组
transient volatile Node<K,V>[] table;
/*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义
    *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容
    *当为0时:代表当时的table还没有被初始化
    *当为正数时:表示初始化或者下一次进行扩容的大小
    */
private transient volatile int sizeCtl;

重要概念:

  1. table: 默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。
  2. nextTable: 默认为null,扩容时新生成的数组,其大小为原数组的两倍
  3. Node :保存 key,value 及 key 的 hash 值的数据结构。
class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    //省略部分代码
}

其中 value 和 next 都用 volatile 修饰,保证并发的可见性。

  1. ForwardingNode: 一个特殊的 Node 节点,hash 值为 -1,其中存储 nextTable 的引用。
final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
}

只有table发生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在table中表示当前节点为 null 或则已经被移动。

  1. TreeNode类和TreeBin类:  TreeNode类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为TreeNode。不像HashMap,ConcurrentHashMap在桶里面直接存储的不是TreeNode,而是一个TreeBin,在TreeBin内部维护一个红黑树,也就是说TreeNode在TreeBin内部使用的。

3.2.2 初始化

实例化 ConcurrentHashMap 时带参数时,会根据参数调整 table 的大小,假设参数为 100,最终会调整成 256,确保 table 的大小总是2的幂次方.

table 初始化

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片
        if ((sc = sizeCtl) < 0) 
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

3.2.3 put 操作

假设 table 已经初始化完成,put 操作采用 CAS + synchronized 实现并发插入或更新操作。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        ...省略部分代码
    }
    addCount(1L, binCount);
    return null;
}

hash算法

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

table 中定位索引位置,n 是 table 的大小

int index = (n - 1) & hash

获取 table 中对应索引的元素f

Unsafe.getObjectVolatile 可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。

如果 f 为 null,说明 table 中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject 方法插入 Node 节点。

如果 CAS 成功,说明 Node 节点已经插入,随后 addCount(1L, binCount) 方法会检查当前容量是否需要进行扩容。

如果 CAS 失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。

如果f的 hash 值为 -1,说明当前 f 是 ForwardingNode 节点,意味有其它线程正在扩容,则一起进行扩容操作。

其余情况把新的 Node 节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发,代码如下:

synchronized (f) {
    if (tabAt(tab, i) == f) {
        if (fh >= 0) {
            binCount = 1;
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                if (e.hash == hash &&
                    ((ek = e.key) == key ||
                     (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                Node<K,V> pred = e;
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,
                                              value, null);
                    break;
                }
            }
        }
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                           value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}

在节点 f 上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改。

如果 f.hash >= 0,说明 f 是链表结构的头结点,遍历链表,如果找到对应的 node 节点,则修改 value,否则在链表尾部加入节点。 如果 f 是 TreeBin 类型节点,说明 f 是红黑树根节点,则在树结构上遍历元素,更新或增加节点。 如果链表中节点数 binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。

table扩容

当 table 容量不足的时候,即 table 的元素数量达到容量阈值 sizeCtl,需要对 table 进行扩容。

整个扩容分为两部分:

构建一个 nextTable,大小为 table 的两倍。 把 table 的数据复制到 nextTable 中。

这两个过程在单线程下实现很简单,但是 ConcurrentHashMap 是支持并发插入的,扩容操作自然也会有并发的出现,这种情况下,第二步可以支持节点的并发复制,这样性能自然提升不少,但实现的复杂度也上升了一个台阶。

先看第一步,构建nextTable,毫无疑问,这个过程只能只有单个线程进行 nextTable 的初始化,具体实现如下:

private final void addCount(long x, int check) {
    ... 省略部分代码
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

通过 Unsafe.compareAndSwapInt 修改 sizeCtl 值,保证只有一个线程能够初始化 nextTable,扩容后的数组长度为原来的两倍,但是容量是原来的 1.5。

节点从 table 移动到 nextTable,大体思想是遍历、复制的过程。

首先根据运算得到需要遍历的次数i,然后利用 tabAt 方法获得 i 位置的元素 f,初始化一个 forwardNode 实例 fwd。

如果 f == null,则在 table 中的 i 位置放入 fwd,这个过程是采用 Unsafe.compareAndSwapObjectf 方法实现的,很巧妙的实现了节点的并发移动。

如果 f 是链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 如果 f 是 TreeBin 节点,也做一个反序处理,并判断是否需要 untreeify,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,同样采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 遍历过所有的节点以后就完成了复制工作,把 table 指向 nextTable,并更新 sizeCtl 为新数组大小的 0.75 倍 ,扩容完成。

红黑树构造

注意:如果链表结构中元素超过 TREEIFY_THRESHOLD 阈值,默认为 8 个,则把链表转化为红黑树,提高遍历查询效率。

if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

接下来我们看看如何构造树结构,代码如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证 table 中 index 位置元素是否被修改过。

  1. 根据 table 中 index 位置 Node 链表,重新生成一个 hd 为头结点的 TreeNode 链表。
  2. 根据 hd 头结点,生成 TreeBin 树结构,并把树结构的root节点写到 table 的 index 位置的内存中,具体实现如下:
TreeBin(TreeNode<K,V> b) {
    super(TREEBIN, null, null, null);
    this.first = b;
    TreeNode<K,V> r = null;
    for (TreeNode<K,V> x = b, next; x != null; x = next) {
        next = (TreeNode<K,V>)x.next;
        x.left = x.right = null;
        if (r == null) {
            x.parent = null;
            x.red = false;
            r = x;
        }
        else {
            K k = x.key;
            int h = x.hash;
            Class<?> kc = null;
            for (TreeNode<K,V> p = r;;) {
                int dir, ph;
                K pk = p.key;
                if ((ph = p.hash) > h)
                    dir = -1;
                else if (ph < h)
                    dir = 1;
                else if ((kc == null &&
                          (kc = comparableClassFor(k)) == null) ||
                         (dir = compareComparables(kc, k, pk)) == 0)
                    dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                if ((p = (dir <= 0) ? p.left : p.right) == null) {
                    x.parent = xp;
                    if (dir <= 0)
                        xp.left = x;
                    else
                        xp.right = x;
                    r = balanceInsertion(r, x);
                    break;
                }
            }
        }
    }
    this.root = r;
    assert checkInvariants(root);
}

主要根据 Node 节点的 hash 值大小构建二叉树。

3.2.4 get 操作

get操作和put操作相比,显得简单了许多。

public V get(Object key) {
    Node<K,V>[] tab; 
    Node<K,V> e, p;
    int n, eh; 
    K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
  1. 判断table是否为空,如果为空,直接返回null。
  2. 计算key的hash值,并获取指定table中指定位置的Node节点,通过遍历链表或则树结构找到对应的节点,返回value值。

3.2.4 size 操作

JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。 具体参考:ConcurrentHashMap 的size方法原理分析

4. JDK 1.8 中为什么要摒弃分段锁

很多人不明白为什么Doug Lea在JDK1.8为什么要做这么大变动,使用重级锁synchronized,性能反而更高,原因如下:

  1. jdk1.8中锁的粒度更细了。jdk1.7中ConcurrentHashMap 的concurrentLevel(并发数)基本上是固定的。jdk1.8中的concurrentLevel是和数组大小保持一致的,每次扩容,并发度扩大一倍.
  2. 红黑树的引入,对链表的优化使得 hash 冲突时的 put 和 get 效率更高
  3. 获得JVM的支持 ,ReentrantLock 毕竟是 API 这个级别的,后续的性能优化空间很小。 synchronized 则是 JVM 直接支持的, JVM 能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。这就使得 synchronized 能够随着 JDK 版本的升级而不改动代码的前提下获得性能上的提升。

5. 小结&参考资料

小结

可以看出 JDK1.8 版本的 ConcurrentHashMap 的数据结构已经接近 HashMap,相对而言,ConcurrentHashMap 只是增加了同步的操作来控制并发,从 JDK1.7 版本的 ReentrantLock+Segment+HashEntry,到 JDK1.8 版本中synchronized+CAS+HashEntry+红黑树,优化确实很大。

参考资料

作者:米奇罗

来源:https://juejin.im/post/5df0f172e51d45583a66b756


版权声明:文末如注明作者和来源,则表示本文系转载,版权为原作者所有 | 本文如有侵权,请及时联系,承诺在收到消息后第一时间删除 | 如转载本文,请注明原文链接。
喜欢 (0)