ホームページ よくある問題 concurrenthashmap がスレッドセーフであるのはなぜですか?

concurrenthashmap がスレッドセーフであるのはなぜですか?

Mar 30, 2020 am 09:21 AM
concurrenthashmap

concurrenthashmap がスレッドセーフであるのはなぜですか?

1. HashMap と ConcurrentHashMap の比較

コードを使用して、HashMap のスレッドの安全性と ConcurrentHashMap のスレッドの安全性を証明します。コード ロジックは非常に単純です。10,000 のスレッドが開始されます。各スレッドは、キーを配置してからキーを削除するという単純な操作を実行します。理論的には、スレッドの安全性が維持される場合、最終的なマップ size() は 0 でなければなりません。

推奨チュートリアル: "java learning"

Map<Object,Object> myMap=new HashMap<>();
        // ConcurrentHashMap myMap=new ConcurrentHashMap();
        for (int i = 0; i <10000 ; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                        double a=Math.random();
                        myMap.put(a,a);
                        myMap.remove(a);
                }
            }).start();
        }
        Thread.sleep(15000l);//多休眠下,保证上面的线程操作完毕。
        System.out.println(myMap.size());
ログイン後にコピー

ここではマップのサイズ= 13 と表示されていますが、実際にはマップ内にキーがあります。 ConcurrentHashMap を使用して同じコードを実行します。結果のマップ ==0

これは、ConcurrentHashMap がスレッド セーフであることを証明しています。ConcurrentHashMap がどのようにスレッド セーフを保証するかをソース コードから分析してみましょう。今回は、ソース コードjdkのバージョンは1.8です。

2. ConcurrentHashMap のソース コード分析

3.1 ConcurrentHashMap の基本属性

//默认最大的容量 
private static final int MAXIMUM_CAPACITY = 1 << 30;
//默认初始化的容量
private static final int DEFAULT_CAPACITY = 16;
//最大的数组可能长度
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//默认的并发级别,目前并没有用,只是为了保持兼容性
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//和hashMap一样,负载因子
private static final float LOAD_FACTOR = 0.75f;
//和HashMap一样,链表转换为红黑树的阈值,默认是8
static final int TREEIFY_THRESHOLD = 8;
//红黑树转换链表的阀值,默认是6
static final int UNTREEIFY_THRESHOLD = 6;
//进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容
static final int MIN_TREEIFY_CAPACITY = 64;
//table扩容时, 每个线程最少迁移table的槽位个数
private static final int MIN_TRANSFER_STRIDE = 16;
//感觉是用来计算偏移量和线程数量的标记
private static int RESIZE_STAMP_BITS = 16;
//能够调整的最大线程数量
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
//记录偏移量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
//值为-1, 当Node.hash为MOVED时, 代表着table正在扩容
static final int MOVED     = -1;
//TREEBIN, 置为-2, 代表此元素后接红黑树
static final int TREEBIN   = -2;
//感觉是占位符,目前没看出来明显的作用
static final int RESERVED  = -3;
//主要用来计算Hash值的
static final int HASH_BITS = 0x7fffffff; 
//节点数组
transient volatile Node<K,V>[] table;
//table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上
private transient volatile Node<K,V>[] nextTable;
//基础计数器
private transient volatile long baseCount;
//table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化
//-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小
private transient volatile int sizeCtl;
//table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标
private transient volatile int transferIndex;
//扩容时候,CAS锁标记
private transient volatile int cellsBusy;
//计数器表,大小是2次幂
private transient volatile CounterCell[] counterCells;
ログイン後にコピー

上記は ConcurrentHashMap の基本属性です。私たちのほとんどは HashMap と同じですが、いくつかの属性を追加します。後で、追加された属性がどのような役割を果たすかを分析しましょう。

2.2 ConcurrentHashMap の共通メソッド属性

put メソッド

final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key和value不允许为null
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果table没有初始化,进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //计算数组的位置    
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果该位置为空,构造新节点添加即可
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }//如果正在扩容
            else if ((fh = f.hash) == MOVED)
            //帮着一起扩容
                tab = helpTransfer(tab, f);
            else {
            //开始真正的插入
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                    //如果已经初始化完成了
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //这里key相同直接覆盖原来的节点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //否则添加到节点的最后面
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,  value, null);
                                    break;
                                }
                            }
                        }//如果节点是树节点,就进行树节点添加操作
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }//判断节点是否要转换成红黑树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);//红黑树转换
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //计数器,采用CAS计算size大小,并且检查是否需要扩容
        addCount(1L, binCount);
        return null;
    }
ログイン後にコピー

ConcurrentHashMap の put メソッドのロジックは HashMap のロジックとそれほど変わらないことがわかりました。主な理由は次のとおりです。新しいスレッド セーフティ パーツ。要素を追加するときは、同期を使用してスレッド セーフティを確保し、CAS 操作を使用してサイズを計算します。 put プロセス全体は比較的単純です。概要は次のとおりです:

1. キーと値が空かどうかを確認し、空の場合は直接例外をスローします。

2. テーブル配列が初期化されているかどうかを確認し、初期化されていない場合は初期化します。

3. キーのハッシュ値を計算し、その位置が空の場合は直接ノードを構築して入れます。

4. テーブルを展開している場合は、ヘルプの展開方法を入力します。

5. 最後に、同期ロックをオンにして挿入操作を実行します。上書きオプションがオンの場合は直接上書きします。そうでない場合は、ノードを構築して最後に追加します。ノード数が超過した場合は、ノードを構築します。赤黒ツリーのしきい値を指定して、赤黒ツリー変換を実行します。現在のノードがツリー ノードの場合は、ツリー挿入操作を実行します。

6. 最後に、サイズを数えて、拡張する必要があるかどうかを計算します。

get メソッド

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算hash值
        int h = spread(key.hashCode());
        //如果table已经初始化,并且计算hash值的索引位置node不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果hash相等,key相等,直接返回该节点的value
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }//如果hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到节点。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //循环遍历链表,查询key和hash值相等的节点。
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
ログイン後にコピー

get メソッドは比較的単純で、主な処理は次のとおりです:

1. ハッシュ値を直接計算します。検索対象のノードが等しい場合、ノードが直接返されます。ノードの値だけで十分です。

2. テーブルが展開している場合は、ForwardingNode の find メソッドを呼び出してノードを見つけます。

3. 展開がない場合は、リンク リストをループして、同じキーとハッシュ値を持つノード値を見つけます。

ConcurrentHashMap の拡張

ConcurrentHashMap の拡張はマルチスレッド処理を伴うため、HashMap の拡張に比べて比較的複雑です。ここでの拡張方法は主に転送です。ソースコードを解析してみましょう。この方法を使って勉強してください。拡張する方法は次のとおりです。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        //保证每个线程扩容最少是16,
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        if (nextTab == null) {            // initiating
            try {
            //扩容2倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                //出现异常情况就不扩容了。
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            //用新数组对象接收
            nextTable = nextTab;
            //初始化扩容下表为原数组的长度
            transferIndex = n;
        }
        int nextn = nextTab.length;
        //扩容期间的过渡节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) {
                int nextIndex, nextBound;
                //如果该线程已经完成了
                if (--i >= bound || finishing)
                    advance = false;
                //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }CAS操作设置区间
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {//如果扩容完成了,进行收尾工作
                    nextTable = null;//清空临时数组
                    table = nextTab;//赋值原数组
                    sizeCtl = (n << 1) - (n >>> 1);//重新赋值sizeCtl
                    return;
                }//如果扩容还在进行,自己任务完成就进行sizeCtl-1,这里是因为,扩容是通过helpTransfer()和addCount()方法来调用的,在调用transfer()真正扩容之前,sizeCtl都会+1,所以这里每个线程完成后就进行-1。
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //这里应该是判断扩容是否结束
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    //结束,赋值状态
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }//如果在table中没找到,就用过渡节点
            else if ((f = tabAt(tab, i)) == null)
                //成功设置就进入下一个节点
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可
                advance = true; // already processed
            else {
            //这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //如果hash值大于0
                        if (fh >= 0) {
                        //为运算结果
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                //这里的逻辑和hashMap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析HashMap的文章
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }//如果是树节点,执行树节点的扩容数据转移
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                //也是通过位运算判断两个链表的位置    
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            //这里判断是否进行树转换
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                           //这里把新处理的链表赋值到新数组中
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
ログイン後にコピー

ConcurrentHashMap の拡張はまだ比較的複雑です。複雑さは主にマルチスレッド拡張の制御レベルに反映されています。拡張のソース コードを詳しく分析したわけではありません。は確かにもっと複雑です。いくつか問題があります。それは非常に明確です。一方で、私たちの研究は主にそのアイデアを理解することにあると思います。キーとなるコードとキーとなるアイデアを理解できる限り。そうでない限り。同様の関数のセットを再実装する場合は、すべての詳細を心配する必要はないと思います。要約すると、ConcurrentHashMap の拡張ステップは次のとおりです:

1. スレッド拡張処理ステップ サイズ (単一スレッドが拡張を処理できるノードの数である少なくとも 16) を取得します。

2. 元の容量の 2 倍の新しい配列を作成し、拡張中のクエリ操作のための遷移ノードを構築します。

3. 無限ループを実行し、主に拡張が完了したかどうかを決定する終了変数に基づいてノードを転送します。拡張期間中は、ノードごとに次の表の異なるインデックスを設定して拡張操作が実行されます。スレッド、つまり異なるスレッド、操作の配列 セグメントは異なり、スレッドの安全性を確保するために、同期された同期ロックを使用して動作ノードをロックします。

4. 新しい配列内のノードの実際の位置は、HashMap の展開ロジックと同じであり、新しいリンク リストが元の位置にあるか、元の位置にあるかをビット演算によって計算します。具体的な分析については、私のこの記事を参照してください。

3. 概要

1. ConcurrentHashMap のロジック コードのほとんどは HashMap と同じです. 主に synchronized と ノードの挿入と展開のスレッドの安全性を確保するために使用されます.ここで質問してください。なぜ同期を使用するのですか?楽観的ロックを使用する代わりに、ロックはどうでしょうか?個人的には、考えられる理由が 2 つあると考えています:

a. オプティミスティック ロックは、競合の競合が少ないシナリオに適しています。競合が多い場合は、継続的な再試行が発生し、パフォーマンスが低下します。

b. 最適化後の同期のパフォーマンスはロックと変わりませんが、シナリオによってはロックより高速になる場合があります。なので、これが同期にsynchronizedを使う理由だと思います。

2. ConcurrentHashMap のコア拡張ロジックは、異なる配列添字を異なるスレッドに割り当て、各スレッドが独自のテーブル範囲内のノードを処理することです。同時に、処理ノードは hashMap のロジックを再利用し、ビット操作を通じて拡張後のノードの位置 (元の位置または古い長さの位置) を知ることができ、最終的に値を直接代入します。

以上がconcurrenthashmap がスレッドセーフであるのはなぜですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

詳細な検索Deepseek公式ウェブサイトの入り口 詳細な検索Deepseek公式ウェブサイトの入り口 Mar 12, 2025 pm 01:33 PM

2025年の初めに、国内のAI「Deepseek」が驚くべきデビューを果たしました!この無料のオープンソースAIモデルは、OpenAIのO1の公式バージョンに匹敵するパフォーマンスを備えており、Webサイド、アプリ、APIで完全に起動され、iOS、Android、およびWebバージョンのマルチターミナル使用をサポートしています。 DeepSeekの公式Webサイトおよび使用ガイドの詳細な検索:公式Webサイトアドレス:https://www.deepseek.com/sing for webバージョンの手順:上記のリンクをクリックして、DeepSeekの公式Webサイトを入力します。ホームページの[会話の開始]ボタンをクリックします。最初に使用するには、携帯電話検証コードでログインする必要があります。ログインした後、ダイアログインターフェイスを入力できます。 DeepSeekは強力で、コードを書き、ファイルを読み取り、コードを作成できます

Deepseek Webバージョンの公式入り口 Deepseek Webバージョンの公式入り口 Mar 12, 2025 pm 01:42 PM

国内のAI Dark Horse Deepseekは強く上昇し、世界のAI業界に衝撃を与えました! 1年半しか設立されていないこの中国の人工知能会社は、無料でオープンソースのモックアップであるDeepseek-V3とDeepseek-R1で世界的なユーザーから広く称賛されています。 Deepseek-R1は完全に発売され、パフォーマンスはOpenAio1の公式バージョンに匹敵します! Webページ、アプリ、APIインターフェイスで強力な機能を体験できます。ダウンロード方法:iOSおよびAndroidシステムをサポートすると、ユーザーはApp Storeを介してダウンロードできます。 Deepseek Webバージョン公式入り口:HT

deepseekの忙しいサーバーの問題を解決する方法 deepseekの忙しいサーバーの問題を解決する方法 Mar 12, 2025 pm 01:39 PM

DeepSeek:サーバーに混雑している人気のあるAIを扱う方法は? 2025年のホットAIとして、Deepseekは無料でオープンソースであり、OpenAio1の公式バージョンに匹敵するパフォーマンスを備えており、その人気を示しています。ただし、高い並行性は、サーバーの忙しさの問題ももたらします。この記事では、理由を分析し、対処戦略を提供します。 Deepseek Webバージョンの入り口:https://www.deepseek.com/deepseekサーバーに忙しい理由:高い並行アクセス:Deepseekの無料で強力な機能が同時に使用する多数のユーザーを引き付け、サーバーの負荷が過剰になります。サイバー攻撃:Deepseekが米国の金融産業に影響を与えることが報告されています。