Home > Java > javaTutorial > How to use ConcurrentHashMap to implement thread-safe mapping in Java?

How to use ConcurrentHashMap to implement thread-safe mapping in Java?

PHPz
Release: 2023-05-10 10:25:12
forward
904 people have browsed it

jdk1.7 version

Data structure

    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;
Copy after login

You can see that it is mainly a Segment array, with comments also written, each of which is a special hash table.

Let’s take a look at what Segment is.

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    	......
            /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;
        transient int threshold;
        final float loadFactor;
    	// 构造函数
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
  		......
    }
Copy after login

The above is part of the code. You can see that Segment inherits ReentrantLock, so in fact, each Segment is a lock.

The HashEntry array is stored in it, and the variable is modified with volatile. HashEntry is similar to the node of hashmap and is also a node of a linked list.

Let's take a look at the specific code. You can see that it is slightly different from hashmap in that its member variables are modified with volatile.

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
        ......
    }
Copy after login

So the data structure of ConcurrentHashMap is almost like the figure below.

How to use ConcurrentHashMap to implement thread-safe mapping in Java?

During construction, the number of Segments is determined by the so-called concurrentcyLevel. The default is 16. It can also be specified directly in the corresponding constructor. Note that Java requires it to be a power of 2 value. If the input is a non-power value like 15, it will be automatically adjusted to a power of 2 value like 16.

Let’s take a look at the source code, starting with the simple get method

get()

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // 通过unsafe获取Segment数组的元素
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // 还是通过unsafe获取HashEntry数组的元素
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
Copy after login

The logic of get is very simple, that is, find the HashEntry array corresponding to the subscript of the Segment, and then Find the linked list header corresponding to the subscript of the HashEntry array, and then traverse the linked list to obtain the data.

To obtain the data in the array, use UNSAFE.getObjectVolatile(segments, u). Unsafe provides the ability to directly access memory like the c language. This method can obtain the data of the corresponding offset of the object. u is a calculated offset, so it is equivalent to segments[i], but more efficient.

put()

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
Copy after login

For the put operation, the corresponding Segment is directly obtained through the Unsafe calling method, and then the thread-safe put operation is performed:

The main logic is The put method inside Segment

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // scanAndLockForPut会去查找是否有key相同Node
            // 无论如何,确保获取锁
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        // 更新已有value...
                    }
                    else {
                        // 放置HashEntry到特定位置,如果超过阈值,进行rehash
                        // ...
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
Copy after login

size()

Let’s take a look at the main code.

for (;;) {
    // 如果重试次数等于默认的2,就锁住所有的segment,来计算值
    if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            sum += seg.modCount;
            int c = seg.count;
            if (c < 0 || (size += c) < 0)
                overflow = true;
        }
    }
    // 如果sum不再变化,就表示得到了一个确切的值
    if (sum == last)
        break;
    last = sum;
}
Copy after login

This is actually to calculate the sum of the numbers of all segments. If the sum keeps up If the obtained values ​​are equal, it means that the map has not been operated. This value is relatively correct. If you still cannot get a unified value after retrying twice, lock all segments and get the value again.

Expansion

private void rehash(HashEntry<K,V> node) {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
    		// 新表的大小是原来的两倍
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        // 如果有多个节点
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        // 这里操作就是找到末尾的一段索引值都相同的链表节点,这段的头结点是lastRun.
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // 然后将lastRun结点赋值给数组位置,这样lastRun后面的节点也跟着过去了。
                        newTable[lastIdx] = lastRun;
                        // 之后就是复制开头到lastRun之间的节点
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }
Copy after login

jdk1.8 version

Data structure

The 1.8 version of ConcurrentHashmap is somewhat similar to Hashmap as a whole, but the segment is removed. Is an array using node.

transient volatile Node<K,V>[] table;
Copy after login

There is still an internal class called Segment in 1.8, but its existence is only for serialization compatibility and is no longer used.

Let’s take a look at the node node

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ......
    }
Copy after login

It is similar to the node node in HashMap, and also implements Map.Entry. The difference is that val and next are modified with volatile to ensure visibility.

put()

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 利用CAS去进行无锁线程安全操作,如果bin是空的
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                     // 细粒度的同步修改操作... 
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 找到相同key就更新
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 没有相同的就新增
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果是树节点,进行树的操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // Bin超过阈值,进行树化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
Copy after login

You can see that in the synchronization logic, it uses synchronized instead of the usually recommended ReentrantLock and the like. Why is this? Now in JDK1.8, synchronized has been continuously optimized, so you no longer have to worry too much about performance differences. In addition, compared to ReentrantLock, it can reduce memory consumption, which is a very big advantage.

At the same time, more detailed implementations have been optimized by using Unsafe. For example, tabAt directly uses getObjectAcquire to avoid the overhead of indirect calls.

So, let’s take a look at how size works?

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
Copy after login

Here is to get the member variable counterCells and traverse to get the total number.

In fact, the operation of CounterCell is based on java.util.concurrent.atomic.LongAdder. It is a method for JVM to use space in exchange for higher efficiency, taking advantage of the complex logic inside Striped64. This thing is very niche. In most cases, it is recommended to use AtomicLong, which is enough to meet the performance needs of most applications.

Expansion

 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
		......
        // 初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
     	// 是否继续处理下一个
        boolean advance = true;
     	// 是否完成
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 首次循环才会进来这里
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //扩容结束后做后续工作
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果是null,设置fwd
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 说明该位置已经被处理过了,不需要再处理
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                // 真正的处理逻辑
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 树节点操作
                        else if (f instanceof TreeBin) {
                            ......
                        }
                    }
                }
            }
        }
    }
Copy after login
     }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 树节点操作
                    else if (f instanceof TreeBin) {
                        ......
                    }
                }
            }
        }
    }
}
Copy after login

The core logic is the same as HashMap in creating two linked lists, but with the addition of the operation of obtaining lastRun.

The above is the detailed content of How to use ConcurrentHashMap to implement thread-safe mapping in Java?. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:yisu.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template