編寫優質的並發程式碼是一件難度極高的事情。 Java語言從第一個版本開始內建了對多線程的支持,這一點在當年是非常了不起的,但是當我們對並發編程有了更深刻的認識和更多的實踐後,實現並發程式設計就有了更多的方案和更好的選擇。本文是對並發程式設計的一點總結和思考,同時也分享了Java 5以後的版本中如何編寫並發程式碼的一點點經驗。
並發其實是一種解耦合的策略,它幫助我們把做什麼(目標)和什麼時候做(時機)分開。這樣做可以明顯改進應用程式的吞吐量(獲得更多的CPU調度時間)和結構(程式有多個部分在協同工作)。做過Java Web開發的人都知道,Java Web中的Servlet程式在Servlet容器的支援下採用單一實例多執行緒的工作模式,Servlet容器為你處理了並發問題。
最常見的對並發程式設計的誤解有以下這些:
-並發總是能改進效能(並發在CPU有很多空閒時間時能明顯改善程式的效能,但當執行緒數量較多的時候,執行緒間頻繁的調度切換反而會讓系統的效能下降)
-編寫並發程式無需修改原有的設計(目的與時機的解耦往往會對系統結構產生巨大的影響)
-在使用Web或EJB容器時不用關注並發問題(只有了解了容器在做什麼,才能更好的使用容器)
下面的這些說法才是對並發客觀的認識:
-編寫並發程式會在程式碼上增加額外的開銷
-正確的並發是非常複雜的,即使對於很簡單的問題
-並發中的缺陷因為不易重現也不容易被發現
-並發往往需要對設計策略從根本上進行修改
#分離並發相關程式碼和其他程式碼(並發相關程式碼有自己的開發、修改和調優生命週期)。
兩個執行緒修改共享物件的相同欄位時可能會互相干擾,導致不可預期的行為,解決方案之一是建構臨界區,但是必須限制臨界區的數量。
資料副本是避免共享資料的好方法,複製出來的物件只是以唯讀的方式對待。 Java 5的java.util.concurrent套件中增加一個名為CopyOnWriteArrayList的類,它是List介面的子類型,所以你可以認為它是ArrayList的線程安全的版本,它使用了寫時複製的方式創建資料副本進行操作來避免共享資料並發存取而引發的問題。
讓執行緒存在於自己的世界中,不與其他執行緒共享資料。有過Java Web開發經驗的人都知道,Servlet就是以單一實例多執行緒的方式工作,和每個請求相關的資料都是透過Servlet子類別的service方法(或是doGet或doPost方法)的參數傳入的。只要Servlet中的程式碼只使用局部變數,Servlet就不會導致同步問題。 springMVC的控制器也是這麼做的,從請求中獲得的物件都是以方法的參數傳入而不是作為類別的成員,很明顯Struts 2的做法就正好相反,因此Struts 2中作為控制器的Action類都是每個請求對應一個實例。
Java的執行緒模型建立在搶佔式執行緒調度的基礎上,也就是說:
所有執行緒可以很容易的共享同一進程中的物件。
能夠引用這些物件的任何執行緒都可以修改這些物件。
為了保護數據,物件可以被鎖住。
Java基於線程和鎖的並發過於底層,而且使用鎖很多時候都是很萬惡的,因為它相當於讓所有的並發都變成了排隊等待。
在Java 5以前,可以用synchronized關鍵字來實現鎖的功能,它可以用在程式碼區塊和方法上,表示在執行整個程式碼區塊或方法之前執行緒必須取得適當的鎖定。對於類別的非靜態方法(成員方法)而言,這意味著這要取得物件實例的鎖,對於類別的靜態方法(類別方法)而言,要取得類別的Class物件的鎖,對於同步程式碼區塊,程式設計師可以指定要取得的是那個物件的鎖。
不管是同步程式碼區塊還是同步方法,每次只有一個執行緒可以進入,如果其他執行緒試圖進入(不管是同一同步區塊還是不同的同步區塊),JVM會將它們掛起(放入到等鎖池中)。這種結構在並發理論中稱為臨界區(critical section)。這裡我們可以對Java中用synchronized實作同步與鎖定的功能做一個總結:
#只能鎖定對象,不能鎖定基本資料型別
被鎖定的物件陣列中的單一物件不會被鎖定
同步方法可以視為包含整個方法的synchronized(this) { … }程式碼區塊
靜態同步方法會鎖定它的Class物件
內部類別的同步是獨立於外部類別的
synchronized修飾符並不是方法簽名的組成部分,所以不能出現在介面的方法聲明中
非同步的方法不關心鎖定的狀態,它們在同步方法運行時仍可得以執行
synchronized實作的鎖定是可重入的鎖定。
在JVM內部,為了提高效率,同時運行的每個執行緒都會有它正在處理的資料的快取副本,當我們使用synchronzied進行同步的時候,真正被同步的是在不同線程中表示被鎖定物件的記憶體區塊(副本資料會保持和主記憶體的同步,現在知道為什麼要用同步這個詞彙了吧),簡單的說就是在同步區塊或同步方法執行完後,對被鎖定的物件所做的任何修改要在釋放鎖之前寫回到主記憶體中;在進入同步區塊得到鎖之後,被鎖定物件的資料是從主記憶體中讀出來的,持有鎖的執行緒的資料副本一定和主記憶體中的資料視圖是同步的。
在Java最初的版本中,就有一個叫Volatile的關鍵字,它是一種簡單的同步的處理機制,因為被volatile修飾的變數遵循以下規則:
變數的值在使用前總是會從主記憶體再讀取。
變數值的修改總是會在完成之後寫回主記憶體。
使用volatile關鍵字可以在多執行緒環境下預防編譯器不正確的最佳化假設(編譯器可能會將在一個執行緒中值不會改變的變數最佳化成常數),但只有在修改時不依賴目前狀態(讀取時的值)的變數才應該宣告為volatile變數。
不變模式也是並發程式設計時可以考慮的設計。讓對象的狀態是不變的,如果希望修改對象的狀態,就會創建對象的副本並將改變寫入副本而不改變原來的對象,這樣就不會出現狀態不一致的情況,因此不變物件是線程安全的。 Java中我們使用頻率極高的String類別就採用了這樣的設計。如果對不變模式不熟悉,可以閱讀閻宏博士的《Java與模式》一書的第34章。說到這裡你可能也體會到final關鍵字的重要意義了。
不管今後的Java朝向何種方向發展或滅亡,Java 5絕對是Java發展史中一個極為重要的版本,這個版本所提供的各種語言特性我們不在這裡討論(有興趣的可以閱讀我的另一篇文章《Java的第20年:從Java版本演進看編程技術的發展》),但是我們必須要感謝Doug Lea在Java 5中提供了他里程碑式的傑作java.util.concurrent包,它的出現讓Java的並發程式有了更多的選擇和更好的工作方式。 Doug Lea的傑作主要包括以下內容:
更好的線程安全的容器
線程池和相關的工具類別
可選的非阻塞解決方案
顯示的鎖定和信號量機制
下面我們對這些東西進行一一解讀。
Java 5中的java.util.concurrent包下面有一個atomic子包,其中有幾個以Atomic打頭的類,例如AtomicInteger和AtomicLong。它們利用了現代處理器的特性,可以用非阻塞的方式完成原子操作,程式碼如下所示:
/** ID序列生成器 */ public class IdGenerator { private final AtomicLong sequenceNumber = new AtomicLong(0); public long next() { return sequenceNumber.getAndIncrement(); } }
基於synchronized關鍵字的鎖定機制有以下問題:
鎖定只有一種類型,而且對所有同步運算都是一樣的作用
「鎖定」只能在程式碼區塊或方法開始的地方獲得,在結束的地方釋放
線程要么得到鎖,要么阻塞,沒有其他的可能性
Java 5對鎖機制進行了重構,提供了顯示的鎖,這樣可以在以下幾個方面提升鎖定機制:
可以添加不同类型的锁,例如读取锁和写入锁
可以在一个方法中加锁,在另一个方法中解锁
可以使用tryLock方式尝试获得锁,如果得不到锁可以等待、回退或者干点别的事情,当然也可以在超时之后放弃操作
显示的锁都实现了java.util.concurrent.Lock接口,主要有两个实现类:
ReentrantLock – 比synchronized稍微灵活一些的重入锁
ReentrantReadWriteLock – 在读操作很多写操作很少时性能更好的一种重入锁
对于如何使用显示锁,可以参考我的Java面试系列文章《Java面试题集51-70》中第60题的代码。只有一点需要提醒,解锁的方法unlock的调用最好能够在finally块中,因为这里是释放外部资源最好的地方,当然也是释放锁的最佳位置,因为不管正常异常可能都要释放掉锁来给其他线程以运行的机会。
CountDownLatch是一种简单的同步模式,它让一个线程可以等待一个或多个线程完成它们的工作从而避免对临界资源并发访问所引发的各种问题。下面借用别人的一段代码(我对它做了一些重构)来演示CountDownLatch是如何工作的。
import java.util.concurrent.CountDownLatch; /** * 工人类 * @author 骆昊 * */ class Worker { private String name; // 名字 private long workDuration; // 工作持续时间 /** * 构造器 */ public Worker(String name, long workDuration) { this.name = name; this.workDuration = workDuration; } /** * 完成工作 */ public void doWork() { System.out.println(name + " begins to work..."); try { Thread.sleep(workDuration); // 用休眠模拟工作执行的时间 } catch(InterruptedException ex) { ex.printStackTrace(); } System.out.println(name + " has finished the job..."); } } /** * 测试线程 * @author 骆昊 * */ class WorkerTestThread implements Runnable { private Worker worker; private CountDownLatch cdLatch; public WorkerTestThread(Worker worker, CountDownLatch cdLatch) { this.worker = worker; this.cdLatch = cdLatch; } @Override public void run() { worker.doWork(); // 让工人开始工作 cdLatch.countDown(); // 工作完成后倒计时次数减1 } } class CountDownLatchTest { private static final int MAX_WORK_DURATION = 5000; // 最大工作时间 private static final int MIN_WORK_DURATION = 1000; // 最小工作时间 // 产生随机的工作时间 private static long getRandomWorkDuration(long min, long max) { return (long) (Math.random() * (max - min) + min); } public static void main(String[] args) { CountDownLatch latch = new CountDownLatch(2); // 创建倒计时闩并指定倒计时次数为2 Worker w1 = new Worker("骆昊", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION)); Worker w2 = new Worker("王大锤", getRandomWorkDuration(MIN_WORK_DURATION, MAX_WORK_DURATION)); new Thread(new WorkerTestThread(w1, latch)).start(); new Thread(new WorkerTestThread(w2, latch)).start(); try { latch.await(); // 等待倒计时闩减到0 System.out.println("All jobs have been finished!"); } catch (InterruptedException e) { e.printStackTrace(); } } }
ConcurrentHashMap是HashMap在并发环境下的版本,大家可能要问,既然已经可以通过Collections.synchronizedMap获得线程安全的映射型容器,为什么还需要ConcurrentHashMap呢?因为通过Collections工具类获得的线程安全的HashMap会在读写数据时对整个容器对象上锁,这样其他使用该容器的线程无论如何也无法再获得该对象的锁,也就意味着要一直等待前一个获得锁的线程离开同步代码块之后才有机会执行。实际上,HashMap是通过哈希函数来确定存放键值对的桶(桶是为了解决哈希冲突而引入的),修改HashMap时并不需要将整个容器锁住,只需要锁住即将修改的“桶”就可以了。HashMap的数据结构如下图所示。
此外,ConcurrentHashMap还提供了原子操作的方法,如下所示:
putIfAbsent:如果还没有对应的键值对映射,就将其添加到HashMap中。
remove:如果键存在而且值与当前状态相等(equals比较结果为true),则用原子方式移除该键值对映射
replace:替换掉映射中元素的原子操作
CopyOnWriteArrayList是ArrayList在并发环境下的替代品。CopyOnWriteArrayList通过增加写时复制语义来避免并发访问引起的问题,也就是说任何修改操作都会在底层创建一个列表的副本,也就意味着之前已有的迭代器不会碰到意料之外的修改。这种方式对于不要严格读写同步的场景非常有用,因为它提供了更好的性能。记住,要尽量减少锁的使用,因为那势必带来性能的下降(对数据库中数据的并发访问不也是如此吗?如果可以的话就应该放弃悲观锁而使用乐观锁),CopyOnWriteArrayList很明显也是通过牺牲空间获得了时间(在计算机的世界里,时间和空间通常是不可调和的矛盾,可以牺牲空间来提升效率获得时间,当然也可以通过牺牲时间来减少对空间的使用)。
可以通过下面两段代码的运行状况来验证一下CopyOnWriteArrayList是不是线程安全的容器。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class AddThread implements Runnable { private List<Double> list; public AddThread(List<Double> list) { this.list = list; } @Override public void run() { for(int i = 0; i < 10000; ++i) { list.add(Math.random()); } } } public class Test05 { private static final int THREAD_POOL_SIZE = 2; public static void main(String[] args) { List<Double> list = new ArrayList<>(); ExecutorService es = Executors.newFixedThreadPool(THREAD_POOL_SIZE); es.execute(new AddThread(list)); es.execute(new AddThread(list)); es.shutdown(); } }
上面的代码会在运行时产生ArrayIndexOutOfBoundsException,试一试将上面代码25行的ArrayList换成CopyOnWriteArrayList再重新运行。
List<Double> list = new CopyOnWriteArrayList<>();
队列是一个无处不在的美妙概念,它提供了一种简单又可靠的方式将资源分发给处理单元(也可以说是将工作单元分配给待处理的资源,这取决于你看待问题的方式)。实现中的并发编程模型很多都依赖队列来实现,因为它可以在线程之间传递工作单元。
Java 5中的BlockingQueue就是一个在并发环境下非常好用的工具,在调用put方法向队列中插入元素时,如果队列已满,它会让插入元素的线程等待队列腾出空间;在调用take方法从队列中取元素时,如果队列为空,取出元素的线程就会阻塞。
可以用BlockingQueue来实现生产者-消费者并发模型(下一节中有介绍),当然在Java 5以前也可以通过wait和notify来实现线程调度,比较一下两种代码就知道基于已有的并发工具类来重构并发代码到底好在哪里了。
基于wait和notify的实现
import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 公共常量 * @author 骆昊 * */ class Constants { public static final int MAX_BUFFER_SIZE = 10; public static final int NUM_OF_PRODUCER = 2; public static final int NUM_OF_CONSUMER = 3; } /** * 工作任务 * @author 骆昊 * */ class Task { private String id; // 任务的编号 public Task() { id = UUID.randomUUID().toString(); } @Override public String toString() { return "Task[" + id + "]"; } } /** * 消费者 * @author 骆昊 * */ class Consumer implements Runnable { private List<Task> buffer; public Consumer(List<Task> buffer) { this.buffer = buffer; } @Override public void run() { while(true) { synchronized(buffer) { while(buffer.isEmpty()) { try { buffer.wait(); } catch(InterruptedException e) { e.printStackTrace(); } } Task task = buffer.remove(0); buffer.notifyAll(); System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task); } } } } /** * 生产者 * @author 骆昊 * */ class Producer implements Runnable { private List<Task> buffer; public Producer(List<Task> buffer) { this.buffer = buffer; } @Override public void run() { while(true) { synchronized (buffer) { while(buffer.size() >= Constants.MAX_BUFFER_SIZE) { try { buffer.wait(); } catch(InterruptedException e) { e.printStackTrace(); } } Task task = new Task(); buffer.add(task); buffer.notifyAll(); System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task); } } } } public class Test06 { public static void main(String[] args) { List<Task> buffer = new ArrayList<>(Constants.MAX_BUFFER_SIZE); ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER); for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) { es.execute(new Producer(buffer)); } for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) { es.execute(new Consumer(buffer)); } } }
基于BlockingQueue的实现
import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; /** * 公共常量 * @author 骆昊 * */ class Constants { public static final int MAX_BUFFER_SIZE = 10; public static final int NUM_OF_PRODUCER = 2; public static final int NUM_OF_CONSUMER = 3; } /** * 工作任务 * @author 骆昊 * */ class Task { private String id; // 任务的编号 public Task() { id = UUID.randomUUID().toString(); } @Override public String toString() { return "Task[" + id + "]"; } } /** * 消费者 * @author 骆昊 * */ class Consumer implements Runnable { private BlockingQueue<Task> buffer; public Consumer(BlockingQueue<Task> buffer) { this.buffer = buffer; } @Override public void run() { while(true) { try { Task task = buffer.take(); System.out.println("Consumer[" + Thread.currentThread().getName() + "] got " + task); } catch (InterruptedException e) { e.printStackTrace(); } } } } /** * 生产者 * @author 骆昊 * */ class Producer implements Runnable { private BlockingQueue<Task> buffer; public Producer(BlockingQueue<Task> buffer) { this.buffer = buffer; } @Override public void run() { while(true) { try { Task task = new Task(); buffer.put(task); System.out.println("Producer[" + Thread.currentThread().getName() + "] put " + task); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Test07 { public static void main(String[] args) { BlockingQueue<Task> buffer = new LinkedBlockingQueue<>(Constants.MAX_BUFFER_SIZE); ExecutorService es = Executors.newFixedThreadPool(Constants.NUM_OF_CONSUMER + Constants.NUM_OF_PRODUCER); for(int i = 1; i <= Constants.NUM_OF_PRODUCER; ++i) { es.execute(new Producer(buffer)); } for(int i = 1; i <= Constants.NUM_OF_CONSUMER; ++i) { es.execute(new Consumer(buffer)); } } }
使用BlockingQueue后代码优雅了很多。
在继续下面的探讨之前,我们还是重温一下几个概念:
概念 | 解释 |
---|---|
临界资源 | 并发环境中有着固定数量的资源 |
互斥 | 对资源的访问是排他式的 |
饥饿 | 一个或一组线程长时间或永远无法取得进展 |
死锁 | 两个或多个线程相互等待对方结束 |
活锁 | 想要执行的线程总是发现其他的线程正在执行以至于长时间或永远无法执行 |
重温了这几个概念后,我们可以探讨一下下面的几种并发模型。
一个或多个生产者创建某些工作并将其置于缓冲区或队列中,一个或多个消费者会从队列中获得这些工作并完成之。这里的缓冲区或队列是临界资源。当缓冲区或队列放满的时候,生产这会被阻塞;而缓冲区或队列为空的时候,消费者会被阻塞。生产者和消费者的调度是通过二者相互交换信号完成的。
当存在一个主要为读者提供信息的共享资源,它偶尔会被写者更新,但是需要考虑系统的吞吐量,又要防止饥饿和陈旧资源得不到更新的问题。在这种并发模型中,如何平衡读者和写者是最困难的,当然这个问题至今还是一个被热议的问题,恐怕必须根据具体的场景来提供合适的解决方案而没有那种放之四海而皆准的方法(不像我在国内的科研文献中看到的那样)。
1965年,荷兰计算机科学家图灵奖得主Edsger Wybe Dijkstra提出并解决了一个他称之为哲学家进餐的同步问题。这个问题可以简单地描述如下:五个哲学家围坐在一张圆桌周围,每个哲学家面前都有一盘通心粉。由于通心粉很滑,所以需要两把叉子才能夹住。相邻两个盘子之间放有一把叉子如下图所示。哲学家的生活中有两种交替活动时段:即吃饭和思考。当一个哲学家觉得饿了时,他就试图分两次去取其左边和右边的叉子,每次拿一把,但不分次序。如果成功地得到了两把叉子,就开始吃饭,吃完后放下叉子继续思考。
把上面问题中的哲学家换成线程,把叉子换成竞争的临界资源,上面的问题就是线程竞争资源的问题。如果没有经过精心的设计,系统就会出现死锁、活锁、吞吐量下降等问题。
下面是用信号量原语来解决哲学家进餐问题的代码,使用了Java 5并发工具包中的Semaphore类(代码不够漂亮但是已经足以说明问题了)。
//import java.util.concurrent.ExecutorService; //import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; /** * 存放线程共享信号量的上下问 * @author 骆昊 * */ class AppContext { public static final int NUM_OF_FORKS = 5; // 叉子数量(资源) public static final int NUM_OF_PHILO = 5; // 哲学家数量(线程) public static Semaphore[] forks; // 叉子的信号量 public static Semaphore counter; // 哲学家的信号量 static { forks = new Semaphore[NUM_OF_FORKS]; for (int i = 0, len = forks.length; i < len; ++i) { forks[i] = new Semaphore(1); // 每个叉子的信号量为1 } counter = new Semaphore(NUM_OF_PHILO - 1); // 如果有N个哲学家,最多只允许N-1人同时取叉子 } /** * 取得叉子 * @param index 第几个哲学家 * @param leftFirst 是否先取得左边的叉子 * @throws InterruptedException */ public static void putOnFork(int index, boolean leftFirst) throws InterruptedException { if(leftFirst) { forks[index].acquire(); forks[(index + 1) % NUM_OF_PHILO].acquire(); } else { forks[(index + 1) % NUM_OF_PHILO].acquire(); forks[index].acquire(); } } /** * 放回叉子 * @param index 第几个哲学家 * @param leftFirst 是否先放回左边的叉子 * @throws InterruptedException */ public static void putDownFork(int index, boolean leftFirst) throws InterruptedException { if(leftFirst) { forks[index].release(); forks[(index + 1) % NUM_OF_PHILO].release(); } else { forks[(index + 1) % NUM_OF_PHILO].release(); forks[index].release(); } } } /** * 哲学家 * @author 骆昊 * */ class Philosopher implements Runnable { private int index; // 编号 private String name; // 名字 public Philosopher(int index, String name) { this.index = index; this.name = name; } @Override public void run() { while(true) { try { AppContext.counter.acquire(); boolean leftFirst = index % 2 == 0; AppContext.putOnFork(index, leftFirst); System.out.println(name + "正在吃意大利面(通心粉)..."); // 取到两个叉子就可以进食 AppContext.putDownFork(index, leftFirst); AppContext.counter.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class Test04 { public static void main(String[] args) { String[] names = { "骆昊", "王大锤", "张三丰", "杨过", "李莫愁" }; // 5位哲学家的名字 // ExecutorService es = Executors.newFixedThreadPool(AppContext.NUM_OF_PHILO); // 创建固定大小的线程池 // for(int i = 0, len = names.length; i < len; ++i) { // es.execute(new Philosopher(i, names[i])); // 启动线程 // } // es.shutdown(); for(int i = 0, len = names.length; i < len; ++i) { new Thread(new Philosopher(i, names[i])).start(); } } }
现实中的并发问题基本上都是这三种模型或者是这三种模型的变体。
对并发代码的测试也是非常棘手的事情,棘手到无需说明大家也很清楚的程度,所以这里我们只是探讨一下如何解决这个棘手的问题。我们建议大家编写一些能够发现问题的测试并经常性的在不同的配置和不同的负载下运行这些测试。不要忽略掉任何一次失败的测试,线程代码中的缺陷可能在上万次测试中仅仅出现一次。具体来说有这么几个注意事项:
不要将系统的失效归结于偶发事件,就像拉不出屎的时候不能怪地球没有引力。
先让非并发代码工作起来,不要试图同时找到并发和非并发代码中的缺陷。
编写可以在不同配置环境下运行的线程代码。
编写容易调整的线程代码,这样可以调整线程使性能达到最优。
让线程的数量多于CPU或CPU核心的数量,这样CPU调度切换过程中潜在的问题才会暴露出来。
让并发代码在不同的平台上运行。
通过自动化或者硬编码的方式向并发代码中加入一些辅助测试的代码。
Java 7中引入了TransferQueue,它比BlockingQueue多了一个叫transfer的方法,如果接收线程处于等待状态,该操作可以马上将任务交给它,否则就会阻塞直至取走该任务的线程出现。可以用TransferQueue代替BlockingQueue,因为它可以获得更好的性能。
刚才忘记了一件事情,Java 5中还引入了Callable接口、Future接口和FutureTask接口,通过他们也可以构建并发应用程序,代码如下所示。
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class Test07 { private static final int POOL_SIZE = 10; static class CalcThread implements Callable<Double> { private List<Double> dataList = new ArrayList<>(); public CalcThread() { for(int i = 0; i < 10000; ++i) { dataList.add(Math.random()); } } @Override public Double call() throws Exception { double total = 0; for(Double d : dataList) { total += d; } return total / dataList.size(); } } public static void main(String[] args) { List<Future<Double>> fList = new ArrayList<>(); ExecutorService es = Executors.newFixedThreadPool(POOL_SIZE); for(int i = 0; i < POOL_SIZE; ++i) { fList.add(es.submit(new CalcThread())); } for(Future<Double> f : fList) { try { System.out.println(f.get()); } catch (Exception e) { e.printStackTrace(); } } es.shutdown(); } }
Callable接口也是一个单方法接口,显然这是一个回调方法,类似于函数式编程中的回调函数,在Java 8 以前,Java中还不能使用Lambda表达式来简化这种函数式编程。和Runnable接口不同的是Callable接口的回调方法call方法会返回一个对象,这个对象可以用将来时的方式在线程执行结束的时候获得信息。上面代码中的call方法就是将计算出的10000个0到1之间的随机小数的平均值返回,我们通过一个Future接口的对象得到了这个返回值。目前最新的Java版本中,Callable接口和Runnable接口都被打上了@FunctionalInterface的注解,也就是说它可以用函数式编程的方式(Lambda表达式)创建接口对象。
下面是Future接口的主要方法:
get():获取结果。如果结果还没有准备好,get方法会阻塞直到取得结果;当然也可以通过参数设置阻塞超时时间。
cancel():在运算结束前取消。
isDone():可以用来判断运算是否结束。
Java 7中还提供了分支/合并(fork/join)框架,它可以实现线程池中任务的自动调度,并且这种调度对用户来说是透明的。为了达到这种效果,必须按照用户指定的方式对任务进行分解,然后再将分解出的小型任务的执行结果合并成原来任务的执行结果。这显然是运用了分治法(pide-and-conquer)的思想。下面的代码使用了分支/合并框架来计算1到10000的和,当然对于如此简单的任务根本不需要分支/合并框架,因为分支和合并本身也会带来一定的开销,但是这里我们只是探索一下在代码中如何使用分支/合并框架,让我们的代码能够充分利用现代多核CPU的强大运算能力。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; class Calculator extends RecursiveTask<Integer> { private static final long serialVersionUID = 7333472779649130114L; private static final int THRESHOLD = 10; private int start; private int end; public Calculator(int start, int end) { this.start = start; this.end = end; } @Override public Integer compute() { int sum = 0; if ((end - start) < THRESHOLD) { // 当问题分解到可求解程度时直接计算结果 for (int i = start; i <= end; i++) { sum += i; } } else { int middle = (start + end) >>> 1; // 将任务一分为二 Calculator left = new Calculator(start, middle); Calculator right = new Calculator(middle + 1, end); left.fork(); right.fork(); // 注意:由于此处是递归式的任务分解,也就意味着接下来会二分为四,四分为八... sum = left.join() + right.join(); // 合并两个子任务的结果 } return sum; } } public class Test08 { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(); Future<Integer> result = forkJoinPool.submit(new Calculator(1, 10000)); System.out.println(result.get()); } }
伴随着Java 7的到来,Java中默认的数组排序算法已经不再是经典的快速排序(双枢轴快速排序)了,新的排序算法叫TimSort,它是归并排序和插入排序的混合体,TimSort可以通过分支合并框架充分利用现代处理器的多核特性,从而获得更好的性能(更短的排序时间)。
以上是關於Java並發程式設計的總結與思考的詳細介紹的詳細內容。更多資訊請關注PHP中文網其他相關文章!