Table of Contents
jdk1.8 version
Data structure
get()
put()
size()
Expansion
Home Java javaTutorial How to use ConcurrentHashMap to implement thread-safe mapping in Java?

How to use ConcurrentHashMap to implement thread-safe mapping in Java?

May 10, 2023 am 10:25 AM
java map concurrenthashmap

jdk1.7 version

Data structure

    /**
     * The segments, each of which is a specialized hash table.
     */
    final Segment<K,V>[] segments;
Copy after login

You can see that it is mainly a Segment array, with comments also written, each of which is a special hash table.

Let’s take a look at what Segment is.

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    	......
            /**
         * The per-segment table. Elements are accessed via
         * entryAt/setEntryAt providing volatile semantics.
         */
        transient volatile HashEntry<K,V>[] table;
        transient int threshold;
        final float loadFactor;
    	// 构造函数
        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }
  		......
    }
Copy after login

The above is part of the code. You can see that Segment inherits ReentrantLock, so in fact, each Segment is a lock.

The HashEntry array is stored in it, and the variable is modified with volatile. HashEntry is similar to the node of hashmap and is also a node of a linked list.

Let's take a look at the specific code. You can see that it is slightly different from hashmap in that its member variables are modified with volatile.

    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
        ......
    }
Copy after login

So the data structure of ConcurrentHashMap is almost like the figure below.

How to use ConcurrentHashMap to implement thread-safe mapping in Java?

During construction, the number of Segments is determined by the so-called concurrentcyLevel. The default is 16. It can also be specified directly in the corresponding constructor. Note that Java requires it to be a power of 2 value. If the input is a non-power value like 15, it will be automatically adjusted to a power of 2 value like 16.

Let’s take a look at the source code, starting with the simple get method

get()

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        int h = hash(key);
        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        // 通过unsafe获取Segment数组的元素
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            // 还是通过unsafe获取HashEntry数组的元素
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }
Copy after login

The logic of get is very simple, that is, find the HashEntry array corresponding to the subscript of the Segment, and then Find the linked list header corresponding to the subscript of the HashEntry array, and then traverse the linked list to obtain the data.

To obtain the data in the array, use UNSAFE.getObjectVolatile(segments, u). Unsafe provides the ability to directly access memory like the c language. This method can obtain the data of the corresponding offset of the object. u is a calculated offset, so it is equivalent to segments[i], but more efficient.

put()

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }
Copy after login

For the put operation, the corresponding Segment is directly obtained through the Unsafe calling method, and then the thread-safe put operation is performed:

The main logic is The put method inside Segment

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
            // scanAndLockForPut会去查找是否有key相同Node
            // 无论如何,确保获取锁
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        // 更新已有value...
                    }
                    else {
                        // 放置HashEntry到特定位置,如果超过阈值,进行rehash
                        // ...
                    }
                }
            } finally {
                unlock();
            }
            return oldValue;
        }
Copy after login

size()

Let’s take a look at the main code.

for (;;) {
    // 如果重试次数等于默认的2,就锁住所有的segment,来计算值
    if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
            ensureSegment(j).lock(); // force creation
    }
    sum = 0L;
    size = 0;
    overflow = false;
    for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
            sum += seg.modCount;
            int c = seg.count;
            if (c < 0 || (size += c) < 0)
                overflow = true;
        }
    }
    // 如果sum不再变化,就表示得到了一个确切的值
    if (sum == last)
        break;
    last = sum;
}
Copy after login

This is actually to calculate the sum of the numbers of all segments. If the sum keeps up If the obtained values ​​are equal, it means that the map has not been operated. This value is relatively correct. If you still cannot get a unified value after retrying twice, lock all segments and get the value again.

Expansion

private void rehash(HashEntry<K,V> node) {
            HashEntry<K,V>[] oldTable = table;
            int oldCapacity = oldTable.length;
    		// 新表的大小是原来的两倍
            int newCapacity = oldCapacity << 1;
            threshold = (int)(newCapacity * loadFactor);
            HashEntry<K,V>[] newTable =
                (HashEntry<K,V>[]) new HashEntry[newCapacity];
            int sizeMask = newCapacity - 1;
            for (int i = 0; i < oldCapacity ; i++) {
                HashEntry<K,V> e = oldTable[i];
                if (e != null) {
                    HashEntry<K,V> next = e.next;
                    int idx = e.hash & sizeMask;
                    if (next == null)   //  Single node on list
                        newTable[idx] = e;
                    else { // Reuse consecutive sequence at same slot
                        // 如果有多个节点
                        HashEntry<K,V> lastRun = e;
                        int lastIdx = idx;
                        // 这里操作就是找到末尾的一段索引值都相同的链表节点,这段的头结点是lastRun.
                        for (HashEntry<K,V> last = next;
                             last != null;
                             last = last.next) {
                            int k = last.hash & sizeMask;
                            if (k != lastIdx) {
                                lastIdx = k;
                                lastRun = last;
                            }
                        }
                        // 然后将lastRun结点赋值给数组位置,这样lastRun后面的节点也跟着过去了。
                        newTable[lastIdx] = lastRun;
                        // 之后就是复制开头到lastRun之间的节点
                        // Clone remaining nodes
                        for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                            V v = p.value;
                            int h = p.hash;
                            int k = h & sizeMask;
                            HashEntry<K,V> n = newTable[k];
                            newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                        }
                    }
                }
            }
            int nodeIndex = node.hash & sizeMask; // add the new node
            node.setNext(newTable[nodeIndex]);
            newTable[nodeIndex] = node;
            table = newTable;
        }
Copy after login

jdk1.8 version

Data structure

The 1.8 version of ConcurrentHashmap is somewhat similar to Hashmap as a whole, but the segment is removed. Is an array using node.

transient volatile Node<K,V>[] table;
Copy after login

There is still an internal class called Segment in 1.8, but its existence is only for serialization compatibility and is no longer used.

Let’s take a look at the node node

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ......
    }
Copy after login

It is similar to the node node in HashMap, and also implements Map.Entry. The difference is that val and next are modified with volatile to ensure visibility.

put()

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                // 初始化
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 利用CAS去进行无锁线程安全操作,如果bin是空的
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                     // 细粒度的同步修改操作... 
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 找到相同key就更新
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 没有相同的就新增
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 如果是树节点,进行树的操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // Bin超过阈值,进行树化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
Copy after login

You can see that in the synchronization logic, it uses synchronized instead of the usually recommended ReentrantLock and the like. Why is this? Now in JDK1.8, synchronized has been continuously optimized, so you no longer have to worry too much about performance differences. In addition, compared to ReentrantLock, it can reduce memory consumption, which is a very big advantage.

At the same time, more detailed implementations have been optimized by using Unsafe. For example, tabAt directly uses getObjectAcquire to avoid the overhead of indirect calls.

So, let’s take a look at how size works?

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
Copy after login

Here is to get the member variable counterCells and traverse to get the total number.

In fact, the operation of CounterCell is based on java.util.concurrent.atomic.LongAdder. It is a method for JVM to use space in exchange for higher efficiency, taking advantage of the complex logic inside Striped64. This thing is very niche. In most cases, it is recommended to use AtomicLong, which is enough to meet the performance needs of most applications.

Expansion

 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
		......
        // 初始化
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
     	// 是否继续处理下一个
        boolean advance = true;
     	// 是否完成
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 首次循环才会进来这里
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                //扩容结束后做后续工作
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                //每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // 如果是null,设置fwd
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 说明该位置已经被处理过了,不需要再处理
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                // 真正的处理逻辑
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 树节点操作
                        else if (f instanceof TreeBin) {
                            ......
                        }
                    }
                }
            }
        }
    }
Copy after login
     }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                    // 树节点操作
                    else if (f instanceof TreeBin) {
                        ......
                    }
                }
            }
        }
    }
}
Copy after login

The core logic is the same as HashMap in creating two linked lists, but with the addition of the operation of obtaining lastRun.

The above is the detailed content of How to use ConcurrentHashMap to implement thread-safe mapping in Java?. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

Perfect Number in Java Perfect Number in Java Aug 30, 2024 pm 04:28 PM

Guide to Perfect Number in Java. Here we discuss the Definition, How to check Perfect number in Java?, examples with code implementation.

Weka in Java Weka in Java Aug 30, 2024 pm 04:28 PM

Guide to Weka in Java. Here we discuss the Introduction, how to use weka java, the type of platform, and advantages with examples.

Smith Number in Java Smith Number in Java Aug 30, 2024 pm 04:28 PM

Guide to Smith Number in Java. Here we discuss the Definition, How to check smith number in Java? example with code implementation.

Java Spring Interview Questions Java Spring Interview Questions Aug 30, 2024 pm 04:29 PM

In this article, we have kept the most asked Java Spring Interview Questions with their detailed answers. So that you can crack the interview.

Break or return from Java 8 stream forEach? Break or return from Java 8 stream forEach? Feb 07, 2025 pm 12:09 PM

Java 8 introduces the Stream API, providing a powerful and expressive way to process data collections. However, a common question when using Stream is: How to break or return from a forEach operation? Traditional loops allow for early interruption or return, but Stream's forEach method does not directly support this method. This article will explain the reasons and explore alternative methods for implementing premature termination in Stream processing systems. Further reading: Java Stream API improvements Understand Stream forEach The forEach method is a terminal operation that performs one operation on each element in the Stream. Its design intention is

TimeStamp to Date in Java TimeStamp to Date in Java Aug 30, 2024 pm 04:28 PM

Guide to TimeStamp to Date in Java. Here we also discuss the introduction and how to convert timestamp to date in java along with examples.

Java Program to Find the Volume of Capsule Java Program to Find the Volume of Capsule Feb 07, 2025 am 11:37 AM

Capsules are three-dimensional geometric figures, composed of a cylinder and a hemisphere at both ends. The volume of the capsule can be calculated by adding the volume of the cylinder and the volume of the hemisphere at both ends. This tutorial will discuss how to calculate the volume of a given capsule in Java using different methods. Capsule volume formula The formula for capsule volume is as follows: Capsule volume = Cylindrical volume Volume Two hemisphere volume in, r: The radius of the hemisphere. h: The height of the cylinder (excluding the hemisphere). Example 1 enter Radius = 5 units Height = 10 units Output Volume = 1570.8 cubic units explain Calculate volume using formula: Volume = π × r2 × h (4

Create the Future: Java Programming for Absolute Beginners Create the Future: Java Programming for Absolute Beginners Oct 13, 2024 pm 01:32 PM

Java is a popular programming language that can be learned by both beginners and experienced developers. This tutorial starts with basic concepts and progresses through advanced topics. After installing the Java Development Kit, you can practice programming by creating a simple "Hello, World!" program. After you understand the code, use the command prompt to compile and run the program, and "Hello, World!" will be output on the console. Learning Java starts your programming journey, and as your mastery deepens, you can create more complex applications.

See all articles