為每個特定的同步問題提供了解決方案
Semaphore【信號標;旗語】,透過計數器控制對共享資源的存取。
測試類別:
package concurrent;
import concurrent.thread.SemaphoreThread;
import java.SemaphoreThread;
import java.SemaphoreThread;
import java.
package concurrent.thread;
import org. apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Semaphore;
/** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class SemaphoreTest { public static void main(String[] args) { //在Thread里声明并不是同一个对象 Semaphore semaphore = new Semaphore(3); SemaphoreThread testA = new SemaphoreThread("A", semaphore); SemaphoreThread testB = new SemaphoreThread("B", semaphore); SemaphoreThread testC = new SemaphoreThread("C", semaphore); SemaphoreThread testD = new SemaphoreThread("D", semaphore); SemaphoreThread testE = new SemaphoreThread("E", semaphore); SemaphoreThread testF = new SemaphoreThread("F", semaphore); SemaphoreThread testG = new SemaphoreThread("G", semaphore); testA.start(); testB.start(); testC.start(); testD.start(); testE.start(); testF.start(); testG.start(); } }
/** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class SemaphoreThread extends Thread { private static final Logger logger = LogManager.getLogger(SemaphoreThread.class); //创建有3个信号量的信号量计数器 public Semaphore semaphore; public SemaphoreThread(String name, Semaphore semaphore) { setName(name); this.semaphore = semaphore; } @Override public void run() { try { logger.debug(getName() + " 取号等待... " + System.currentTimeMillis()); //取出一个信号 semaphore.acquire(); logger.debug(getName() + " 提供服务... " + System.currentTimeMillis()); sleep(1000); logger.debug(getName() + " 完成服务... " + System.currentTimeMillis()); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug(getName() + " 释放... " + System.currentTimeMillis()); //释放一个信号 semaphore.release(); } }
CountDownLatch
CountDownLatch【倒數計時鎖】,執行緒中呼叫countDownLatch.await()使進程進入阻塞狀態,當達成指定次數後(透過countDownLatch.countDown())繼續執行每個執行緒中剩餘的內容。
測試類別:
[C] - C 取号等待... 1470642024037 [F] - F 取号等待... 1470642024036 [E] - E 取号等待... 1470642024036 [B] - B 取号等待... 1470642024037 [D] - D 取号等待... 1470642024037 [A] - A 取号等待... 1470642023965 [D] - D 提供服务... 1470642024039 [C] - C 提供服务... 1470642024039 [G] - G 取号等待... 1470642024036 [F] - F 提供服务... 1470642024040 [D] - D 完成服务... 1470642025039 [C] - C 完成服务... 1470642025039 [D] - D 释放... 1470642025040 [F] - F 完成服务... 1470642025040 [C] - C 释放... 1470642025041 [B] - B 提供服务... 1470642025042 [A] - A 提供服务... 1470642025042 [F] - F 释放... 1470642025043 [E] - E 提供服务... 1470642025043 [A] - A 完成服务... 1470642026043 [B] - B 完成服务... 1470642026043 [B] - B 释放... 1470642026043 [A] - A 释放... 1470642026043 [G] - G 提供服务... 1470642026044 [E] - E 完成服务... 1470642026045 [E] - E 释放... 1470642026045 [G] - G 完成服务... 1470642027045 [G] - G 释放... 1470642027046
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class package concurrent; import concurrent.thread.CountDownLatchThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CountDownLatchTest { private static final Logger logger = LogManager.getLogger(CountDownLatchTest.class); public static void main(String[] args) throws InterruptedException { //设定当达成三个计数时触发 CountDownLatch countDownLatch = new CountDownLatch(3); new CountDownLatchThread("A", countDownLatch).start(); new CountDownLatchThread("B", countDownLatch).start(); new CountDownLatchThread("C", countDownLatch).start(); new CountDownLatchThread("D", countDownLatch).start(); new CountDownLatchThread("E", countDownLatch).start(); for (int i = 3; i > 0; i--) { Thread.sleep(1000); logger.debug(i); countDownLatch.countDown(); } } }
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CountDownLatch; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CountDownLatchThread extends Thread { private static final Logger logger = LogManager.getLogger(CountDownLatchThread.class); //计数器 private CountDownLatch countDownLatch; public CountDownLatchThread(String name, CountDownLatch countDownLatch) { setName(name); this.countDownLatch = countDownLatch; } @Override public void run() { logger.debug("执行操作..."); try { sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("等待计数器达到标准..."); try { //让线程进入阻塞状态,等待计数达成后释放 countDownLatch.await(); logger.debug("计数达成,继续执行..."); } catch (InterruptedException e) { e.printStackTrace(); } } }
Cyclic
[E] - 执行操作... [B] - 执行操作... [A] - 执行操作... [C] - 执行操作... [D] - 执行操作... [main] DEBUG concurrent.CountDownLatchTest - 3 [B] - 等待计数器达到标准... [E] - 等待计数器达到标准... [C] - 等待计数器达到标准... [D] - 等待计数器达到标准... [A] - 等待计数器达到标准... [main] DEBUG concurrent.CountDownLatchTest - 2 [main] DEBUG concurrent.CountDownLatchTest - 1 [E] - 计数达成,继续执行... [C] - 计数达成,继续执行... [B] - 计数达成,继续执行... [D] - 计数达成,继续执行... [A] - 计数达成,继续执行...
package concurrent; import concurrent.thread.CyclicBarrierThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CyclicBarrierTest { private static final Logger logger = LogManager.getLogger(CyclicBarrierTest.class); public static void main(String[] args) { //可以使用CyclicBarrier(int parties)不设定到达后执行的内容 CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { logger.debug("---计数到达后执行的内容----"); }); new CyclicBarrierThread("A", cyclicBarrier).start(); new CyclicBarrierThread("B", cyclicBarrier).start(); new CyclicBarrierThread("C", cyclicBarrier).start(); new CyclicBarrierThread("D", cyclicBarrier).start(); new CyclicBarrierThread("E", cyclicBarrier).start(); new CyclicBarrierThread("A2", cyclicBarrier).start(); new CyclicBarrierThread("B2", cyclicBarrier).start(); new CyclicBarrierThread("C2", cyclicBarrier).start(); new CyclicBarrierThread("D2", cyclicBarrier).start(); new CyclicBarrierThread("E2", cyclicBarrier).start(); //需要注意的是,如果线程数不是上面设置的等待数量的整数倍,比如这个程序中又加了个线程, // 那么当达到5个数量时,只会执行达到时的五个线程的内容, // 剩余一个线程会出于阻塞状态导致主线程无法退出,程序无法结束 // new CyclicBarrierThread("F", cyclicBarrier).start();//将这行注释去掉程序无法自动结束 } }
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class CyclicBarrierThread extends Thread { private static final Logger logger = LogManager.getLogger(CyclicBarrierThread.class); private CyclicBarrier cyclicBarrier; public CyclicBarrierThread(String name, CyclicBarrier cyclicBarrier) { super(name); this.cyclicBarrier = cyclicBarrier; } @Override public void run() { logger.debug("执行操作..."); try { int time = new Random().nextInt(10) * 1000; logger.debug("休眠" + time/1000 + "秒"); sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } logger.debug("等待计数器达到标准..."); try { //让线程进入阻塞状态,等待计数达成后释放 cyclicBarrier.await(); logger.debug("计数达成,继续执行..."); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }
[A] - 执行操作... [A] - 休眠0秒 [E2] - 执行操作... [E2] - 休眠5秒 [D2] - 执行操作... [D2] - 休眠4秒 [C2] - 执行操作... [C2] - 休眠4秒 [B2] - 执行操作... [B2] - 休眠6秒 [A2] - 执行操作... [A2] - 休眠8秒 [E] - 执行操作... [E] - 休眠5秒 [D] - 执行操作... [D] - 休眠0秒 [C] - 执行操作... [C] - 休眠3秒 [B] - 执行操作... [B] - 休眠7秒 [A] - 等待计数器达到标准... [D] - 等待计数器达到标准... [C] - 等待计数器达到标准... [D2] - 等待计数器达到标准... [C2] - 等待计数器达到标准... [C2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- [C2] - 计数达成,继续执行... [A] - 计数达成,继续执行... [C] - 计数达成,继续执行... [D2] - 计数达成,继续执行... [D] - 计数达成,继续执行... [E2] - 等待计数器达到标准... [E] - 等待计数器达到标准... [B2] - 等待计数器达到标准... [B] - 等待计数器达到标准... [A2] - 等待计数器达到标准... [A2] DEBUG concurrent.CyclicBarrierTest - ---计数到达后执行的内容---- [E] - 计数达成,继续执行... [B2] - 计数达成,继续执行... [E2] - 计数达成,继续执行... [B] - 计数达成,继续执行... [A2] - 计数达成,继续执行...
package concurrent; import concurrent.pojo.ExchangerPojo; import concurrent.thread.ExchangerThread; import java.util.HashMap; import java.util.concurrent.Exchanger; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class ExchangerTest { public static void main(String[] args) { Exchanger<HashMap<String, ExchangerPojo>> exchanger = new Exchanger<>(); new ExchangerThread("A", exchanger).start(); new ExchangerThread("B", exchanger).start(); } }
package concurrent.pojo; import com.alibaba.fastjson.JSON; import java.util.Date; import java.util.List; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class ExchangerPojo { private int intVal; private String strVal; private List<String> strList; private Date date; public ExchangerPojo(int intVal, String strVal, List<String> strList, Date date) { this.intVal = intVal; this.strVal = strVal; this.strList = strList; this.date = date; } public int getIntVal() { return intVal; } public void setIntVal(int intVal) { this.intVal = intVal; } public String getStrVal() { return strVal; } public void setStrVal(String strVal) { this.strVal = strVal; } public List<String> getStrList() { return strList; } public void setStrList(List<String> strList) { this.strList = strList; } public Date getDate() { return date; } public void setDate(Date date) { this.date = date; } @Override public String toString() { return JSON.toJSONString(this); } }
CyclicBarrier則是N個線程等待彼此執行到零界點之後再繼續執行,await()調用的同時參與了計數,並且CyclicBarrier支持條件達成後執行某個動作,而且這個過程是循環性的。
Exchanger
Exchanger
Exchanger
測試類別:
package concurrent.thread; import concurrent.pojo.ExchangerPojo; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.*; import java.util.concurrent.Exchanger; /** * 拿客 * www.coderknock.com * QQ群:213732117 * 创建时间:2016年08月08日 * 描述: */ public class ExchangerThread extends Thread { private Exchanger<HashMap<String, ExchangerPojo>> exchanger; private static final Logger logger = LogManager.getLogger(ExchangerThread.class); public ExchangerThread(String name, Exchanger<HashMap<String, ExchangerPojo>> exchanger) { super(name); this.exchanger = exchanger; } @Override public void run() { HashMap<String, ExchangerPojo> map = new HashMap<>(); logger.debug(getName() + "提供者提供数据..."); Random random = new Random(); for (int i = 0; i < 3; i++) { int index = random.nextInt(10); List<String> list = new ArrayList<>(); for (int j = 0; j < index; j++) { list.add("list ---> " + j); } ExchangerPojo pojo = new ExchangerPojo(index, getName() + "提供的数据", list, new Date()); map.put("第" + i + "个数据", pojo); } try { int time = random.nextInt(10); logger.debug(getName() + "等待" + time + "秒...."); for (int i = time; i > 0; i--) { sleep(1000); logger.debug(getName() + "---->" + i); } //等待exchange是会进入阻塞状态,可以在一个线程中与另一线程多次交互,此处就不写多次了 HashMap<String, ExchangerPojo> getMap = exchanger.exchange(map); time = random.nextInt(10); logger.debug(getName() + "接受到数据等待" + time + "秒...."); for (int i = time; i > 0; i--) { sleep(1000); logger.debug(getName() + "---->" + i); } getMap.forEach((x, y) -> { logger.debug(x + " -----> " + y.toString()); }); } catch (InterruptedException e) { e.printStackTrace(); } } }
[B] - B提供者提供数据... [A] - A提供者提供数据... [A] - A等待2秒.... [B] - B等待0秒.... [A] - A---->2 [A] - A---->1 [B] - B接受到数据等待1秒.... [A] - A接受到数据等待4秒.... [B] - B---->1 [A] - A---->4 [B] - 第0个数据 -----> {"date":1470652252049,"intVal":5,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4"],"strVal":"A提供的数据"} [B] - 第1个数据 -----> {"date":1470652252049,"intVal":1,"strList":["list ---> 0"],"strVal":"A提供的数据"} [B] - 第2个数据 -----> {"date":1470652252049,"intVal":4,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3"],"strVal":"A提供的数据"} [A] - A---->3 [A] - A---->2 [A] - A---->1 [A] - 第0个数据 -----> {"date":1470652252057,"intVal":1,"strList":["list ---> 0"],"strVal":"B提供的数据"} [A] - 第1个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list ---> 5"], "strVal":"B提供的数据"} [A] - 第2个数据 -----> {"date":1470652252057,"intVal":6,"strList":["list ---> 0","list ---> 1","list ---> 2","list ---> 3","list ---> 4","list --->
Phaser個人感覺兼具了CountDownLatch與CyclicBarrier的功能,並提供了分階段的能力。的CyclicBarrier的功能
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; /** * 拿客 * 网站:www.coderknock.com * QQ群:213732117 * 三产 创建于 2016年08月08日 21:25:30。 */ public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest.class); public static void main(String[] args) { Phaser phaser = new Phaser() { /**此方法有2个作用: * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。 例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 * */ @Override protected boolean onAdvance(int phase, int registeredParties) { logger.debug("阶段--->" + phase); logger.debug("注册的线程数量--->" + registeredParties); return super.onAdvance(phase, registeredParties); } }; for (int i = 3; i > 0; i--) { new PhaserThread("第" + i + "个", phaser).start(); } } }
package concurrent.thread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Random; import java.util.concurrent.Phaser; /** * 拿客 * 网站:www.coderknock.com * QQ群:213732117 * 三产 创建于 2016年08月08日 21:16:55。 */ public class PhaserThread extends Thread { private Phaser phaser; private static final Logger logger = LogManager.getLogger(PhaserThread.class); public PhaserThread(String name, Phaser phaser) { super(name); this.phaser = phaser; //把当前线程注册到Phaser this.phaser.register(); logger.debug("name为" + name + "的线程注册了" + this.phaser.getRegisteredParties() + "个线程"); } @Override public void run() { logger.debug("进入..."); phaser.arrive(); for (int i = 6; i > 0; i--) { int time = new Random().nextInt(5); try { logger.debug("睡眠" + time + "秒"); sleep(time * 1000); if (i == 1) { logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); logger.debug("最后一次触发,并注销自身"); phaser.arriveAndDeregister(); logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); } else { logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); logger.debug(i + "--->触发并阻塞..."); phaser.arriveAndAwaitAdvance();//相当于CyclicBarrier.await(); logger.debug("未完成的线程数量:" + phaser.getUnarrivedParties()); } } catch (InterruptedException e) { e.printStackTrace(); } } logger.debug("注销完成之后注册的线程数量--->" + phaser.getRegisteredParties()); } }
[main] - name为第3个的线程注册了1个线程 [main] - name为第2个的线程注册了2个线程 [main] - name为第1个的线程注册了3个线程 [第3个] - 进入... [第2个] - 进入... [第3个] - 睡眠2秒 [第2个] - 睡眠1秒 [第1个] - 进入... [第1个] - 阶段--->0 [第1个] - 注册的线程数量--->3 [第1个] - 睡眠4秒 [第2个] - 未完成的线程数量:3 [第2个] - 6--->触发并阻塞... [第3个] - 未完成的线程数量:2 [第3个] - 6--->触发并阻塞... [第1个] - 未完成的线程数量:1 [第1个] - 6--->触发并阻塞... [第1个] - 阶段--->1 [第1个] - 注册的线程数量--->3 [第1个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第2个] - 未完成的线程数量:3 [第1个] - 睡眠1秒 [第3个] - 睡眠0秒 [第2个] - 睡眠4秒 [第3个] - 未完成的线程数量:3 [第3个] - 5--->触发并阻塞... [第1个] - 未完成的线程数量:2 [第1个] - 5--->触发并阻塞... [第2个] - 未完成的线程数量:1 [第2个] - 5--->触发并阻塞... [第2个] - 阶段--->2 [第2个] - 注册的线程数量--->3 [第2个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 睡眠0秒 [第3个] - 睡眠2秒 [第2个] - 未完成的线程数量:3 [第1个] - 睡眠2秒 [第2个] - 4--->触发并阻塞... [第3个] - 未完成的线程数量:2 [第1个] - 未完成的线程数量:2 [第3个] - 4--->触发并阻塞... [第1个] - 4--->触发并阻塞... [第1个] - 阶段--->3 [第1个] - 注册的线程数量--->3 [第1个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第2个] - 未完成的线程数量:3 [第1个] - 睡眠2秒 [第3个] - 睡眠1秒 [第2个] - 睡眠4秒 [第3个] - 未完成的线程数量:3 [第3个] - 3--->触发并阻塞... [第1个] - 未完成的线程数量:2 [第1个] - 3--->触发并阻塞... [第2个] - 未完成的线程数量:1 [第2个] - 3--->触发并阻塞... [第2个] - 阶段--->4 [第2个] - 注册的线程数量--->3 [第2个] - 未完成的线程数量:3 [第3个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 睡眠2秒 [第1个] - 睡眠2秒 [第3个] - 睡眠4秒 [第2个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 2--->触发并阻塞... [第1个] - 2--->触发并阻塞... [第3个] - 未完成的线程数量:1 [第3个] - 2--->触发并阻塞... [第3个] - 阶段--->5 [第3个] - 注册的线程数量--->3 [第3个] - 未完成的线程数量:3 [第1个] - 未完成的线程数量:3 [第2个] - 未完成的线程数量:3 [第3个] - 睡眠2秒 [第1个] - 睡眠3秒 [第2个] - 睡眠0秒 [第2个] - 未完成的线程数量:3 [第2个] - 最后一次触发,并注销自身 [第2个] - 未完成的线程数量:2 [第2个] - 注销完成之后注册的线程数量--->2 [第3个] - 未完成的线程数量:2 [第3个] - 最后一次触发,并注销自身 [第3个] - 未完成的线程数量:1 [第3个] - 注销完成之后注册的线程数量--->1 [第1个] - 未完成的线程数量:1 [第1个] - 最后一次触发,并注销自身 [第1个] - 阶段--->6 [第1个] - 注册的线程数量--->0 [第1个] - 未完成的线程数量:0 [第1个] - 注销完成之后注册的线程数量--->0
實現分階段的CountDownLatch的功能
package concurrent; import concurrent.thread.PhaserThread; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.Phaser; import static jodd.util.ThreadUtil.sleep; /** * 拿客 * 网站:www.coderknock.com * QQ群:213732117 * 三产 创建于 2016年08月08日 21:25:30。 */ public class PhaserTest { private static final Logger logger = LogManager.getLogger(PhaserTest.class); public static void main(String[] args) { //这里其实相当于已经注册了3个线程,但是并没有实际的线程 int coutNum=3; Phaser phaser = new Phaser(coutNum) { /**此方法有2个作用: * 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。 * 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。 例如:若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止。 * */ @Override protected boolean onAdvance(int phase, int registeredParties) { logger.debug("阶段--->" + phase); logger.debug("注册的线程数量--->" + registeredParties); return registeredParties==coutNum;//当后只剩下coutNum个线程时说明所有真实的注册的线程已经运行完成,测试可以终止Phaser } }; for (int i = 3; i > 0; i--) { new PhaserThread("第" + i + "个", phaser).start(); } //当phaser未终止时循环注册这块儿可以使用实际的业务处理 while (!phaser.isTerminated()) { sleep(1000); logger.debug("触发一次"); phaser.arrive(); //相当于countDownLatch.countDown(); } } }
-->