複数のスレッドがクラスにアクセスする場合、ランタイム環境がどのようなスケジューリング方法を使用するか、またはこれらのプロセスがどのように交互に実行するかは問題ではなく、呼び出し側のコードや調整で追加の同期は必要ありません。このクラスが正しい動作を示すことができる場合、このクラスはスレッドセーフであると言われます。
原子性: 相互排他的なアクセスを提供し、1 つのスレッドのみがアクセスできます。同時に動作します
可視性: 1 つのスレッドによるメイン メモリへの変更は、他のスレッドによってタイムリーに監視できます
秩序性:スレッドは他のスレッドでの命令の実行順序を観察します。命令の並べ替えが存在するため、観察結果は一般に乱雑で無秩序になります
Atomic パッケージには多くの Atomicxxx クラスが用意されています:
それらはすべて CAS によって実装されています(compareAndSwap) アトミック。まず、次のような簡単な例を作成します。
@Slf4j public class AtomicExample1 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); } }
public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; // setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static { try { valueOffset = unsafe.objectFieldOffset (AtomicInteger.class.getDeclaredField("value")); } catch (Exception ex) { throw new Error(ex); } } private volatile int value; ... 此处省略多个方法... /** * Atomically increments by one the current value. * * @return the updated value */ public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } }
public final class Unsafe { private static final Unsafe theUnsafe; ....此处省略很多方法及成员变量.... public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public native int getIntVolatile(Object var1, long var2); }
0 1=0 の操作を実行したいとします。単一スレッドの各パラメータの値は次のとおりです:
更新:
compareAndSwapInt() メソッドの最初のパラメータ (var1)は現在のオブジェクトであり、コード例では count です。このときの値は0(期待値)です。 2 番目の値 (var2) は渡された valueOffset 値で、値は 12 です。 3 番目のパラメータ (var4) は定数 1 です。メソッド内の変数パラメーター (var5) は、パラメーター 1 とパラメーター 2 の valueOffset に基づいて、基になる getIntVolatile メソッドを呼び出すことによって取得された値であり、この時点では、その値は 0 です。 CompareAndSwapInt() が達成したい目標は、count オブジェクトについて、現在の期待値 var1 の値が基になる戻り値 (var5) と同じである場合、それを値 var5 var4 に更新することです。異なる場合は、現在の値が期待値と同じになるまでリサイクルして期待値 (var5) を取得し、更新します。 CompareAndSwap メソッドの中核は、通常 CAS と呼ばれるものです。 AtomicLong など、Atomic パッケージ内の他のクラスの実装原則は、基本的に上記と同じです。 ここでは LongAdder クラスを紹介します。上記の分析を通じて、AtomicLong が CAS を使用し、変更が成功するまで無限ループでターゲット値の変更を常に試行していることがわかりました。競争が激しくなければ、改造が成功する確率は非常に高くなります。一方、競争が激しい状況で変更失敗の確率が高い場合は、複数回のループ試行が行われるため、パフォーマンスに影響します。 一般的なタイプの long 変数と double 変数の場合、jvm では 64 ビットの読み取り操作または書き込み操作を 2 つの 32 ビット操作に分割できます。 LongAdder の中心的なアイデアは、ホットスポット データを分離することであり、AtomicLong の内部コア データ値を配列に分離し、各スレッドがそれにアクセスするときにハッシュやその他のアルゴリズムを通じてカウントするための数値の 1 つにマッピングできます。最終的なカウント結果は、この配列の合計と累積です。その中で、ホットスポット データ値は複数のセルに分割されます。各セルは、その内部値を独立して保持します。現在のオブジェクトの実際の値は、すべてのセルによって累積および合成されます。 。このようにして、ホットスポットが効果的に分離され、並列度が向上します。 LongAdder は、AtomicLong に基づいてシングルポイント更新の圧力を各ノードに分散することに相当し、同時実行性が低い場合には、ベースに直接更新することで、基本的に Atomic と一致したパフォーマンスを確保できます。同時実行性が高い場合、分散化によってパフォーマンスが向上します。ただし、統計処理中に同時更新が行われた場合、統計データに誤差が生じる可能性があります。 実際の同時実行数が多い場合は、LongAdder を最初に使用できます。 AtomicLong は、並列性が低い場合、または正確な値が必要な場合に最初に使用でき、より効率的です。以下は、Atomic パッケージでの AtomicReference の簡単な使用法を示す簡単なデモンストレーションです。
@Slf4j public class AtomicExample4 { private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args) { count.compareAndSet(0, 2); count.compareAndSet(0, 1); log.info("count:{}", count.get()); } }
compareAndSet()分别传入的是预期值跟更新值,只有当预期值跟当前值相等时,才会将值更新为更新值;
上面的第一个方法可以将值更新为2,而第二个步中无法将值更新为1。
下面简单介绍下AtomicIntegerFieldUpdater 用法(利用原子性去更新某个类的实例):
@Slf4j public class AtomicExample5 { private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count"); @Getter private volatile int count = 100; public static void main(String[] args) { AtomicExample5 example5 = new AtomicExample5(); if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 1, {}", example5.getCount()); } if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 2, {}", example5.getCount()); } else { log.info("update failed, {}", example5.getCount()); } } }
它可以更新某个类中指定成员变量的值。
注意:修改的成员变量需要用volatile关键字来修饰,并且不能是static描述的字段。
AtomicStampReference这个类它的核心是要解决CAS的ABA问题(CAS操作的时候,其他线程将变量的值A改成了B,接着又改回了A,等线程使用期望值A与当前变量进行比较的时候,发现A变量没有变,于是CAS就将A值进行了交换操作。
实际上该值已经被其他线程改变过)。
ABA问题的解决思路就是每次变量变更的时候,就将版本号加一。
看一下它的一个核心方法compareAndSet():
public class AtomicStampedReference<V> { private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } ... 此处省略多个方法 .... public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); } }
可以看到它多了一个stamp的比较,stamp的值是由每次更新的时候进行维护的。
再介绍下AtomicLongArray,它维护了一个数组。在该数组下,我们可以选择性的已原子性操作更新某个索引对应的值。
public class AtomicLongArray implements java.io.Serializable { private static final long serialVersionUID = -2308431214976778248L; private static final Unsafe unsafe = Unsafe.getUnsafe(); ...此处省略.... /** * Atomically sets the element at position {@code i} to the given value * and returns the old value. * * @param i the index * @param newValue the new value * @return the previous value */ public final long getAndSet(int i, long newValue) { return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue); } /** * Atomically sets the element at position {@code i} to the given * updated value if the current value {@code ==} the expected value. * * @param i the index * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that * the actual value was not equal to the expected value. */ public final boolean compareAndSet(int i, long expect, long update) { return compareAndSetRaw(checkedByteOffset(i), expect, update); } }
最后再写一个AtomcBoolean的简单使用:
@Slf4j public class AtomicExample6 { private static AtomicBoolean isHappened = new AtomicBoolean(false); // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); test(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("isHappened:{}", isHappened.get()); } private static void test() { if (isHappened.compareAndSet(false, true)) { log.info("execute"); } } }
以上がJavaでアトミック関数を使用するにはどうすればよいですか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。