推奨事項 123: volatile はデータの同期を保証できません
volatile キーワードは 2 つの理由から比較的まれに使用されます。まず、Java 1.5 より前では、このキーワードのパフォーマンスがオペレーティング システムごとに異なり、移植性が低いという問題がありました。は設計がより難しく、誤用がより頻繁に行われるため、その「評判」も傷つきます。
各スレッドはスタック メモリで実行され、各スレッドには独自の作業メモリ (レジスタ、キャッシュなどの作業メモリ) があり、一般に、スレッドの計算は作業メモリを通じて相互作用することがわかっています。以下の図:
概略図から、スレッドは初期化中に必要な変数値をメインメモリから作業メモリにロードし、その後、スレッドの実行中に読み取られた場合にその値をロードすることがわかります。これは、JVM の単純なメモリ モデルですが、このような構造には問題があります。マルチスレッドの場合、次のような問題が発生する可能性があります。スレッド A は変数の値を変更してメイン メモリに更新しますが、この間、スレッド B と C は依然としてこのスレッドの作業メモリを読み取ります。読み取られる値は「最新」ではありません。現時点では、異なるスレッドが保持するパブリック リソースは同期されていません。
この種の問題には、同期された同期コード ブロックを使用するか、Lock ロックを使用してこの問題を解決するなど、多くの解決策があります。ただし、Java では、volatile キーを追加するなど、volatile を使用してこの種の問題をより簡単に解決できます。変数の前に配置すると、各スレッドのローカル変数へのアクセスと変更が、このスレッドの作業メモリではなくメモリと直接対話するようになり、各スレッドが最も「新鮮な」変数値、その回路図を取得できるようになります。図は次のとおりです:
volatile 変数の原理を理解して、それについて考えてみましょう: volatile 変数はデータの同期を保証できますか? 2 つのスレッドが volatile を同時に変更すると、ダーティ データが生成されますか?次のコードを見てみましょう:
class UnsafeThread implements Runnable { // 共享资源 private volatile int count = 0; @Override public void run() { // 增加CPU的繁忙程度,不必关心其逻辑含义 for (int i = 0; i < 1000; i++) { Math.hypot(Math.pow(92456789, i), Math.cos(i)); } count++; } public int getCount() { return count; } }
上記のコードは、run メソッドの主なロジックは共有リソース数の自己インクリメント操作であり、volatile キーワードも追加します。 count 変数を使用して、メモリから確実に読み取られるようにします。読み取りと書き込みの場合、複数のスレッドが実行されている場合、つまり、複数のスレッドが count 変数の自己インクリメント操作を実行している場合、count 変数はダーティ データを生成しますか?考えてみてください。カウントするために volatile キーワードが追加されました。マルチスレッドをシミュレートするコードは次のとおりです:
public static void main(String[] args) throws InterruptedException { // 理想值,并作为最大循环次数 int value = 1000; // 循环次数,防止造成无限循环或者死循环 int loops = 0; // 主线程组,用于估计活动线程数 ThreadGroup tg = Thread.currentThread().getThreadGroup(); while (loops++ < value) { // 共享资源清零 UnsafeThread ut = new UnsafeThread(); for (int i = 0; i < value; i++) { new Thread(ut).start(); } // 先等15毫秒,等待活动线程为1 do { Thread.sleep(15); } while (tg.activeCount() != 1); // 检查实际值与理论值是否一致 if (ut.getCount() != value) { // 出现线程不安全的情况 System.out.println("循环到:" + loops + " 遍,出现线程不安全的情况"); System.out.println("此时,count= " + ut.getCount()); System.exit(0); } } }
揮発性変数を「醜い」ものにするためには、まだある程度の努力が必要です。このプログラムの実行ロジックは次のとおりです:
100 個のスレッドを開始し、共有リソース数の値を変更します
15 秒間一時停止し、アクティブなスレッドの数が 1 かどうか (つまり、メイン スレッドのみが残っているかどうかを観察します)実行する)、1 でない場合は、さらに 15 秒待ちます。
共有リソースが安全でないかどうか、つまり実際の値が理想値と同じであるかどうかを判断し、そうでない場合は、この時点での count の値がダーティ データであることがわかります。
見つからない場合は、最大ループに達するまでループを続けます。
実行結果は以下の通りです:
ループ: 40回、スレッド安全でない状況が発生します
このとき、count= 999
これは可能な結果にすぎず、実行ごとに異なる結果が生じる可能性があります。これは、count 変数がデータ同期を実装していないことも示しています。複数のスレッドによって変更された場合、count の実際の値は理論値から逸脱します。これは、volatile キーワードがスレッドの安全性を保証できないことを直接示しています。
理由を説明する前に、まず自己追加操作について話しましょう。 count++ が意味するのは、最初に count の値を取り出し、次に 1 を加算すること、つまり count=count+1 であるため、次のような魔法のようなことが特定の即時セグメントで起こります:
(1)、初めて。セグメント
スレッド A は、volatile というキーワードで変更されているため、メインメモリから count の最新値である 998 を取得します。 次に 2 つのタイプに分けられます。 CPU が 1 つである場合、スケジューラはスレッド A の実行を一時停止し、スレッド B に実行機会を与えます。そのため、スレッド B もカウント 998 の最新値を取得します。
複数の CPU がある場合、スレッド A はこの時点で実行を継続します。同時にスレッド B も count の最新値 998 を取得します。
(2)、2 番目のフラグメント
単一の CPU の場合、スレッド B は +1 操作を完了しています (これはアトミック処理です)。揮発性型の変数なので直接書きます。メインメモリに入り、その後Aスレッドが実行を続け、計算結果も999となりメインメモリに書き換えられます。
複数のCPUがある場合、スレッドAは1を加算するアクションの実行後にメインメモリ内の変数カウントを999に変更し、スレッドBも実行完了後にメインメモリ内の変数カウントを999に変更します
这两个时间片段执行完毕后,原本期望的结果为1000,单运行后的值为999,这表示出现了线程不安全的情况。这也是我们要说明的:volatile关键字并不能保证线程安全,它只能保证当前线程需要该变量的值时能够获得最新的值,而不能保证线程修改的安全性。
顺便说一下,在上面的代码中,UnsafeThread类的消耗CPU计算是必须的,其目的是加重线程的负荷,以便出现单个线程抢占整个CPU资源的情景,否则很难模拟出volatile线程不安全的情况,大家可以自行模拟测试。
回到顶部
建议124:异步运算考虑使用Callable接口
多线程应用有两种实现方式,一种是实现Runnable接口,另一种是继承Thread类,这两个方法都有缺点:run方法没有返回值,不能抛出异常(这两个缺点归根到底是Runnable接口的缺陷,Thread类也实现了Runnable接口),如果需要知道一个线程的运行结果就需要用户自行设计,线程类本身也不能提供返回值和异常。但是从Java1.5开始引入了一个新的接口Callable,它类似于Runnable接口,实现它就可以实现多线程任务,Callable的接口定义如下:
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
实现Callable接口的类,只是表明它是一个可调用的任务,并不表示它具有多线程运算能力,还是需要执行器来执行的,我们先编写一个任务类,代码如下:
//税款计算器 class TaxCalculator implements Callable<Integer> { // 本金 private int seedMoney; // 接收主线程提供的参数 public TaxCalculator(int _seedMoney) { seedMoney = _seedMoney; } @Override public Integer call() throws Exception { // 复杂计算,运行一次需要2秒 TimeUnit.MILLISECONDS.sleep(2000); return seedMoney / 10; } }
这里模拟了一个复杂运算:税款计算器,该运算可能要花费10秒钟的时间,此时不能让用户一直等着吧,需要给用户输出点什么,让用户知道系统还在运行,这也是系统友好性的体现:用户输入即有输出,若耗时较长,则显示运算进度。如果我们直接计算,就只有一个main线程,是不可能有友好提示的,如果税金不计算完毕,也不会执行后续动作,所以此时最好的办法就是重启一个线程来运算,让main线程做进度提示,代码如下:
public static void main(String[] args) throws InterruptedException, ExecutionException { // 生成一个单线程的异步执行器 ExecutorService es = Executors.newSingleThreadExecutor(); // 线程执行后的期望值 Future<Integer> future = es.submit(new TaxCalculator(100)); while (!future.isDone()) { // 还没有运算完成,等待200毫秒 TimeUnit.MICROSECONDS.sleep(200); // 输出进度符号 System.out.print("*"); } System.out.println("\n计算完成,税金是:" + future.get() + " 元 "); es.shutdown(); }
在这段代码中,Executors是一个静态工具类,提供了异步执行器的创建能力,如单线程异步执行器newSingleThreadExecutor、固定线程数量的执行器newFixedThreadPool等,一般它是异步计算的入口类。future关注的是线程执行后的结果,比如没有运行完毕,执行结果是多少等。此段代码的运行结果如下所示:
**********************************************......
计算完成,税金是:10 元
执行时,"*"会依次递增,表示系统正在运算,为用户提供了运算进度,此类异步计算的好处是:
尽可能多的占用系统资源,提供快速运算
可以监控线程的执行情况,比如是否执行完毕、是否有返回值、是否有异常等。
可以为用户提供更好的支持,比如例子中的运算进度等。
回到顶部
建议125:优先选择线程池
在Java1.5之前,实现多线程比较麻烦,需要自己启动线程,并关注同步资源,防止出现线程死锁等问题,在1.5版本之后引入了并行计算框架,大大简化了多线程开发。我们知道一个线程有五个状态:新建状态(NEW)、可运行状态(Runnable,也叫作运行状态)、阻塞状态(Blocked)、等待状态(Waiting)、结束状态(Terminated),线程的状态只能由新建转变为了运行状态后才能被阻塞或等待,最后终结,不可能产生本末倒置的情况,比如把一个结束状态的线程转变为新建状态,则会出现异常,例如如下代码会抛出异常:
public static void main(String[] args) throws InterruptedException { // 创建一个线程,新建状态 Thread t = new Thread(new Runnable() { @Override public void run() { System.out.println("线程正在运行"); } }); // 运行状态 t.start(); // 是否是运行状态,若不是则等待10毫秒 while (!t.getState().equals(Thread.State.TERMINATED)) { TimeUnit.MICROSECONDS.sleep(10); } // 直接由结束转变为云心态 t.start(); }
此段程序运行时会报java.lang.IllegalThreadStateException异常,原因就是不能从结束状态直接转变为运行状态,我们知道一个线程的运行时间分为3部分:T1为线程启动时间,T2为线程的运行时间,T3为线程销毁时间,如果一个线程不能被重复使用,每次创建一个线程都需要经过启动、运行、销毁时间,这势必增大系统的响应时间,有没有更好的办法降低线程的运行时间呢?
T2是无法避免的,只有通过优化代码来实现降低运行时间。T1和T2都可以通过线程池(Thread Pool)来缩减时间,比如在容器(或系统)启动时,创建足够多的线程,当容器(或系统)需要时直接从线程池中获得线程,运算出结果,再把线程返回到线程池中___ExecutorService就是实现了线程池的执行器,我们来看一个示例代码:
public static void main(String[] args) throws InterruptedException { // 2个线程的线程池 ExecutorService es = Executors.newFixedThreadPool(2); // 多次执行线程体 for (int i = 0; i < 4; i++) { es.submit(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } // 关闭执行器 es.shutdown(); }
此段代码首先创建了一个包含两个线程的线程池,然后在线程池中多次运行线程体,输出运行时的线程名称,结果如下:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
本次代码执行了4遍线程体,按照我们之前阐述的" 一个线程不可能从结束状态转变为可运行状态 ",那为什么此处的2个线程可以反复使用呢?这就是我们要搞清楚的重点。
线程池涉及以下几个名词:
工作线程(Worker):线程池中的线程,只有两个状态:可运行状态和等待状态,没有任务时它们处于等待状态,运行时它们循环的执行任务。
任务接口(Task):这是每个任务必须实现的接口,以供工作线程调度器调度,它主要规定了任务的入口、任务执行完的场景处理,任务的执行状态等。这里有两种类型的任务:具有返回值(异常)的Callable接口任务和无返回值并兼容旧版本的Runnable接口任务。
任务对列(Work Quene):也叫作工作队列,用于存放等待处理的任务,一般是BlockingQuene的实现类,用来实现任务的排队处理。
我们首先从线程池的创建说起,Executors.newFixedThreadPool(2)表示创建一个具有两个线程的线程池,源代码如下:
public class Executors { //生成一个最大为nThreads的线程池执行器 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } }
这里使用了LinkedBlockingQueue作为队列任务管理器,所有等待处理的任务都会放在该对列中,需要注意的是,此队列是一个阻塞式的单端队列。线程池建立好了,那就需要线程在其中运行了,线程池中的线程是在submit第一次提交任务时建立的,代码如下:
public Future<?> submit(Runnable task) { //检查任务是否为null if (task == null) throw new NullPointerException(); //把Runnable任务包装成具有返回值的任务对象,不过此时并没有执行,只是包装 RunnableFuture<Object> ftask = newTaskFor(task, null); //执行此任务 execute(ftask); //返回任务预期执行结果 return ftask; }
此处的代码关键是execute方法,它实现了三个职责。
创建足够多的工作线程数,数量不超过最大线程数量,并保持线程处于运行或等待状态。
把等待处理的任务放到任务队列中
从任务队列中取出任务来执行
其中此处的关键是工作线程的创建,它也是通过new Thread方式创建的一个线程,只是它创建的并不是我们的任务线程(虽然我们的任务实现了Runnable接口,但它只是起了一个标志性的作用),而是经过包装的Worker线程,代码如下:
private final class Worker implements Runnable { // 运行一次任务 private void runTask(Runnable task) { /* 这里的task才是我们自定义实现Runnable接口的任务 */ task.run(); /* 该方法其它代码略 */ } // 工作线程也是线程,必须实现run方法 public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } } // 任务队列中获得任务 Runnable getTask() { /* 其它代码略 */ for (;;) { return r = workQueue.take(); } } }
此处为示意代码,删除了大量的判断条件和锁资源。execute方法是通过Worker类启动的一个工作线程,执行的是我们的第一个任务,然后改线程通过getTask方法从任务队列中获取任务,之后再继续执行,但问题是任务队列是一个BlockingQuene,是阻塞式的,也就是说如果该队列的元素为0,则保持等待状态,直到有任务进入为止,我们来看LinkedBlockingQuene的take方法,代码如下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { try { // 如果队列中的元素为0,则等待 while (count.get() == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to a non-interrupted thread throw ie; } // 等待状态结束,弹出头元素 x = extract(); c = count.getAndDecrement(); // 如果队列数量还多于一个,唤醒其它线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); // 返回头元素 return x; }
分析到这里,我们就明白了线程池的创建过程:创建一个阻塞队列以容纳任务,在第一次执行任务时创建做够多的线程(不超过许可线程数),并处理任务,之后每个工作线程自行从任务对列中获得任务,直到任务队列中的任务数量为0为止,此时,线程将处于等待状态,一旦有任务再加入到队列中,即召唤醒工作线程进行处理,实现线程的可复用性。
使用线程池减少的是线程的创建和销毁时间,这对于多线程应用来说非常有帮助,比如我们常用的Servlet容器,每次请求处理的都是一个线程,如果不采用线程池技术,每次请求都会重新创建一个新的线程,这会导致系统的性能符合加大,响应效率下降,降低了系统的友好性。