首页 常见问题 concurrenthashmap为什么是线程安全

concurrenthashmap为什么是线程安全

Mar 30, 2020 am 09:21 AM
concurrenthashmap

concurrenthashmap为什么是线程安全

一、HashMap和ConcurrentHashMap的对比

我们用一段代码证明下HashMap的线程不安全,以及ConcurrentHashMap的线程安全性。代码逻辑很简单,开启10000个线程,每个线程做很简单的操作,就是put一个key,然后删除一个key,理论上线程安全的情况下,最后map的size()肯定为0。

推荐教程:《java学习

Map<Object,Object> myMap=new HashMap<>();
        // ConcurrentHashMap myMap=new ConcurrentHashMap();
        for (int i = 0; i <10000 ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                        double a=Math.random();
                        myMap.put(a,a);
                        myMap.remove(a);
                }
            }).start();
        }
        Thread.sleep(15000l);//多休眠下,保证上面的线程操作完毕。
        System.out.println(myMap.size());
登录后复制

这里显示Map的size=13,但是实际上map里还有一个key。 同样的代码我们用ConcurrentHashMap来运行下,结果map ==0

这里也就证明了ConcurrentHashMap是线程安全的,我们接下来从源码分析下ConcurrentHashMap是如何保证线程安全的,本次源码jdk版本为1.8。

二、ConcurrentHashMap源码分析

3.1 ConcurrentHashMap的基础属性

//默认最大的容量 
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始化的容量
private static final int DEFAULT_CAPACITY = 16;
//最大的数组可能长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认的并发级别,目前并没有用,只是为了保持兼容性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//和hashMap一样,负载因子
private static final float LOAD_FACTOR = 0.75f;
//和HashMap一样,链表转换为红黑树的阈值,默认是8
static final int TREEIFY_THRESHOLD = 8;
//红黑树转换链表的阀值,默认是6
static final int UNTREEIFY_THRESHOLD = 6;
//进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容
static final int MIN_TREEIFY_CAPACITY = 64;
//table扩容时, 每个线程最少迁移table的槽位个数
private static final int MIN_TRANSFER_STRIDE = 16;
//感觉是用来计算偏移量和线程数量的标记
private static int RESIZE_STAMP_BITS = 16;
//能够调整的最大线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//记录偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//值为-1, 当Node.hash为MOVED时, 代表着table正在扩容
static final int MOVED     = -1;
//TREEBIN, 置为-2, 代表此元素后接红黑树
static final int TREEBIN   = -2;
//感觉是占位符,目前没看出来明显的作用
static final int RESERVED  = -3;
//主要用来计算Hash值的
static final int HASH_BITS = 0x7fffffff; 
//节点数组
transient volatile Node<K,V>[] table;
//table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上
private transient volatile Node<K,V>[] nextTable;
//基础计数器
private transient volatile long baseCount;
//table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化
//-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小
private transient volatile int sizeCtl;
//table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标
private transient volatile int transferIndex;
//扩容时候,CAS锁标记
private transient volatile int cellsBusy;
//计数器表,大小是2次幂
private transient volatile CounterCell[] counterCells;
登录后复制

上面就是ConcurrentHashMap的基本属性,我们大部分和HashMap一样,只是增加了部分属性,后面我们来分析增加的部分属性是起到如何的作用的。

2.2 ConcurrentHashMap的常用方法属性

put方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key和value不允许为null
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果table没有初始化,进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //计算数组的位置    
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果该位置为空,构造新节点添加即可
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }//如果正在扩容
            else if ((fh = f.hash) == MOVED)
            //帮着一起扩容
                tab = helpTransfer(tab, f);
            else {
            //开始真正的插入
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                    //如果已经初始化完成了
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //这里key相同直接覆盖原来的节点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //否则添加到节点的最后面
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,  value, null);
                                    break;
                                }
                            }
                        }//如果节点是树节点,就进行树节点添加操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }//判断节点是否要转换成红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);//红黑树转换
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //计数器,采用CAS计算size大小,并且检查是否需要扩容
        addCount(1L, binCount);
        return null;
    }
登录后复制

我们发现ConcurrentHashMap的put方法和HashMap的逻辑相差不大,主要是新增了线程安全部分,在添加元素时候,采用synchronized来保证线程安全,然后计算size的时候采用CAS操作进行计算。整个put流程比较简单,总结下就是:

1.判断key和vaule是否为空,如果为空,直接抛出异常。

2.判断table数组是否已经初始化完毕,如果没有初始化,进行初始化。

3.计算key的hash值,如果该位置为空,直接构造节点放入。

4.如果table正在扩容,进入帮助扩容方法。

5.最后开启同步锁,进行插入操作,如果开启了覆盖选项,直接覆盖,否则,构造节点添加到尾部,如果节点数超过红黑树阈值,进行红黑树转换。如果当前节点是树节点,进行树插入操作。

6.最后统计size大小,并计算是否需要扩容。

get方法

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算hash值
        int h = spread(key.hashCode());
        //如果table已经初始化,并且计算hash值的索引位置node不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果hash相等,key相等,直接返回该节点的value
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }//如果hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到节点。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //循环遍历链表,查询key和hash值相等的节点。
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
登录后复制

get方法比较简单,主要流程如下:

1.直接计算hash值,查找的节点如果key和hash值相等,直接返回该节点的value就行。

2.如果table正在扩容,就调用ForwardingNode的find方法查找节点。

3.如果没有扩容,直接循环遍历链表,查找到key和hash值一样的节点值即可。

ConcurrentHashMap的扩容

ConcurrentHashMap的扩容相对于HashMap的扩容相对复杂,因为涉及到了多线程操作,这里扩容方法主要是transfer,我们来分析下这个方法的源码,研究下是如何扩容的。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //保证每个线程扩容最少是16,
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
            //扩容2倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                //出现异常情况就不扩容了。
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //用新数组对象接收
            nextTable = nextTab;
            //初始化扩容下表为原数组的长度
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //扩容期间的过渡节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                //如果该线程已经完成了
                if (--i >= bound || finishing)
                    advance = false;
                //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }CAS操作设置区间
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//如果扩容完成了,进行收尾工作
                    nextTable = null;//清空临时数组
                    table = nextTab;//赋值原数组
                    sizeCtl = (n << 1) - (n >>> 1);//重新赋值sizeCtl
                    return;
                }//如果扩容还在进行,自己任务完成就进行sizeCtl-1,这里是因为,扩容是通过helpTransfer()和addCount()方法来调用的,在调用transfer()真正扩容之前,sizeCtl都会+1,所以这里每个线程完成后就进行-1。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //这里应该是判断扩容是否结束
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //结束,赋值状态
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }//如果在table中没找到,就用过渡节点
            else if ((f = tabAt(tab, i)) == null)
                //成功设置就进入下一个节点
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可
                advance = true; // already processed
            else {
            //这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //如果hash值大于0
                        if (fh >= 0) {
                        //为运算结果
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //这里的逻辑和hashMap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析HashMap的文章
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }//如果是树节点,执行树节点的扩容数据转移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //也是通过位运算判断两个链表的位置    
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //这里判断是否进行树转换
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                           //这里把新处理的链表赋值到新数组中
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
登录后复制

ConcurrentHashMap的扩容还是比较复杂,复杂主要表现在,控制多线程扩容层面上,扩容的源码我没有解析的很细,一方面是确实比较复杂,本人有某些地方也不是太明白,另一方面是我觉得我们研究主要是弄懂其思想,能搞明白关键代码和关键思路即可,只要不是重新实现一套类似的功能,我想就不用纠结其全部细节了。总结下ConcurrentHashMap的扩容步骤如下:

1.获取线程扩容处理步长,最少是16,也就是单个线程处理扩容的节点个数。

2.新建一个原来容量2倍的数组,构造过渡节点,用于扩容期间的查询操作。

3.进行死循环进行转移节点,主要根据finishing变量判断是否扩容结束,在扩容期间通过给不同的线程设置不同的下表索引进行扩容操作,就是不同的线程,操作的数组分段不一样,同时利用synchronized同步锁锁住操作的节点,保证了线程安全。

4.真正进行节点在新数组的位置是和HashMap扩容逻辑一样的,通过位运算计算出新链表是否位于原位置或者位于原位置+扩容的长度位置,具体分析可以查看我的这篇文章。

三、总结

1.ConcurrentHashMap大部分的逻辑代码和HashMap是一样的,主要通过synchronized和来保证节点插入扩容的线程安全,这里肯定有同学会问,为啥使用synchronized呢?而不用采取乐观锁,或者lock呢?我个人觉得可能原因有2点:

a.乐观锁比较适用于竞争冲突比较少的场景,如果冲突比较多,那么就会导致不停的重试,这样反而性能更低。

b.synchronized在经历了优化之后,其实性能已经和lock没啥差异了,某些场景可能还比lock快。所以,我觉得这是采用synchronized来同步的原因。

2.ConcurrentHashMap的扩容核心逻辑主要是给不同的线程分配不同的数组下标,然后每个线程处理各自下表区间的节点。同时处理节点复用了hashMap的逻辑,通过位运行,可以知道节点扩容后的位置,要么在原位置,要么在原位置+oldlength位置,最后直接赋值即可。

以上是concurrenthashmap为什么是线程安全的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
2 周前 By 尊渡假赌尊渡假赌尊渡假赌
仓库:如何复兴队友
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
Hello Kitty Island冒险:如何获得巨型种子
3 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

deepseek网页版官方入口 deepseek网页版官方入口 Mar 12, 2025 pm 01:42 PM

国产AI黑马DeepSeek强势崛起,震撼全球AI界!这家成立仅一年半的中国人工智能公司,凭借其免费开源的大模型DeepSeek-V3和DeepSeek-R1,在性能上与OpenAI等国际巨头比肩,甚至在成本控制方面实现了突破性进展,赢得了全球用户的广泛赞誉。DeepSeek-R1现已全面上线,性能媲美OpenAIo1正式版!您可以在网页端、APP以及API接口体验其强大的功能。下载方式:支持iOS和安卓系统,用户可通过应用商店下载;网页版也已正式开放!DeepSeek网页版官方入口:ht

深度求索deepseek官网入口 深度求索deepseek官网入口 Mar 12, 2025 pm 01:33 PM

2025年开年,国产AI“深度求索”(deepseek)惊艳亮相!这款免费开源的AI模型,性能堪比OpenAI的o1正式版,并已在网页端、APP和API全面上线,支持iOS、安卓和网页版多端同步使用。深度求索deepseek官网及使用指南:官网地址:https://www.deepseek.com/网页版使用步骤:点击上方链接进入deepseek官网。点击首页的“开始对话”按钮。首次使用需进行手机验证码登录。登录后即可进入对话界面。deepseek功能强大,可进行代码编写、文件读取、创

deepseek服务器繁忙怎么解决 deepseek服务器繁忙怎么解决 Mar 12, 2025 pm 01:39 PM

DeepSeek:火爆AI遭遇服务器拥堵,如何应对?DeepSeek作为2025年开年爆款AI,免费开源且性能媲美OpenAIo1正式版,其受欢迎程度可见一斑。然而,高并发也带来了服务器繁忙的问题。本文将分析原因并提供应对策略。DeepSeek网页版入口:https://www.deepseek.com/DeepSeek服务器繁忙的原因:高并发访问:DeepSeek的免费和强大功能吸引了大量用户同时使用,导致服务器负载过高。网络攻击:据悉,DeepSeek对美国金融界造成冲击,