In general development, I often see that many students only use some basic methods in dealing with the Java concurrent development model. For example, Volatile, synchronized. Advanced concurrency packages like Lock and atomic are not often used by many people. I think most of the reasons are caused by the lack of attributes of the principle. In busy development work, who can accurately grasp and use the correct concurrency model?
So based on this idea recently, I plan to organize this part of the concurrency control mechanism into an article. It is not only a memory of the knowledge I have mastered, but also I hope that the content mentioned in this article can help most developers.
Parallel program development inevitably involves issues such as multi-threading, multi-task collaboration and data sharing. In the JDK, multiple ways are provided to achieve concurrency control between multiple threads. For example, commonly used ones: internal locks, reentrant locks, read-write locks and semaphores.
In Java, each thread has a working memory area, which stores the values of variables in the main memory shared by all threads. copy. When a thread executes, it operates on these variables in its own working memory.
In order to access a shared variable, a thread usually first acquires the lock and clears its working memory area. This ensures that the shared variable is correctly loaded from the shared memory area of all threads into the thread's working memory area. , when the thread is unlocked, the values of the variables in the working memory area are guaranteed to be associated to the shared memory.
When a thread uses a variable, regardless of whether the program uses thread synchronization operations correctly, the value it obtains must be the value stored in the variable by itself or other threads. For example, if two threads store different values or object references into the same shared variable, then the value of the variable will belong to either this thread or that thread, and the value of the shared variable will not belong to both threads. A combination of reference values.
A variable is an address that a Java program can access. It includes not only basic type variables, reference type variables, but also array type variables. Variables stored in the main memory area can be shared by all threads, but it is impossible for one thread to access the parameters or local variables of another thread, so developers do not have to worry about the thread safety of local variables.
Since each thread has its own working memory area, when a thread changes the data in its own working memory , may not be visible to other threads. To this end, you can use the volatile keyword to allow all threads to read and write variables in memory, thereby making volatile variables visible across multiple threads.
Variables declared as volatile can achieve the following guarantees:
1、其他线程对变量的修改,可以及时反应在当前线程中; 2、确保当前线程对volatile变量的修改,能及时写回到共享内存中,并被其他线程所见; 3、使用volatile声明的变量,编译器会保证其有序性。
Synchronization The keyword synchronized is one of the most commonly used synchronization methods in the Java language. In early versions of the JDK, the performance of synchronized was not very good, and it was suitable for situations where lock competition was not particularly intense. In JDK6, the gap between synchronized and unfair locks has been narrowed. More importantly, synchronized is more concise and clear, and the code is more readable and maintainable.
Method to lock an object:
public synchronized void method(){}
When the method() method is called, the calling thread must first obtain the current object. If the current object is locked Held by other threads, the calling thread will wait. After the violation is completed, the object lock will be released. The above method is equivalent to the following writing:
public void method(){ synchronized(this){ // do something … } }
Secondly, using synchronized can also be used Construct a synchronization block. Compared with the synchronization method, the synchronization block can control the scope of the synchronization code more precisely. A small synchronization code is very fast in and out of the lock, allowing the system to have higher throughput.
public void method(Object o){ // before synchronized(o){ // do something ... } // after }
synchronized can also be used for static functions:
public synchronized static void method(){}
Be sure to pay attention here, the synchronized lock is added to On the current Class object, therefore, all calls to this method must obtain the lock of the Class object.
Although synchronized can ensure the thread safety of objects or code segments, using synchronized alone is not enough to control thread interactions with complex logic. In order to achieve interaction between multiple threads, you also need to use the wait() and notify() methods of the Object object.
Typical usage:
synchronized(obj){ while(<?>){ obj.wait(); // 收到通知后,继续执行。 } }
Before using the wait() method, you need to obtain the object lock. When the wait() method is executed, the current thread may release the exclusive lock of obj for use by other threads.
When the thread waiting on obj receives obj.notify(), it can regain the exclusive lock of obj and continue running. Note that the notify() method randomly awakensa thread waiting for the current object.
The following is an implementation of a blocking queue:
public class BlockQueue{ private List list = new ArrayList(); public synchronized Object pop() throws InterruptedException{ while (list.size()==0){ this.wait(); } if (list.size()>0){ return list.remove(0); } else{ return null; } } public synchronized Object put(Object obj){ list.add(obj); this.notify(); } }
synchronized with wait(), notify() should be mastered by Java developers basic skills.
Reentrantlock称为重入锁。它比synchronized拥有更加强大的功能,它可以中断、可定时。在高并发的情况下,它比synchronized有明显的性能优势。
Reentrantlock提供了公平和非公平两种锁。公平锁是对锁的获取是先进先出,而非公平锁是可以插队的。当然从性能上分析,非公平锁的性能要好得多。因此,在无特殊需要,应该优选非公平锁,但是synchronized提供锁业不是绝对公平的。Reentrantlock在构造的时候可以指定锁是否公平。
在使用重入锁时,一定要在程序最后释放锁。一般释放锁的代码要写在finally里。否则,如果程序出现异常,Loack就永远无法释放了。synchronized的锁是JVM最后自动释放的。
经典使用方式如下:
try { if (lock.tryLock(5, TimeUnit.SECONDS)) { //如果已经被lock,尝试等待5s,看是否可以获得锁,如果5s后仍然无法获得锁则返回false继续执行 // lock.lockInterruptibly();可以响应中断事件 try { //操作 } finally { lock.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); //当前线程被中断时(interrupt),会抛InterruptedException }
Reentrantlock提供了非常丰富的锁控制功能,灵活应用这些控制方法,可以提高应用程序的性能。不过这里并非是极力推荐使用Reentrantlock。重入锁算是JDK中提供的高级开发工具。
读写分离是一种非常常见的数据处理思想。在sql中应该算是必须用到的技术。ReadWriteLock是在JDK5中提供的读写分离锁。读写分离锁可以有效地帮助减少锁竞争,以提升系统性能。读写分离使用场景主要是如果在系统中,读操作次数远远大于写操作。使用方式如下:
private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private Lock readLock = readWriteLock.readLock(); private Lock writeLock = readWriteLock.writeLock(); public Object handleRead() throws InterruptedException { try { readLock.lock(); Thread.sleep(1000); return value; }finally{ readLock.unlock(); } } public Object handleRead() throws InterruptedException { try { writeLock.lock(); Thread.sleep(1000); return value; }finally{ writeLock.unlock(); } }
Conditiond对象用于协调多线程间的复杂协作。主要与锁相关联。通过Lock接口中的newCondition()方法可以生成一个与Lock绑定的Condition实例。Condition对象和锁的关系就如用Object.wait()、Object.notify()两个函数以及synchronized关键字一样。
这里可以把ArrayBlockingQueue的源码摘出来看一下:
public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); // 生成与Lock绑定的Condition notFull = lock.newCondition(); } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); // 通知 } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 如果队列为空 notEmpty.await(); // 则消费者队列要等待一个非空的信号 return extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); // 通知put() 线程队列已有空闲空间 return x; } // other code }
信号量为多线程协作提供了更为强大的控制方法。信号量是对锁的扩展。无论是内部锁synchronized还是重入锁ReentrantLock,一次都允许一个线程访问一个资源,而信号量却可以指定多个线程同时访问某一个资源。从构造函数可以看出:
public Semaphore(int permits) {}
public Semaphore(int permits, boolean fair){} // 可以指定是否公平
permits指定了信号量的准入书,也就是同时能申请多少个许可。当每个线程每次只申请一个许可时,这就相当于指定了同时有多少个线程可以访问某一个资源。这里罗列一下主要方法的使用:
public void acquire() throws InterruptedException {} //尝试获得一个准入的许可。若无法获得,则线程会等待,知道有线程释放一个许可或者当前线程被中断。 public void acquireUninterruptibly(){} // 类似于acquire(),但是不会响应中断。 public boolean tryAcquire(){} // 尝试获取,如果成功则为true,否则false。这个方法不会等待,立即返回。 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {} // 尝试等待多长时间 public void release() //用于在现场访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。
下面来看一下JDK文档中提供使用信号量的实例。这个实例很好的解释了如何通过信号量控制资源访问。
public class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); // 申请一个许可 // 同时只能有100个线程进入取得可用项, // 超过100个则需要等待 return getNextAvailableItem(); } public void putItem(Object x) { // 将给定项放回池内,标记为未被使用 if (markAsUnused(x)) { available.release(); // 新增了一个可用项,释放一个许可,请求资源的线程被激活一个 } } // 仅作示例参考,非真实数据 protected Object[] items = new Object[MAX_AVAILABLE]; // 用于对象池复用对象 protected boolean[] used = new boolean[MAX_AVAILABLE]; // 标记作用 protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else { return false; } } } return false; } }
此实例简单实现了一个对象池,对象池最大容量为100。因此,当同时有100个对象请求时,对象池就会出现资源短缺,未能获得资源的线程就需要等待。当某个线程使用对象完毕后,就需要将对象返回给对象池。此时,由于可用资源增加,因此,可以激活一个等待该资源的线程。
在刚开始接触ThreadLocal,笔者很难理解这个线程局部变量的使用场景。当现在回过头去看,ThreadLocal是一种多线程间并发访问变量的解决方案。与synchronized等加锁的方式不同,ThreadLocal完全不提供锁,而使用了以空间换时间的手段,为每个线程提供变量的独立副本,以保障线程安全,因此它不是一种数据共享的解决方案。
ThreadLocal是解决线程安全问题一个很好的思路,ThreadLocal类中有一个Map,用于存储每一个线程的变量副本,Map中元素的键为线程对象,而值对应线程的变量副本,由于Key值不可重复,每一个“线程对象”对应线程的“变量副本”,而到达了线程安全。
特别值得注意的地方,从性能上说,ThreadLocal并不具有绝对的又是,在并发量不是很高时,也行加锁的性能会更好。但作为一套与锁完全无关的线程安全解决方案,在高并发量或者所竞争激烈的场合,使用ThreadLocal可以在一定程度上减少锁竞争。
下面是一个ThreadLocal的简单使用:
public class TestNum { // 通过匿名内部类覆盖ThreadLocal的initialValue()方法,指定初始值 private static ThreadLocal seqNum = new ThreadLocal() { public Integer initialValue() { return 0; } }; // 获取下一个序列值 public int getNextNum() { seqNum.set(seqNum.get() + 1); return seqNum.get(); }public static void main(String[] args) { TestNum sn = new TestNum(); //3个线程共享sn,各自产生序列号 TestClient t1 = new TestClient(sn); TestClient t2 = new TestClient(sn); TestClient t3 = new TestClient(sn); t1.start(); t2.start(); t3.start(); } private static class TestClient extends Thread { private TestNum sn; public TestClient(TestNum sn) { this.sn = sn; } public void run() { for (int i = 0; i < 3; i++) { // 每个线程打出3个序列值 System.out.println("thread[" + Thread.currentThread().getName() + "] --> sn[" + sn.getNextNum() + "]"); } } } }
输出结果:
thread[Thread-0] –> sn[1] thread[Thread-1] –> sn[1] thread[Thread-2] –> sn[1] thread[Thread-1] –> sn[2] thread[Thread-0] –> sn[2] thread[Thread-1] –> sn[3] thread[Thread-2] –> sn[2] thread[Thread-0] –> sn[3] thread[Thread-2] –> sn[3]
输出的结果信息可以发现每个线程所产生的序号虽然都共享同一个TestNum实例,但它们并没有发生相互干扰的情况,而是各自产生独立的序列号,这是因为ThreadLocal为每一个线程提供了单独的副本。
“锁”是最常用的同步方法之一。在平常开发中,经常能看到很多同学直接把锁加很大一段代码上。还有的同学只会用一种锁方式解决所有共享问题。显然这样的编码是让人无法接受的。特别的在高并发的环境下,激烈的锁竞争会导致程序的性能下降德更加明显。因此合理使用锁对程序的性能直接相关。
1、线程的开销
在多核情况下,使用多线程可以明显提高系统的性能。但是在实际情况中,使用多线程的方式会额外增加系统的开销。相对于单核系统任务本身的资源消耗外,多线程应用还需要维护额外多线程特有的信息。比如,线程本身的元数据,线程调度,线程上下文的切换等。
2、减小锁持有时间
在使用锁进行并发控制的程序中,当锁发生竞争时,单个线程对锁的持有时间与系统性能有着直接的关系。如果线程持有锁的时间很长,那么相对地,锁的竞争程度也就越激烈。因此,在程序开发过程中,应该尽可能地减少对某个锁的占有时间,以减少线程间互斥的可能。比如下面这一段代码:
public synchronized void syncMehod(){ beforeMethod(); mutexMethod(); afterMethod(); }
此实例如果只有mutexMethod()方法是有同步需要的,而在beforeMethod(),和afterMethod()并不需要做同步控制。如果beforeMethod(),和afterMethod()分别是重量级的方法,则会花费较长的CPU时间。在这个时候,如果并发量较大时,使用这种同步方案会导致等待线程大量增加。因为当前执行的线程只有在执行完所有任务后,才会释放锁。
下面是优化后的方案,只在必要的时候进行同步,这样就能明显减少线程持有锁的时间,提高系统的吞吐量。代码如下:
public void syncMehod(){ beforeMethod(); synchronized(this){ mutexMethod(); } afterMethod(); }
3、减少锁粒度
减小锁粒度也是一种削弱多线程锁竞争的一种有效手段,这种技术典型的使用场景就是ConcurrentHashMap这个类。在普通的HashMap中每当对集合进行add()操作或者get()操作时,总是获得集合对象的锁。这种操作完全是一种同步行为,因为锁是在整个集合对象上的,因此,在高并发时,激烈的锁竞争会影响到系统的吞吐量。
如果看过源码的同学应该知道HashMap是数组+链表的方式做实现的。ConcurrentHashMap在HashMap的基础上将整个HashMap分成若干个段(Segment),每个段都是一个子HashMap。如果需要在增加一个新的表项,并不是将这个HashMap加锁,二十搜线根据hashcode得到该表项应该被存放在哪个段中,然后对该段加锁,并完成put()操作。这样,在多线程环境中,如果多个线程同时进行写入操作,只要被写入的项不存在同一个段中,那么线程间便可以做到真正的并行。具体的实现希望读者自己花点时间读一读ConcurrentHashMap这个类的源码,这里就不再做过多描述了。
4、锁分离
在前面提起过ReadWriteLock读写锁,那么读写分离的延伸就是锁的分离。同样可以在JDK中找到锁分离的源码LinkedBlockingQueue。
public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { /* Lock held by take, poll, etc / private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 不能有两个线程同时读取数据 try { while (count.get() == 0) { // 如果当前没有可用数据,一直等待put()的通知 notEmpty.await(); } x = dequeue(); // 从头部移除一项 c = count.getAndDecrement(); // size减1 if (c > 1) notEmpty.signal(); // 通知其他take()操作 } finally { takeLock.unlock(); // 释放锁 } if (c == capacity) signalNotFull(); // 通知put()操作,已有空余空间 return x; } public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 不能有两个线程同时put数据 try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { // 队列满了 则等待 notFull.await(); } enqueue(node); // 加入队列 c = count.getAndIncrement();// size加1 if (c + 1 < capacity) notFull.signal(); // 如果有足够空间,通知其他线程 } finally { putLock.unlock();// 释放锁 } if (c == 0) signalNotEmpty();// 插入成功后,通知take()操作读取数据 } // other code }
这里需要说明一下的就是,take()和put()函数是相互独立的,它们之间不存在锁竞争关系。只需要在take()和put()各自方法内部分别对takeLock和putLock发生竞争。从而,削弱了锁竞争的可能性。
5、锁粗化
上面说到的减小锁时间和粒度,这样做就是为了满足每个线程持有锁的时间尽量短。但是,在粒度上应该把握一个度,如果对用一个锁不停地进行请求、同步和释放,其本身也会消耗系统宝贵的资源,反而加大了系统开销。
我们需要知道的是,虚拟机在遇到一连串连续的对同一锁不断进行请求和释放的操作时,便会把所有的锁操作整合成对锁的一次请求,从而减少对锁的请求同步次数,这样的操作叫做锁的粗化。下面是一段整合实例演示:
public void syncMehod(){ synchronized(lock){ method1(); } synchronized(lock){ method2(); } } JVM整合后的形式: public void syncMehod(){ synchronized(lock){ method1(); method2(); } }
因此,这样的整合给我们开发人员对锁粒度的把握给出了很好的演示作用。
上面花了很大篇幅在说锁的事情,同时也提到过锁是会带来一定的上下文切换的额外资源开销,在高并发时,”锁“的激烈竞争可能会成为系统瓶颈。因此,这里可以使用一种非阻塞同步方法。这种无锁方式依然能保证数据和程序在高并发环境下保持多线程间的一致性。
1、非阻塞同步/无锁
非阻塞同步方式其实在前面的ThreadLocal中已经有所体现,每个线程拥有各自独立的变量副本,因此在并行计算时,无需相互等待。这里笔者主要推荐一种更为重要的、基于比较并交换(Compare And Swap)CAS算法的无锁并发控制方法。
CAS算法的过程:它包含3个参数CAS(V,E,N)。V表示要更新的变量,E表示预期值,N表示新值。仅当V值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么都不做。最后CAS返回当前V的真实值。CAS操作时抱着乐观的态度进行的,它总是认为自己可以成功完成操作。当多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新,其余俊辉失败。失败的线程不会被挂起,仅是被告知失败,并且允许再次尝试,当然也允许失败的线程放弃操作。基于这样的原理,CAS操作及时没有锁,也可以发现其他线程对当前线程的干扰,并且进行恰当的处理。
2、原子量操作
JDK的java.util.concurrent.atomic包提供了使用无锁算法实现的原子操作类,代码内部主要使用了底层native代码的实现。有兴趣的同学可以继续跟踪一下native层面的代码。这里就不贴表层的代码实现了。
下面主要以一个例子来展示普通同步方法和无锁同步的性能差距:
public class TestAtomic { private static final int MAX_THREADS = 3; private static final int TASK_COUNT = 3; private static final int TARGET_COUNT = 100 * 10000; private AtomicInteger acount = new AtomicInteger(0); private int count = 0; synchronized int inc() { return ++count; } synchronized int getCount() { return count; } public class SyncThread implements Runnable { String name; long startTime; TestAtomic out; public SyncThread(TestAtomic o, long startTime) { this.out = o; this.startTime = startTime; } @Override public void run() { int v = out.inc(); while (v < TARGET_COUNT) { v = out.inc(); } long endTime = System.currentTimeMillis(); System.out.println("SyncThread spend:" + (endTime - startTime) + "ms" + ", v=" + v); } } public class AtomicThread implements Runnable { String name; long startTime; public AtomicThread(long startTime) { this.startTime = startTime; } @Override public void run() { int v = acount.incrementAndGet(); while (v < TARGET_COUNT) { v = acount.incrementAndGet(); } long endTime = System.currentTimeMillis(); System.out.println("AtomicThread spend:" + (endTime - startTime) + "ms" + ", v=" + v); } } @Test public void testSync() throws InterruptedException { ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS); long startTime = System.currentTimeMillis(); SyncThread sync = new SyncThread(this, startTime); for (int i = 0; i < TASK_COUNT; i++) { exe.submit(sync); } Thread.sleep(10000); } @Test public void testAtomic() throws InterruptedException { ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS); long startTime = System.currentTimeMillis(); AtomicThread atomic = new AtomicThread(startTime); for (int i = 0; i < TASK_COUNT; i++) { exe.submit(atomic); } Thread.sleep(10000); } }
测试结果如下:
testSync(): SyncThread spend:201ms, v=1000002 SyncThread spend:201ms, v=1000000 SyncThread spend:201ms, v=1000001 testAtomic(): AtomicThread spend:43ms, v=1000000 AtomicThread spend:44ms, v=1000001 AtomicThread spend:46ms, v=1000002
相信这样的测试结果将内部锁和非阻塞同步算法的性能差异体现的非常明显。因此笔者更推荐直接视同atomic下的这个原子类。
终于把想表达的这些东西整理完成了,其实还有一些想CountDownLatch这样的类没有讲到。不过上面的所讲到的绝对是并发编程中的核心。也许有些读者朋友能在网上看到很多这样的知识点,但是个人还是觉得知识只有在对比的基础上才能找到它合适的使用场景。因此,这也是笔者整理这篇文章的原因,也希望这篇文章能帮到更多的同学。
以上就是Java并发控制机制详解 的内容,更多相关内容请关注PHP中文网(www.php.cn)!