1.线程池的概念与 Executors 类的应用
在jdk1.5之后,我们可以使用 Executors 这个类的静态方法来创建线程池对象
//该方法用于创建一个线程池对象,通过指定的参数确定该线程池中有多少个线程对象,该对象实现了ExecutorService接口
1.
public static ExecutorService newFixedThreadPool(int nThreads)
//该方法创建一个线程池,该线程池会根据动态的任务创建线程对象个数,线程池对象实现了ExecutorService接口
2.
public static ExecutorService newCachedThreadPool()
//该方法创建一个线程池对象,该对象中只有一个线程对象,线程池对象实现了ExecutorService接口
3.
public static ExecutorService newSingleThreadExecutor()
4.另外,讲解一下 ScheduledExecutorService 这个接口,通过 Executors 的
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)方法可以获取实现了实现了 ScheduledExecutorService 接口的对象
该对象主要作用是他可以对同一个任务进行多次的线程执行
5. ScheduledExecutorService 和 ExecutorService 和 Executor 的关系是
Executor
|
ExecutorService
|
ScheduledExecutorService
接下来看具体的代码示例
实例1
package com.xiaogao.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class NewThreadPool { public static void main(String[] args) { //创建一个线程池,该线程池中含有3个线程对象 //ExecutorService es = Executors.newFixedThreadPool(3); //创建一个具有缓冲线程池的线程 //ExecutorService es = Executors.newCachedThreadPool(); //创建一个具有单线程的线程池 ExecutorService es = Executors.newSingleThreadExecutor(); for(int i=1; i<11; i++) { final int task = i; //循环十次,给线程池分配十次任务,并执行 es.execute(new Runnable() { public void run() { //执行十次循环,这是每个任务需要做的事,打印信息 for(int j=1; j<11; j++) { try { //睡眠20毫秒 Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第" + task + "号任务执行,第" + j + "次循环"); } } }); } System.out.println("十次任务由三个线程执行完毕,此时java虚拟机还没对退出,必须调用shutdown方法"); es.shutdown(); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } //这个方法非常危险,会杀死当前正在执行任务的线程,暂停正在处理的等待中的任务,正在执行任务的线程被杀死后,会抛出异常 //es.shutdownNow(); //第一次执行,隔六秒,之后每一次执行隔2秒 Executors.newScheduledThreadPool(3).scheduleAtFixedRate( new Runnable(){ @Override public void run() { System.out.println("bombing!"); }}, 6, 2, TimeUnit.SECONDS); } }
2.java1.5之后锁的使用
在java1.5之前,在多线程之中我们为了防止多线程操纵共享数据发生错误,我们会将相应的代码封装在用 synchronized 修饰的方法,或代码块中,在jdk1.5之后我们可以考虑
使用 Lock 这个接口来实现,这个接口中定义了六个方法,主要的有lock() 和 unlock() 这两个方法,用于得到锁和释放锁, ReentrantLock 这个类实现了 Lock 接口,主要用于
创建创建锁对象
实例2
package com.xiaogao.test; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestLock { public static void main(String[] args) { ShareaDate sd = new ShareaDate(); //创建并启动4个线程 new Thread(sd).start(); new Thread(sd).start(); new Thread(sd).start(); new Thread(sd).start(); } //定义一个内部类,该类中封装了共享数据 static class ShareaDate implements Runnable { //创建共享字段 //重设共享数据 private int count = 10000; //拿到锁对象,用于同步代码 Lock lock = new ReentrantLock(); //为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lock public /* synchronized */ int getCount() { //为了保险起见,万一try代码块中发生了异常,锁无法得到释放,应该将释放锁的代码放在finally中 try { //上锁 lock.lock(); count = count - 1; return count; } finally { //释放锁 lock.unlock(); } } @Override public void run() { while(count > 0 ) { System.out.println(getCount()); } } } }
3.锁的深入研究
ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。
该类封装了两个抽象方法
Lock readLock()
Lock writeLock()
用于取得读取锁和写入锁
ReentrantReadWriteLock 这个类实现了 ReadWriteLock 这个接口,但是他并没有覆写readLock()方法和writeLick()方法,通过查看源代码,我们发现这个类中封装了两个静态内部类ReentrantReadWriteLock.ReadLock 和 ReentrantReadWriteLock.WriteLock 这两个类中有readLock()方法和writeLock()的具体实现
读锁和写锁到底有什么作用,和普通锁有什么区别,在多线程程序中,我们可能需要取得共享数据,有时需要修改共享数据,取得共享数据时,线程的乱入可能没有什么危害,但是修改数据时危害就很大,这时为了提高效率,就出现了读锁和写锁的概念,具体是什么意思
1.读锁不互斥,当一个线程进入读锁,还没有执行完相应的代码,另一个线程可以进入执行,就像没有加锁一样.
2.读写锁互斥,一个线程进入,另一个线程无法进入
3.写锁互斥,一个线程进入,另一个线程无法进入
相对于普通锁的好处是他能有效的提高性能,如果是普通锁的话,不管你做什么操作,他都不允许别的线程进入,下面代码演示
实例3
package com.xiaogao.test; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** *测试读锁和写锁的区别 * */ public class TestLock { public static void main(String[] args) { ShareaDate sd = new ShareaDate(); //创建并启动4个线程 new Thread(sd, "1").start(); new Thread(sd, "2").start(); new Thread(sd, "3").start(); new Thread(sd, "4").start(); } //定义一个内部类,该类中封装了共享数据 static class ShareaDate implements Runnable { //创建共享字段 //重设共享数据 private int count = 100; //创建读写锁 ReadWriteLock rel = new ReentrantReadWriteLock(); //为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lock /* public int getCount() { //拿到锁对象,用于同步代码 //拿到读锁,并上锁 rel.readLock().lock(); try { for(int i=0; i<10; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count); } return count; } finally { //释放读锁 rel.readLock().unlock(); } }*/ public void resetCount() { //拿到写锁,并上锁 rel.writeLock().lock(); //保险起见,在判断一次 if(count > 0) { try { count--; for(int i=0; i<10; i++) { //循环10次 System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count); } } finally { //释放锁 rel.writeLock().unlock(); } } } @Override public void run() { while(count > 0 ) { //getCount(); resetCount(); } } } }
从代码运行结果来看,如果用的是读锁,在内部循环时,其他线程可以进入,而用写锁时,无法进入.
什么是锁的降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。貌似应用比较少
这里就不加举例子了
4.Condition 的简单应用
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待
set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
咱们可以这么理解 Condition ,它可用于唤醒别的线程和暂停本线程.在原来的 synchronized 修饰的语句块中,我们是通过调用锁的wait()方法和notify()来暂停和唤醒线程,同理
Condition 必须基于线程同步代码块的互斥才行,也就是必须存在于Lock.lock()与Lock.unlock()之中,这样的话我们也可以更好的理解为什么读锁为什么无法拿到 Condition 对象了
加读锁后,线程之间并不互斥,就像没有加 synchronized 一样,自然没有资格使用wait()方法和notify()一样
synchronized 和 Condition 的区别
相同点:都是使用在并发编程当中,并且必须基于一把锁对象
不同点:当有>=3个的线程的话,用notify()方法没有办法指定唤醒哪一个线程,完全是随机的,就算是你使用notifyall()唤醒全部线程也没有办法保证你想要的线程得到锁,但是 Condition就不一样,你可以指定唤醒某个线程,并将锁扔给他.
让我们现在来看一道面试题有三个线程,分别执行三段代码,主线程执行完后,执行子1号线程,之后子2号线程,之后再主线程,这样来回若干次
分析:如果我们不使用 Condition 这个新技术,会很麻烦的,因为我们唤醒其他线程的时候不知道下一个是哪个线程会被唤醒并得到执行,而如果使用 Condition 可以很好的解决这个问题
见具体代码
实例4
package com.xiaogao.test; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThreeConditionCommunication { /** * @param args */ public static void main(String[] args) { final Business business = new Business(); //创建3个线程对象并启动他们,每个线程执行的任务不同 new Thread( new Runnable() { @Override public void run() { for(int i=1;i<=50;i++){ business.sub2(i); } } } ).start(); new Thread( new Runnable() { @Override public void run() { for(int i=1;i<=50;i++){ business.sub3(i); } } } ).start(); for(int i=1;i<=50;i++){ business.main(i); } } static class Business { //创建锁对象 Lock lock = new ReentrantLock(); //得到3个Condition对象 Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); private int shouldSub = 1; public void sub2(int i){ //上锁 lock.lock(); try{ while(shouldSub != 2){ try { //正在执行的线程等待,condition2记住了当前线程,后面就可以通过condition2.signal()唤醒该线程; condition2.await(); System.out.println("第二号子线程2222222222"); } catch (Exception e) { e.printStackTrace(); } } for(int j=1;j<=10;j++){ System.out.println("sub2 thread sequence of " + j + ",loop of " + i); } shouldSub = 3; //通过condition3唤醒指定的线程 condition3.signal(); }finally{ lock.unlock(); } } public void sub3(int i){ lock.lock(); try{ while(shouldSub != 3){ try { condition3.await(); System.out.println("第三号子线程33333333333333"); } catch (Exception e) { e.printStackTrace(); } } for(int j=1;j<=20;j++){ System.out.println("sub3 thread sequence of " + j + ",loop of " + i); } shouldSub = 1; condition1.signal(); }finally{ //解锁,释放权限 lock.unlock(); } } public void main(int i){ lock.lock(); try{ while(shouldSub != 1){ try { condition1.await(); System.out.println("主线程1111111111111111111"); } catch (Exception e) { e.printStackTrace(); } } for(int j=1;j<=100;j++){ System.out.println("main thread sequence of " + j + ",loop of " + i); } shouldSub = 2; condition2.signal(); }finally{ lock.unlock(); } } } }
我在每个condition.await();后面加了一句打印代码,就是为了确认某个condition调用signal()后,是不是指定的线程得到权限,并执行,从结果来看,显然如此
下面介绍几个多线程并发的工具类,没什么难度,记得在哪用就可以了
5.Semaphore 的使用
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,
从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
实例5
package com.xiaogao.test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); //创建一个Semaphore对象,并指定其权限个数 final Semaphore sp = new Semaphore(3); //循环10次 for(int i=0;i<10;i++){ Runnable runnable = new Runnable(){ public void run(){ try { //取得一个许可权限,当权限用完后,线程将阻塞,等待其他线程释放权限 sp.acquire(); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + "进入,当前已有" + (3-sp.availablePermits()) + "个并发"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + "即将离开"); sp.release(); //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元 System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有" + (3-sp.availablePermits()) + "个并发"); } }; service.execute(runnable); } } }
6.CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,
此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier
实例5
package com.xiaogao.test; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
7.CountDownLatch
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,
所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。
如果需要重置计数,请考虑使用 CyclicBarrier。
CountDownLatch 与 CyclicBarrier 的区别还有 CyclicBarrier 调用await()方法后,它本身的数值会减1,而 CountDownLatch 必须调用countDown()方法才能减1
实例6
package com.xiaogao.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(3); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { System.out.println("线程" + Thread.currentThread().getName() + "正准备接受命令"); cdOrder.await(); System.out.println("线程" + Thread.currentThread().getName() + "已接受命令"); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "回应命令处理结果"); cdAnswer.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将发布命令"); cdOrder.countDown(); System.out.println("线程" + Thread.currentThread().getName() + "已发送命令,正在等待结果"); cdAnswer.await(); System.out.println("线程" + Thread.currentThread().getName() + "已收到所有响应结果"); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
8.Exchanger
可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger
可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
实例7
package com.xiaogao.test; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger<String> exchanger = new Exchanger<String>(); service.execute(new Runnable(){ public void run() { try { String data1 = "zxx"; System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2); }catch(Exception e){ } } }); service.execute(new Runnable(){ public void run() { try { String data1 = "lhm"; System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2); }catch(Exception e){ } } }); service.shutdown(); } }
9.下面介绍一下线程阻塞队列的类和接口
接口:BlockingQueue
支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的
处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),
第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。
下表中总结了这些方法:
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
实现了这个接口的类有 ArrayBlockingQueue
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。
队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这个类与 Semaphore 很像,但与 Semaphore 不同的是 Semaphore 中装的是许可权限,一个线程拿到后就少一个名额,当权限为0时
必须线程自己释放才能使别的线程得到执行机会,而 ArrayBlockingQueue 是代表一个数组序列,当数组序列满时,再调用put()方法
当前线程将会阻塞,当数组序列为0时,调用take()方法线程也将会阻塞, ArrayBlockingQueue 是多个线程操纵一个数组序列,当一个
线程调用一个方法时,可能会使另一个线程从阻塞状态恢复回来
下面看两个示例代码
实例8
package com.xiaogao.test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); for(int i=0;i<2;i++){ new Thread(){ public void run(){ while(true){ try { //随眠时间1000毫秒以下的任意值时间 Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "准备放数据!"); //添加数据,如果已经填满,将阻塞 queue.put(1); System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ while(true){ try { //睡眠1秒后取数据 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "准备取数据!"); //取得数据,如果没有数据将会阻塞 queue.take(); System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
从程序执行的结果来看,很明显,程序中一共有三个线程并发执行,两个线程负责向集合中添加数据,另外还有一个负责取出数据,并且添加数据的频率明显快于取出数据的
频率,所以总会显示集合队列中有3个数据,改一下睡眠的时间,结果又会不一样
用两个具有1个空间的的队列来实现同步功能
问题分析
1.实现同步功能,首先必须要有两个线程,每个线程处理一个队列
2.为了避免阻塞的产生,应该当一个线程阻塞时,另外一个线程得以运行,在第二个线程阻塞前使第一个线程致使阻塞的条件得以解除,这样往复执行
实例9
package com.xiaogao.test; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class BlockingQueueCommunication { public static void main(String[] args) { final Business business = new Business(); //创建并执行线程,线程内部循环50次 new Thread( new Runnable() { public void run() { for(int i=1;i<=50;i++){ business.sub(i); } } } ).start(); //主线程循环50次 for(int i=1;i<=50;i++){ business.main(i); } } static class Business { //创建两个序列,每个序列的容量为1 BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); //让Business对象创建时就填满队列queue2队列 { try { queue2.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } public void sub(int i){ try { queue1.put(1); } catch (InterruptedException e) { e.printStackTrace(); } for(int j=1;j<=10;j++){ System.out.println("sub thread sequece of " + j + ",loop of " + i); } try { queue2.take(); } catch (InterruptedException e) { e.printStackTrace(); } } public void main(int i){ try { queue2.put(1); } catch (InterruptedException e1) { e1.printStackTrace(); } for(int j=1;j<=100;j++){ System.out.println("main thread sequece of " + j + ",loop of " + i); } try { queue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
以上是java关于jdk1.5线程加强的专题的详细内容。更多信息请关注PHP中文网其他相关文章!