HashMap以及ConcurrentHashMap

全栈开发工程师 2018年05月01日 31次浏览

HashMap实现

JAVA7实现

在java7中,HashMap存在一个Entry数组

thum-aced1533778214

transient Entry<K,V>[] table; 

Entry的实现其实就是链表的实现,除了key、value还有下一个元素的引用

static class Entry<K,V> implements Map.Entry<K,V> { 

        final K key; 

        V value; 

        Entry<K,V> next; 

        int hash; 

这个Entry应该放在数组的哪一个位置上(这个位置通常称为位桶或者hash桶,即hash值相同的Entry会放在同一位置,用链表相连),是通过key的hashCode来计算的

final int hash(Object k) { 

        int h = 0; 

        h ^= k.hashCode(); 

        h ^= (h >>> 20) ^ (h >>> 12); 

        return h ^ (h >>> 7) ^ (h >>> 4); 

 } 

通过hash计算出来的值将会使用indexFor方法找到它应该所在的table下标:

static int indexFor(int h, int length) { 

        return h & (length-1); 

    } 

这个方法其实相当于对table.length取模。

当两个key通过hashCode计算相同时,则发生了hash冲突(碰撞),HashMap解决hash冲突的方式是用链表。

当发生hash冲突时,则将存放在数组中的Entry设置为新值的next(这里要注意的是,比如A和B都hash后都映射到下标i中,之前已经有A了,当map.put(B)时,将B放到下标i中,A则为B的next,所以新值存放在数组中,旧值在新值的链表上)

Java8实现

一直到JDK7为止,HashMap的结构都是这么简单,基于一个数组以及多个链表的实现,hash值冲突的时候,就将对应节点以链表的形式存储。

这样子的HashMap性能上就抱有一定疑问,如果说成百上千个节点在hash时发生碰撞,存储一个链表中,那么如果要查找其中一个节点,那就不可避免的花费O(N)的查找时间,这将是多么大的性能损失。这个问题终于在JDK8中得到了解决。再最坏的情况下,链表查找的时间复杂度为O(n),而红黑树一直是O(logn),这样会提高HashMap的效率。

JDK7中HashMap采用的是位桶+链表的方式,即我们常说的散列链表的方式,而JDK8中采用的是位桶+链表/红黑树的方式,也是非线程安全的。当某个位桶的链表的长度达到某个阀值的时候,这个链表就将转换成红黑树

thum-a71f1533778225

JDK中Entry的名字变成了Node,原因是和红黑树的实现TreeNode相关联。

transient Node<K,V>[] table; 

当冲突节点数不小于8-1时,转换成红黑树。

static final int TREEIFY_THRESHOLD = 8; 

以put方法在JDK8中有了很大的改变

public V put(K key, V value) { 

        return putVal(hash(key), key, value, false, true); 

 } 

  

final V putVal(int hash, K key, V value, boolean onlyIfAbsent, 

                   boolean evict) { 

        Node<K,V>[] tab; 

    Node<K,V> p;  

    int n, i; 

    //如果当前map中无数据,执行resize方法。并且返回n 

        if ((tab = table) == null || (n = tab.length) == 0) 

            n = (tab = resize()).length; 

     //如果要插入的键值对要存放的这个位置刚好没有元素,那么把他封装成Node对象,放在这个位置上就完事了 

        if ((p = tab[i = (n - 1) & hash]) == null) 

            tab[i] = newNode(hash, key, value, null); 

    //否则的话,说明这上面有元素 

        else { 

            Node<K,V> e; K k; 

        //如果这个元素的key与要插入的一样,那么就替换一下,也完事。 

            if (p.hash == hash && 

                ((k = p.key) == key || (key != null && key.equals(k)))) 

                e = p; 

        //1.如果当前节点是TreeNode类型的数据,执行putTreeVal方法 

            else if (p instanceof TreeNode) 

                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); 

            else { 

        //还是遍历这条链子上的数据,跟jdk7没什么区别 

                for (int binCount = 0; ; ++binCount) { 

                    if ((e = p.next) == null) { 

                        p.next = newNode(hash, key, value, null); 

            //2.完成了操作后多做了一件事情,判断,并且可能执行treeifyBin方法 

                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st 

                            treeifyBin(tab, hash); 

                        break; 

                    } 

                    if (e.hash == hash && 

                        ((k = e.key) == key || (key != null && key.equals(k)))) 

                        break; 

                    p = e; 

                } 

            } 

            if (e != null) { // existing mapping for key 

                V oldValue = e.value; 

                if (!onlyIfAbsent || oldValue == null) //true || -- 

                    e.value = value; 

           //3. 

                afterNodeAccess(e); 

                return oldValue; 

            } 

        } 

        ++modCount; 

    //判断阈值,决定是否扩容 

        if (++size > threshold) 

            resize(); 

        //4. 

        afterNodeInsertion(evict); 

        return null; 

    } 

treeifyBin()就是将链表转换成红黑树。

之前的indefFor()方法消失 了,直接用(tab.length-1)&hash,所以看到这个,代表的就是数组的下角标。

static final int hash(Object key) { 

        int h; 

        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); 

  } 

ConcurrentHashMap实现

JAVA7实现

数据结构

jdk1.7中采用Segment + HashEntry的方式进行实现,结构如下:

thum-9d0d1533778246

ConcurrentHashMap初始化时,计算出Segment数组的大小ssize和每个Segment中HashEntry数组的大小cap,并初始化Segment数组的第一个元素;其中ssize大小为2的幂次方,默认为16,cap大小也是2的幂次方,最小值为2,最终结果根据根据初始化容量initialCapacity进行计算,计算过程如下:

if (c * ssize < initialCapacity) 

    ++c; 

int cap = MIN_SEGMENT_TABLE_CAPACITY; 

while (cap < c) 

    cap <<= 1; 

其中Segment在实现上继承了ReentrantLock,这样就自带了锁的功能。

put实现

当执行put方法插入数据时,根据key的hash值,在Segment数组中找到相应的位置,如果相应位置的Segment还未初始化,则通过CAS进行赋值,接着执行Segment对象的put方法通过加锁机制插入数据,实现如下:

场景:线程A和线程B同时执行相同Segment对象的put方法

  1. 线程A执行tryLock()方法成功获取锁,则把HashEntry对象插入到相应的位置;

  2. 线程B获取锁失败,则执行scanAndLockForPut()方法,在scanAndLockForPut方法中,会通过重复执行tryLock()方法尝试获取锁,在多处理器环境下,重复次数为64,单处理器重复次数为1,当执行tryLock()方法的次数超过上限时,则执行lock()方法挂起线程B;

  3. 当线程A执行完插入操作时,会通过unlock()方法释放锁,接着唤醒线程B继续执行;

size实现

因为ConcurrentHashMap是可以并发插入数据的,所以在准确计算元素时存在一定的难度,一般的思路是统计每个Segment对象中的元素个数,然后进行累加,但是这种方式计算出来的结果并不一样的准确的,因为在计算后面几个Segment的元素个数时,已经计算过的Segment同时可能有数据的插入或则删除,在1.7的实现中,采用了如下方式:

try { 

    for (;;) { 

        if (retries++ == RETRIES_BEFORE_LOCK) { 

            for (int j = 0; j < segments.length; ++j) 

                ensureSegment(j).lock(); // force creation 

        } 

        sum = 0L; 

        size = 0; 

        overflow = false; 

        for (int j = 0; j < segments.length; ++j) { 

            Segment<K,V> seg = segmentAt(segments, j); 

            if (seg != null) { 

                sum += seg.modCount; 

                int c = seg.count; 

                if (c < 0 || (size += c) < 0) 

                    overflow = true; 

            } 

        } 

        if (sum == last) 

            break; 

        last = sum; 

    } 

} finally { 

    if (retries > RETRIES_BEFORE_LOCK) { 

        for (int j = 0; j < segments.length; ++j) 

            segmentAt(segments, j).unlock(); 

    } 

} 

先采用不加锁的方式,连续计算元素的个数,最多计算3次:

  1. 如果前后两次计算结果相同,则说明计算出来的元素个数是准确的;
  2. 如果前后两次计算结果都不同,则给每个Segment进行加锁,再计算一次元素的个数;

JAVA8实现

数据结构

1.8中放弃了Segment臃肿的设计,取而代之的是采用Node + CAS + Synchronized来保证并发安全进行实现,结构如下:

thum-439b1533778234

只有在执行第一次put方法时才会调用initTable()初始化Node数组,实现如下:

private final Node<K,V>[] initTable() { 

    Node<K,V>[] tab; int sc; 

    while ((tab = table) == null || tab.length == 0) { 

        if ((sc = sizeCtl) < 0) 

            Thread.yield(); // lost initialization race; just spin 

        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 

            try { 

                if ((tab = table) == null || tab.length == 0) { 

                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 

                    @SuppressWarnings("unchecked") 

                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 

                    table = tab = nt; 

                    sc = n - (n >>> 2); 

                } 

            } finally { 

                sizeCtl = sc; 

            } 

            break; 

        } 

    } 

    return tab; 

} 

put实现

当执行put方法插入数据时,根据key的hash值,在Node数组中找到相应的位置,实现如下:

  1. 如果相应位置的Node还未初始化,则通过CAS插入相应的数据;
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 

	    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 

	        break;                   // no lock when adding to empty bin 

    } 

  1. 如果相应位置的Node不为空,且当前该节点不处于移动状态,则对该节点加synchronized锁,如果该节点的hash不小于0,则遍历链表更新节点或插入新节点;
    if (fh >= 0) { 

	    binCount = 1; 

	    for (Node<K,V> e = f;; ++binCount) { 

	        K ek; 

	        if (e.hash == hash && 

	            ((ek = e.key) == key || 

	             (ek != null && key.equals(ek)))) { 

	            oldVal = e.val; 

	            if (!onlyIfAbsent) 

	                e.val = value; 

	            break; 

	        } 

	        Node<K,V> pred = e; 

	        if ((e = e.next) == null) { 

	            pred.next = new Node<K,V>(hash, key, value, null); 

	            break; 

	        } 

	    } 

	} 


  1. 如果该节点是TreeBin类型的节点,说明是红黑树结构,则通过putTreeVal方法往红黑树中插入节点;
	else if (f instanceof TreeBin) { 

	    Node<K,V> p; 

	    binCount = 2; 

	    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { 

	        oldVal = p.val; 

	        if (!onlyIfAbsent) 

	            p.val = value; 

	    } 

	} 


  1. 如果binCount不为0,说明put操作对数据产生了影响,如果当前链表的个数达到8个,则通过treeifyBin方法转化为红黑树,如果oldVal不为空,说明是一次更新操作,没有对元素个数产生影响,则直接返回旧值;
	if (binCount != 0) { 

	    if (binCount >= TREEIFY_THRESHOLD) 

	        treeifyBin(tab, i); 

	    if (oldVal != null) 

	        return oldVal; 

	    break; 

	} 

  1. 如果插入的是一个新节点,则执行addCount()方法尝试更新元素个数

size实现

1.8中使用一个volatile类型的变量baseCount记录元素的个数,当插入新数据或则删除数据时,会通过addCount()方法更新baseCount,实现如下:

	if ((as = counterCells) != null || 

	    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 

	    CounterCell a; long v; int m; 

	    boolean uncontended = true; 

	    if (as == null || (m = as.length - 1) < 0 || 

	        (a = as[ThreadLocalRandom.getProbe() & m]) == null || 

	        !(uncontended = 

	          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 

	        fullAddCount(x, uncontended); 

	        return; 

	    } 

	    if (check <= 1) 

	        return; 

	    s = sumCount(); 

	} 
  1. 初始化时counterCells为空,在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;

  2. 如果CounterCell数组counterCells为空,调用fullAddCount()方法进行初始化,并插入对应的记录数,通过CAS设置cellsBusy字段,只有设置成功的线程才能初始化CounterCell数组,实现如下:

	else if (cellsBusy == 0 && counterCells == as && 

	         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { 

	    boolean init = false; 

	    try {                           // Initialize table 

	        if (counterCells == as) { 

	            CounterCell[] rs = new CounterCell[2]; 

	            rs[h & 1] = new CounterCell(x); 

	            counterCells = rs; 

	            init = true; 

	        } 

	    } finally { 

	        cellsBusy = 0; 

	    } 

	    if (init) 

	        break; 

	} 


  1. 如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象;
	else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))

	    break;

所以在1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:

	public int size() { 

	    long n = sumCount(); 

	    return ((n < 0L) ? 0 : 

	            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : 

	            (int)n); 

	} 

	  

	final long sumCount() { 

	    CounterCell[] as = counterCells; CounterCell a; 

	    long sum = baseCount; 

	    if (as != null) { 

	        for (int i = 0; i < as.length; ++i) { 

	            if ((a = as[i]) != null) 

	                sum += a.value; 

	        } 

	    } 

	    return sum; 

	} 


通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数;