> 일반적인 문제 > ConcurrentHashmap이 스레드로부터 안전한 이유는 무엇입니까?

ConcurrentHashmap이 스레드로부터 안전한 이유는 무엇입니까?

藏色散人
풀어 주다: 2020-03-30 09:21:26
원래의
8012명이 탐색했습니다.

ConcurrentHashmap이 스레드로부터 안전한 이유는 무엇입니까?

1. HashMap과 ConcurrentHashMap의 비교

HashMap의 스레드 불안정성과 ConcurrentHashMap의 스레드 안전성을 증명하기 위해 코드 조각을 사용합니다. . 코드 로직은 매우 간단합니다. 10,000개의 스레드가 시작됩니다. 각 스레드는 키를 넣은 다음 키를 삭제하는 간단한 작업을 수행합니다. 이론적으로 스레드 안전성이 유지되면 최종 맵 크기()는 0이 되어야 합니다.

추천 튜토리얼: "javalearning"

Map<Object,Object> myMap=new HashMap<>();
        // ConcurrentHashMap myMap=new ConcurrentHashMap();
        for (int i = 0; i <10000 ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                        double a=Math.random();
                        myMap.put(a,a);
                        myMap.remove(a);
                }
            }).start();
        }
        Thread.sleep(15000l);//多休眠下,保证上面的线程操作完毕。
        System.out.println(myMap.size());
로그인 후 복사

여기에 표시된 맵의 크기는 13이지만 실제로는 맵에 키가 있습니다. . ConcurrentHashMap을 사용하여 동일한 코드를 실행하고 결과 맵 ==0

이것은 또한 ConcurrentHashMap이 스레드로부터 안전하다는 것을 증명합니다. ConcurrentHashMap이 소스 코드에서 스레드 안전성을 보장하는 방법을 분석해 보겠습니다. 보조 소스 코드 jdk 버전은 1.8입니다.

2. ConcurrentHashMap 소스 코드 분석

3.1 ConcurrentHashMap의 기본 속성

//默认最大的容量 
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始化的容量
private static final int DEFAULT_CAPACITY = 16;
//最大的数组可能长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认的并发级别,目前并没有用,只是为了保持兼容性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//和hashMap一样,负载因子
private static final float LOAD_FACTOR = 0.75f;
//和HashMap一样,链表转换为红黑树的阈值,默认是8
static final int TREEIFY_THRESHOLD = 8;
//红黑树转换链表的阀值,默认是6
static final int UNTREEIFY_THRESHOLD = 6;
//进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容
static final int MIN_TREEIFY_CAPACITY = 64;
//table扩容时, 每个线程最少迁移table的槽位个数
private static final int MIN_TRANSFER_STRIDE = 16;
//感觉是用来计算偏移量和线程数量的标记
private static int RESIZE_STAMP_BITS = 16;
//能够调整的最大线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//记录偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//值为-1, 当Node.hash为MOVED时, 代表着table正在扩容
static final int MOVED     = -1;
//TREEBIN, 置为-2, 代表此元素后接红黑树
static final int TREEBIN   = -2;
//感觉是占位符,目前没看出来明显的作用
static final int RESERVED  = -3;
//主要用来计算Hash值的
static final int HASH_BITS = 0x7fffffff; 
//节点数组
transient volatile Node<K,V>[] table;
//table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上
private transient volatile Node<K,V>[] nextTable;
//基础计数器
private transient volatile long baseCount;
//table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化
//-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小
private transient volatile int sizeCtl;
//table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标
private transient volatile int transferIndex;
//扩容时候,CAS锁标记
private transient volatile int cellsBusy;
//计数器表,大小是2次幂
private transient volatile CounterCell[] counterCells;
로그인 후 복사

위 내용은 ConcurrentHashMap의 기본 속성입니다. 일부 속성이 추가되었다는 점을 제외하면 HashMap과 동일합니다. 나중에 추가된 속성이 어떤 역할을 하는지 분석할 것입니다.

2.2 ConcurrentHashMap의 공통 메소드 속성

put method

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key和value不允许为null
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果table没有初始化,进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //计算数组的位置    
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果该位置为空,构造新节点添加即可
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }//如果正在扩容
            else if ((fh = f.hash) == MOVED)
            //帮着一起扩容
                tab = helpTransfer(tab, f);
            else {
            //开始真正的插入
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                    //如果已经初始化完成了
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //这里key相同直接覆盖原来的节点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //否则添加到节点的最后面
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,  value, null);
                                    break;
                                }
                            }
                        }//如果节点是树节点,就进行树节点添加操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }//判断节点是否要转换成红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);//红黑树转换
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //计数器,采用CAS计算size大小,并且检查是否需要扩容
        addCount(1L, binCount);
        return null;
    }
로그인 후 복사

ConcurrentHashMap의 put 메소드는 HashMap의 로직과 크게 다르지 않다는 것을 발견했습니다. , 주로 신규 스레드 안전 부분이 추가됩니다. 요소 추가 시 동기화를 사용하여 스레드 안전을 보장한 다음 CAS 연산을 사용하여 크기를 계산합니다. 전체 넣기 프로세스는 비교적 간단합니다.

1. 키와 값이 비어 있으면 직접 예외를 발생시킵니다.

2. 테이블 배열이 초기화되었는지 확인합니다. 그렇지 않으면 초기화합니다.

3. 키의 해시 값을 계산하여 위치가 비어 있으면 노드를 직접 구성하여 넣습니다.

4. 테이블이 확장되는 경우 도움말 확장 방법을 입력합니다.

5.마지막으로 동기화 잠금을 켜고 삽입 작업을 수행합니다. 덮어쓰기 옵션이 켜져 있으면 직접 덮어씁니다. 그렇지 않으면 노드를 구성하여 끝에 추가합니다. 노드가 레드-블랙 트리 임계값을 초과하는 경우 레드-블랙 트리 변환을 수행합니다. 현재 노드가 트리 노드이면 트리 삽입 작업을 수행합니다.

6. 마지막으로 크기를 계산하고 확장이 필요한지 여부를 계산합니다.

get method

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算hash值
        int h = spread(key.hashCode());
        //如果table已经初始化,并且计算hash值的索引位置node不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果hash相等,key相等,直接返回该节点的value
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }//如果hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到节点。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //循环遍历链表,查询key和hash值相等的节点。
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
로그인 후 복사

get 메소드는 비교적 간단합니다.

1 해시 값을 직접 계산합니다. 검색할 노드는 키와 해시 값이 같으면 해당 노드의 값을 직접 반환하면 됩니다.

2. 테이블이 확장되면 ForwardingNode의 find 메소드를 호출하여 노드를 찾습니다.

3. 확장이 없으면 연결 목록을 반복하여 동일한 키와 해시 값을 가진 노드 값을 찾으세요.

ConcurrentHashMap의 확장

ConcurrentHashMap의 확장은 멀티 스레드 작업을 포함하기 때문에 HashMap의 확장에 비해 상대적으로 복잡합니다. 여기서의 확장 방법은 주로 전송입니다. 이 방법은 소스 코드를 확장하는 방법을 연구합니다.

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //保证每个线程扩容最少是16,
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
            //扩容2倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                //出现异常情况就不扩容了。
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //用新数组对象接收
            nextTable = nextTab;
            //初始化扩容下表为原数组的长度
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //扩容期间的过渡节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                //如果该线程已经完成了
                if (--i >= bound || finishing)
                    advance = false;
                //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }CAS操作设置区间
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//如果扩容完成了,进行收尾工作
                    nextTable = null;//清空临时数组
                    table = nextTab;//赋值原数组
                    sizeCtl = (n << 1) - (n >>> 1);//重新赋值sizeCtl
                    return;
                }//如果扩容还在进行,自己任务完成就进行sizeCtl-1,这里是因为,扩容是通过helpTransfer()和addCount()方法来调用的,在调用transfer()真正扩容之前,sizeCtl都会+1,所以这里每个线程完成后就进行-1。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //这里应该是判断扩容是否结束
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //结束,赋值状态
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }//如果在table中没找到,就用过渡节点
            else if ((f = tabAt(tab, i)) == null)
                //成功设置就进入下一个节点
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可
                advance = true; // already processed
            else {
            //这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //如果hash值大于0
                        if (fh >= 0) {
                        //为运算结果
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //这里的逻辑和hashMap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析HashMap的文章
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }//如果是树节点,执行树节点的扩容数据转移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //也是通过位运算判断两个链表的位置    
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //这里判断是否进行树转换
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                           //这里把新处理的链表赋值到新数组中
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
로그인 후 복사

ConcurrentHashMap의 확장은 여전히 ​​상대적으로 복잡합니다. 그 복잡성은 주로 멀티 스레드 확장의 제어에 반영됩니다. 한편으로는 확장의 소스 코드를 자세히 분석하지 않았습니다. 좀 더 복잡한 문제가 있습니다. 반면에 우리의 연구는 주로 핵심 코드와 핵심 아이디어를 이해하는 것입니다. 유사한 기능 세트를 다시 구현하지 않는 한 모든 세부 사항에 대해 걱정할 필요는 없습니다. ConcurrentHashMap의 확장 단계를 요약하면

1입니다. 스레드 확장 처리 단계 크기를 가져옵니다. 이는 단일 스레드가 확장을 처리하는 노드 수인 16 이상입니다.

2. 원래 용량의 두 배로 배열을 만들고 확장 중 쿼리 작업을 위한 전환 노드를 구성합니다.

3. 확장 완료 여부를 판단하기 위해 주로 마무리 변수를 기반으로 노드를 전송하기 위해 무한 루프를 수행하며, 아래 표에서 서로 다른 인덱스를 설정하여 확장 작업을 수행합니다. 다른 스레드, 즉 다른 스레드 작업의 경우 배열 세그먼트가 다르며 동기화된 동기화 잠금을 사용하여 작업 노드를 잠그고 스레드 안전을 보장합니다.

4. 새 배열에서 노드의 실제 위치는 HashMap의 확장 논리와 동일합니다. 비트 연산을 통해 새 연결 목록이 원래 위치에 있는지 계산됩니다. 원래 위치 + 확장 길이에 대한 구체적인 분석은 내 기사를 확인하세요.

3. 요약

1. ConcurrentHashMap의 논리 코드는 대부분 HashMap과 동일하며 주로 동기화를 사용하고 노드 삽입 및 확장의 스레드 안전성을 보장합니다. 여기에 학생들이 있을 것입니다. 왜 동기화를 사용합니까? 낙관적 잠금을 사용하는 대신 잠금은 어떻습니까? 개인적으로 두 가지 이유가 있다고 생각합니다:

🎜🎜#a. 낙관적 잠금은 경쟁 충돌이 더 적은 시나리오에 더 적합합니다. 이는 지속적인 재시도로 이어져 성능이 향상됩니다. .

b.synchronized 최적화를 거친 후 성능은 실제로 잠금과 다르지 않으며 일부 시나리오에서는 잠금보다 빠를 수 있습니다. 그래서 이것이 동기화를 위해 동기화를 사용하는 이유라고 생각합니다.

2. ConcurrentHashMap의 핵심 확장 논리는 서로 다른 배열 첨자를 서로 다른 스레드에 할당한 다음 각 스레드가 자체 테이블 범위의 노드를 처리하는 것입니다. 동시에 처리 노드는 hashMap의 논리를 재사용하여 확장 후 노드의 위치를 ​​원래 위치 또는 원래 위치 + 이전 길이 위치에서 알 수 있으며 최종적으로 값을 직접 할당할 수 있습니다. .

위 내용은 ConcurrentHashmap이 스레드로부터 안전한 이유는 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿