1. The concept of thread pool and the application of the Executors class
After jdk1.5, we can use the static method of the Executors class to create a thread pool object
/ /This method is used to create a thread pool object and determine how many thread objects there are in the thread pool through the specified parameters. The object implements the ExecutorService interface
1.
public static ExecutorService newFixedThreadPool(int nThreads)
//This method creates a thread pool, which will create the number of thread objects based on dynamic tasks. The thread pool object implements the ExecutorService interface
2.
public static ExecutorService newCachedThreadPool()
//This method creates a thread pool object. There is only one thread object in the object. The thread pool object implements the ExecutorService interface
3.
public static ExecutorService newSingleThreadExecutor()
4. In addition, explain the ScheduledExecutorService interface. Through the
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) method of Executors, you can obtain objects that implement the ScheduledExecutorService interface.
The main function of this object is that it can perform multiple thread executions on the same task
5. The relationship between ScheduledExecutorService and ExecutorService and Executor is
Executor
|
ExecutorService
|
ScheduledExecutorService
Let’s look at the specific code examples
Example 1
package com.xiaogao.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class NewThreadPool { public static void main(String[] args) { //创建一个线程池,该线程池中含有3个线程对象 //ExecutorService es = Executors.newFixedThreadPool(3); //创建一个具有缓冲线程池的线程 //ExecutorService es = Executors.newCachedThreadPool(); //创建一个具有单线程的线程池 ExecutorService es = Executors.newSingleThreadExecutor(); for(int i=1; i<11; i++) { final int task = i; //循环十次,给线程池分配十次任务,并执行 es.execute(new Runnable() { public void run() { //执行十次循环,这是每个任务需要做的事,打印信息 for(int j=1; j<11; j++) { try { //睡眠20毫秒 Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第" + task + "号任务执行,第" + j + "次循环"); } } }); } System.out.println("十次任务由三个线程执行完毕,此时java虚拟机还没对退出,必须调用shutdown方法"); es.shutdown(); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } //这个方法非常危险,会杀死当前正在执行任务的线程,暂停正在处理的等待中的任务,正在执行任务的线程被杀死后,会抛出异常 //es.shutdownNow(); //第一次执行,隔六秒,之后每一次执行隔2秒 Executors.newScheduledThreadPool(3).scheduleAtFixedRate( new Runnable(){ @Override public void run() { System.out.println("bombing!"); }}, 6, 2, TimeUnit.SECONDS); } }
2. The use of locks after java1.5
Before java1.5, in multi-threading, we wanted to prevent errors in multi-threaded manipulation of shared data. , we will encapsulate the corresponding code in a method or code block modified with synchronized. After jdk1.5, we can consider using the Lock interface to implement it. This interface defines six methods, the main ones are The two methods lock() and unlock() are used to obtain and release locks. The ReentrantLock class implements the Lock interface and is mainly used to
create lock objects
Example 2
package com.xiaogao.test; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestLock { public static void main(String[] args) { ShareaDate sd = new ShareaDate(); //创建并启动4个线程 new Thread(sd).start(); new Thread(sd).start(); new Thread(sd).start(); new Thread(sd).start(); } //定义一个内部类,该类中封装了共享数据 static class ShareaDate implements Runnable { //创建共享字段 //重设共享数据 private int count = 10000; //拿到锁对象,用于同步代码 Lock lock = new ReentrantLock(); //为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lock public /* synchronized */ int getCount() { //为了保险起见,万一try代码块中发生了异常,锁无法得到释放,应该将释放锁的代码放在finally中 try { //上锁 lock.lock(); count = count - 1; return count; } finally { //释放锁 lock.unlock(); } } @Override public void run() { while(count > 0 ) { System.out.println(getCount()); } } } }
ReadWriteLock maintains a pair of related locks, one for read-only operations and the other for write operations. As long as there is no writer, the read lock can be held by multiple reader threads simultaneously. Write locks are exclusive.
This class encapsulates two abstract methods
Lock readLock()
Lock writeLock()
Used to obtain read locks and write locks
ReentrantReadWriteLock This class implements the ReadWriteLock interface. But he did not override the readLock() method and writeLick() method. By looking at the source code, we found that this class encapsulates two static internal classes, ReentrantReadWriteLock.ReadLock and ReentrantReadWriteLock.WriteLock. These two classes have readLock() methods. and the specific implementation of writeLock()
What are the functions of read locks and write locks, and what are the differences from ordinary locks? In multi-threaded programs, we may need to obtain shared data, and sometimes need to modify the shared data. When obtaining shared data , the intrusion of threads may not cause any harm, but the harm will be great when modifying data. At this time, in order to improve efficiency, the concepts of read locks and write locks appeared. What do they mean specifically?
1. Read locks are not mutually exclusive. When a thread enters the read lock and has not finished executing the corresponding code, another thread can enter the execution as if it were not locked.
2. Read-write locks are mutually exclusive. If one thread enters, the other thread cannot enter
3. Write locks are mutually exclusive. If one thread enters, another thread cannot enter.
The advantage over ordinary locks is that it can effectively improve performance. If it is an ordinary lock, no matter what operations you do, it will not Allow other threads to enter, the following code demonstrates
Example 3
package com.xiaogao.test; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /** *测试读锁和写锁的区别 * */ public class TestLock { public static void main(String[] args) { ShareaDate sd = new ShareaDate(); //创建并启动4个线程 new Thread(sd, "1").start(); new Thread(sd, "2").start(); new Thread(sd, "3").start(); new Thread(sd, "4").start(); } //定义一个内部类,该类中封装了共享数据 static class ShareaDate implements Runnable { //创建共享字段 //重设共享数据 private int count = 100; //创建读写锁 ReadWriteLock rel = new ReentrantReadWriteLock(); //为了保持操作数据的原子性,原本是加synchronized关键字的,现在可以使用Lock /* public int getCount() { //拿到锁对象,用于同步代码 //拿到读锁,并上锁 rel.readLock().lock(); try { for(int i=0; i<10; i++) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count); } return count; } finally { //释放读锁 rel.readLock().unlock(); } }*/ public void resetCount() { //拿到写锁,并上锁 rel.writeLock().lock(); //保险起见,在判断一次 if(count > 0) { try { count--; for(int i=0; i<10; i++) { //循环10次 System.out.println("第" + Thread.currentThread().getName() + "号线程进入执行," + i+"count:" + count); } } finally { //释放锁 rel.writeLock().unlock(); } } } @Override public void run() { while(count > 0 ) { //getCount(); resetCount(); } } } }
从代码运行结果来看,如果用的是读锁,在内部循环时,其他线程可以进入,而用写锁时,无法进入.
什么是锁的降级:重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。貌似应用比较少
这里就不加举例子了
4.Condition 的简单应用
Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待
set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。
咱们可以这么理解 Condition ,它可用于唤醒别的线程和暂停本线程.在原来的 synchronized 修饰的语句块中,我们是通过调用锁的wait()方法和notify()来暂停和唤醒线程,同理
Condition 必须基于线程同步代码块的互斥才行,也就是必须存在于Lock.lock()与Lock.unlock()之中,这样的话我们也可以更好的理解为什么读锁为什么无法拿到 Condition 对象了
加读锁后,线程之间并不互斥,就像没有加 synchronized 一样,自然没有资格使用wait()方法和notify()一样
synchronized 和 Condition 的区别
相同点:都是使用在并发编程当中,并且必须基于一把锁对象
不同点:当有>=3个的线程的话,用notify()方法没有办法指定唤醒哪一个线程,完全是随机的,就算是你使用notifyall()唤醒全部线程也没有办法保证你想要的线程得到锁,但是 Condition就不一样,你可以指定唤醒某个线程,并将锁扔给他.
让我们现在来看一道面试题有三个线程,分别执行三段代码,主线程执行完后,执行子1号线程,之后子2号线程,之后再主线程,这样来回若干次
分析:如果我们不使用 Condition 这个新技术,会很麻烦的,因为我们唤醒其他线程的时候不知道下一个是哪个线程会被唤醒并得到执行,而如果使用 Condition 可以很好的解决这个问题
见具体代码
实例4
package com.xiaogao.test; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThreeConditionCommunication { /** * @param args */ public static void main(String[] args) { final Business business = new Business(); //创建3个线程对象并启动他们,每个线程执行的任务不同 new Thread( new Runnable() { @Override public void run() { for(int i=1;i<=50;i++){ business.sub2(i); } } } ).start(); new Thread( new Runnable() { @Override public void run() { for(int i=1;i<=50;i++){ business.sub3(i); } } } ).start(); for(int i=1;i<=50;i++){ business.main(i); } } static class Business { //创建锁对象 Lock lock = new ReentrantLock(); //得到3个Condition对象 Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); private int shouldSub = 1; public void sub2(int i){ //上锁 lock.lock(); try{ while(shouldSub != 2){ try { //正在执行的线程等待,condition2记住了当前线程,后面就可以通过condition2.signal()唤醒该线程; condition2.await(); System.out.println("第二号子线程2222222222"); } catch (Exception e) { e.printStackTrace(); } } for(int j=1;j<=10;j++){ System.out.println("sub2 thread sequence of " + j + ",loop of " + i); } shouldSub = 3; //通过condition3唤醒指定的线程 condition3.signal(); }finally{ lock.unlock(); } } public void sub3(int i){ lock.lock(); try{ while(shouldSub != 3){ try { condition3.await(); System.out.println("第三号子线程33333333333333"); } catch (Exception e) { e.printStackTrace(); } } for(int j=1;j<=20;j++){ System.out.println("sub3 thread sequence of " + j + ",loop of " + i); } shouldSub = 1; condition1.signal(); }finally{ //解锁,释放权限 lock.unlock(); } } public void main(int i){ lock.lock(); try{ while(shouldSub != 1){ try { condition1.await(); System.out.println("主线程1111111111111111111"); } catch (Exception e) { e.printStackTrace(); } } for(int j=1;j<=100;j++){ System.out.println("main thread sequence of " + j + ",loop of " + i); } shouldSub = 2; condition2.signal(); }finally{ lock.unlock(); } } } }
我在每个condition.await();后面加了一句打印代码,就是为了确认某个condition调用signal()后,是不是指定的线程得到权限,并执行,从结果来看,显然如此
下面介绍几个多线程并发的工具类,没什么难度,记得在哪用就可以了
5.Semaphore 的使用
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release() 添加一个许可,
从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
实例5
package com.xiaogao.test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); //创建一个Semaphore对象,并指定其权限个数 final Semaphore sp = new Semaphore(3); //循环10次 for(int i=0;i<10;i++){ Runnable runnable = new Runnable(){ public void run(){ try { //取得一个许可权限,当权限用完后,线程将阻塞,等待其他线程释放权限 sp.acquire(); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + "进入,当前已有" + (3-sp.availablePermits()) + "个并发"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程" + Thread.currentThread().getName() + "即将离开"); sp.release(); //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元 System.out.println("线程" + Thread.currentThread().getName() + "已离开,当前已有" + (3-sp.availablePermits()) + "个并发"); } }; service.execute(runnable); } } }
6.CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,
此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier
实例5
package com.xiaogao.test; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候")); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
7.CountDownLatch
一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。 用给定的计数 初始化 CountDownLatch。由于调用了 countDown() 方法,
所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。
如果需要重置计数,请考虑使用 CyclicBarrier。
CountDownLatch 与 CyclicBarrier 的区别还有 CyclicBarrier 调用await()方法后,它本身的数值会减1,而 CountDownLatch 必须调用countDown()方法才能减1
实例6
package com.xiaogao.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(3); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { System.out.println("线程" + Thread.currentThread().getName() + "正准备接受命令"); cdOrder.await(); System.out.println("线程" + Thread.currentThread().getName() + "已接受命令"); Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "回应命令处理结果"); cdAnswer.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long)(Math.random()*10000)); System.out.println("线程" + Thread.currentThread().getName() + "即将发布命令"); cdOrder.countDown(); System.out.println("线程" + Thread.currentThread().getName() + "已发送命令,正在等待结果"); cdAnswer.await(); System.out.println("线程" + Thread.currentThread().getName() + "已收到所有响应结果"); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
8.Exchanger
可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger
可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
实例7
package com.xiaogao.test; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger<String> exchanger = new Exchanger<String>(); service.execute(new Runnable(){ public void run() { try { String data1 = "zxx"; System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2); }catch(Exception e){ } } }); service.execute(new Runnable(){ public void run() { try { String data1 = "lhm"; System.out.println("线程" + Thread.currentThread().getName() + "正在把数据" + data1 +"换出去"); Thread.sleep((long)(Math.random()*10000)); String data2 = (String)exchanger.exchange(data1); System.out.println("线程" + Thread.currentThread().getName() + "换回的数据为" + data2); }catch(Exception e){ } } }); service.shutdown(); } }
9.下面介绍一下线程阻塞队列的类和接口
接口:BlockingQueue
支持两个附加操作的 Queue,这两个操作是:获取元素时等待队列变为非空,以及存储元素时等待空间变得可用。
BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的
处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 false,具体取决于操作),
第三种是在操作可以成功前,无限期地阻塞当前线程,第四种是在放弃前只在给定的最大时间限制内阻塞。
下表中总结了这些方法:
抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
实现了这个接口的类有 ArrayBlockingQueue
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。
队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这个类与 Semaphore 很像,但与 Semaphore 不同的是 Semaphore 中装的是许可权限,一个线程拿到后就少一个名额,当权限为0时
必须线程自己释放才能使别的线程得到执行机会,而 ArrayBlockingQueue 是代表一个数组序列,当数组序列满时,再调用put()方法
当前线程将会阻塞,当数组序列为0时,调用take()方法线程也将会阻塞, ArrayBlockingQueue 是多个线程操纵一个数组序列,当一个
线程调用一个方法时,可能会使另一个线程从阻塞状态恢复回来
下面看两个示例代码
实例8
package com.xiaogao.test; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(3); for(int i=0;i<2;i++){ new Thread(){ public void run(){ while(true){ try { //随眠时间1000毫秒以下的任意值时间 Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "准备放数据!"); //添加数据,如果已经填满,将阻塞 queue.put(1); System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ while(true){ try { //睡眠1秒后取数据 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "准备取数据!"); //取得数据,如果没有数据将会阻塞 queue.take(); System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
从程序执行的结果来看,很明显,程序中一共有三个线程并发执行,两个线程负责向集合中添加数据,另外还有一个负责取出数据,并且添加数据的频率明显快于取出数据的
频率,所以总会显示集合队列中有3个数据,改一下睡眠的时间,结果又会不一样
用两个具有1个空间的的队列来实现同步功能
问题分析
1.实现同步功能,首先必须要有两个线程,每个线程处理一个队列
2.为了避免阻塞的产生,应该当一个线程阻塞时,另外一个线程得以运行,在第二个线程阻塞前使第一个线程致使阻塞的条件得以解除,这样往复执行
实例9
package com.xiaogao.test; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicInteger; public class BlockingQueueCommunication { public static void main(String[] args) { final Business business = new Business(); //创建并执行线程,线程内部循环50次 new Thread( new Runnable() { public void run() { for(int i=1;i<=50;i++){ business.sub(i); } } } ).start(); //主线程循环50次 for(int i=1;i<=50;i++){ business.main(i); } } static class Business { //创建两个序列,每个序列的容量为1 BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<Integer>(1); BlockingQueue<Integer> queue2 = new ArrayBlockingQueue<Integer>(1); //让Business对象创建时就填满队列queue2队列 { try { queue2.put(1); } catch (InterruptedException e) { e.printStackTrace(); } } public void sub(int i){ try { queue1.put(1); } catch (InterruptedException e) { e.printStackTrace(); } for(int j=1;j<=10;j++){ System.out.println("sub thread sequece of " + j + ",loop of " + i); } try { queue2.take(); } catch (InterruptedException e) { e.printStackTrace(); } } public void main(int i){ try { queue2.put(1); } catch (InterruptedException e1) { e1.printStackTrace(); } for(int j=1;j<=100;j++){ System.out.println("main thread sequece of " + j + ",loop of " + i); } try { queue1.take(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
The above is the detailed content of Java special topic on jdk1.5 thread enhancement. For more information, please follow other related articles on the PHP Chinese website!