final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
程式碼較長,我們分段來分析,先介紹一下各部分的內容
第一部分:for
循環之前的程式碼,主要是取得執行緒的hash值,如果是0的話就強制初始化
#第二部分:for
迴圈中第一個if
語句,在Cell
陣列中進行累加、擴容
第三部分:for
循環中第一個else if
語句,這部分的作用是建立Cell
陣列並初始化
第四部分:for
循環中第二個else if
語句,當Cell
陣列競爭激烈時嘗試在base
上進行累加
int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; // true表示没有竞争 } boolean collide = false; // True if last slot nonempty 可以理解为是否需要扩容
這部分的核心程式碼是getProbe
方法,這個方法的作用就是取得執行緒的hash
值,方便後面透過位元運算定位到Cell
陣列中某個位置,如果是0
的話就會進行強制初始化
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // 省略... } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 获取锁 boolean init = false; // 初始化标志 try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; // 创建Cell数组 rs[h & 1] = new Cell(x); // 索引1位置创建Cell元素,值为x=1 cells = rs; // cells指向新数组 init = true; // 初始化完成 } } finally { cellsBusy = 0; // 释放锁 } if (init) break; // 跳出循环 } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
第一種情況下Cell
陣列為null
,所以會進入第一個else if
語句,並且沒有其他執行緒可以操作,所以cellsBusy==0
, cells==as
也是true
,casCellsBusy()
嘗試對cellsBusy
進行cas
操作改成1
也是true
。
首先建立了一個有兩個元素的Cell
數組,然後透過執行緒h & 1
的位元運算在索引1
的位置設定一個value
為1
的Cell
,然後重新賦值給cells
,標記初始化成功,修改cellsBusy
為0
表示釋放鎖,最後跳出循環,初始化操作就完成了。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // 省略... } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // 省略... } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
第二個else if
語句的意思是當Cell
陣列中所有位置競爭都很激烈時,就嘗試在base
上進行累加,可以理解為最後的保障
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 省略... for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // as初始化之后满足条件 if ((a = as[(n - 1) & h]) == null) { // as中某个位置的值为null if (cellsBusy == 0) { // Try to attach new Cell 是否加锁 Cell r = new Cell(x); // Optimistically create 创建新Cell if (cellsBusy == 0 && casCellsBusy()) { // 双重检查是否有锁,并尝试加锁 boolean created = false; // try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 重新检查该位置是否为null rs[j] = r; // 该位置添加Cell元素 created = true; // 新Cell创建成功 } } finally { cellsBusy = 0; // 释放锁 } if (created) break; // 创建成功,跳出循环 continue; // Slot is now non-empty } } collide = false; // 扩容标志 } else if (!wasUncontended) // 上面定位到的索引位置的值不为null wasUncontended = true; // 重新计算hash,重新定位其他索引位置重试 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 尝试在该索引位置进行累加 break; else if (n >= NCPU || cells != as) // 如果数组长度大于等于CPU核心数,就不能在扩容 collide = false; // At max size or stale else if (!collide) // 数组长度没有达到最大值,修改扩容标志可以扩容 collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁 try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; // 创建一个原来长度2倍的数组 for (int i = 0; i < n; ++i) rs[i] = as[i]; // 把原来的元素拷贝到新数组中 cells = rs; // cells指向新数组 } } finally { cellsBusy = 0; // 释放锁 } collide = false; // 已经扩容完成,修改标志不用再扩容 continue; // Retry with expanded table } h = advanceProbe(h); // 重新获取hash值 } // 省略... }
根據程式碼中的註解分析一遍整體邏輯
#首先如果找到陣列某個位置上的值為null
,說明可以在這個位置進行操作,就建立一個新的Cell
並初始化值為1
放到這個位置,如果失敗了就重新計算hash
值再重試
定位到的位置已經有值了,說明線程之間產生了競爭,如果wasUncontended
是false
就修改為true
,並重新計算hash
重試
定位的位置有值且wasUncontended
已經是true
,就嘗試在該位置進行累加
當累加失敗時,判斷數組容量是否已經達到最大,如果是就不能進行擴容,只能rehash
並重試
如果前面條件都不滿足,而擴容標誌collide
標記為false
的話就修改為true
,表示可以進行擴充,然後rehash
重試
先嘗試加鎖,成功了就進行擴容操作,每次擴容長度是之前的2
倍,然後把原來數組內容拷貝到新數組,擴容就完成了。
以上是Java並發程式設計中LongAdder的執行情況是怎樣的的詳細內容。更多資訊請關注PHP中文網其他相關文章!