
Detailed explanation of concurrency classes based on AbstractQueuedSynchronizer

Release: 2017-07-17 14:58:47

Fair mode ReentrantLock implementation principle

The previous two articles studied the exclusive and shared modes of AbstractQueuedSynchronizer. With that foundation in place, we can now look at how the concurrency classes built on AbstractQueuedSynchronizer are implemented.

ReentrantLock is an exclusive lock. We start with ReentrantLock in fair mode. Sync is the base class inside ReentrantLock and extends AbstractQueuedSynchronizer. Here is its implementation:

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is
     * implemented in subclasses, but both need nonfair
     * try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes this lock instance from a stream.
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}

Sync is the common base class of both lock modes. It is declared abstract because it is meant to be subclassed. Here is a brief summary of what Sync does (brief, because Sync is not the key to ReentrantLock's fair locking):

  1. Defines an abstract lock method for subclasses to implement. ReentrantLock's lock() method works by delegating to it

  2. Implements nonfairTryAcquire, the nonfair version of tryAcquire

  3. Implements tryRelease, which is relatively simple: it checks that the caller owns the lock, subtracts releases from state, and clears the exclusive owner thread once state reaches 0

  4. Implements the isHeldExclusively method

  5. Defines the newCondition method so that developers can use Condition to implement wait/notify
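The bookkeeping in tryRelease and getHoldCount can be observed from the public ReentrantLock API. The following is a minimal sketch; the class and method names (HoldCountDemo, holdCountAfterTwoLocks, unlockWithoutOwnershipFails) are made up for illustration, and only ReentrantLock and its methods come from the JDK.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative class; not part of the JDK or of the original article.
class HoldCountDemo {

    // Locks twice on the same thread and returns the hold count seen in between.
    static int holdCountAfterTwoLocks(ReentrantLock lock) {
        lock.lock();
        lock.lock();                 // reentrant acquire: state goes from 1 to 2
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();           // state 2 -> 1, lock still held
            lock.unlock();           // state 1 -> 0, owner thread cleared
        }
    }

    // Returns true if unlocking without owning the lock is rejected by tryRelease.
    static boolean unlockWithoutOwnershipFails(ReentrantLock lock) {
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock(true);           // fair mode
        System.out.println(holdCountAfterTwoLocks(lock));       // prints 2
        System.out.println(lock.isLocked());                    // prints false
        System.out.println(unlockWithoutOwnershipFails(lock));  // prints true
    }
}
```

The two unlock() calls mirror the two lock() calls; the lock only becomes free when the count returns to 0.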

Next, let's look at the implementation of the fair lock's FairSync class, which extends Sync:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

Let's sort out the key points:

  1. Each acquire adds 1 to state. If the current thread calls lock() and then lock() again, state keeps increasing. Correspondingly, each unlock() subtracts 1, and only when state drops to 0 has the current thread fully released the lock so that other threads can compete for it

  2. When state == 0, the hasQueuedPredecessors method is consulted. Its implementation is "h != t && ((s = h.next) == null || s.thread != Thread.currentThread());", where h is head and t is tail. Because the code negates the result, the condition actually tested is "h == t || ((s = h.next) != null && s.thread == Thread.currentThread());". So !hasQueuedPredecessors() is true in two situations:

    1. h == t, which means

      either the FIFO queue currently holds no data, or only a head has been created and no Node is linked after it, so head is tail

    2. (s = h.next) != null && s.thread == Thread.currentThread(), which means

      the current thread is the thread in the first waiting Node

  3. If no thread has been waiting longer than the current thread, the thread that changes state from 0 to 1 through the CAS operation succeeds in tryAcquire

  4. Threads whose tryAcquire fails are appended to a FIFO queue in the order in which they failed: the first thread to fail is placed right after head, the second thread to fail is placed after the first, and so on

  5. When the thread that holds the lock releases it, the first thread that failed tryAcquire is the first to retry tryAcquire. This is first come, first served: a typical fair lock
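To make the fair rule concrete, here is a minimal sketch of a fair mutex built directly on AbstractQueuedSynchronizer. It is a simplified illustration, not the JDK's FairSync: the class SimpleFairMutex and its methods are hypothetical names, and unlike ReentrantLock it is not reentrant (a thread that locks it twice would block itself).

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical, non-reentrant fair mutex; a simplified sketch, not the JDK's FairSync.
class SimpleFairMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // Fair rule: attempt the CAS only when nobody has been waiting longer than us.
            if (getState() == 0 && !hasQueuedPredecessors()
                    && compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // fully released, so AQS wakes the first queued node
        }

        boolean locked() { return getState() != 0; }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.locked(); }

    // Runs `threads` workers that each add `perThread` to a shared counter under the mutex.
    static int countWith(int threads, int perThread) {
        SimpleFairMutex mutex = new SimpleFairMutex();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try { counter[0]++; } finally { mutex.unlock(); }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println(countWith(4, 1000)); // 4000 when mutual exclusion holds
    }
}
```

Queuing, parking and waking are all inherited from acquire(1) and release(1); the subclass only decides whether a single attempt succeeds.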

Nonfair mode ReentrantLock implementation principle

After the fair mode, let's look at how ReentrantLock is implemented in nonfair mode. The NonfairSync class also extends Sync and is implemented as:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}


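Consider the following scenario:

  1. Thread 1, thread 2 and thread 3 compete for the lock. Thread 1 wins and acquires it; thread 2 and thread 3 queue up in that order

  2. Thread 1 finishes and releases the lock, state becomes 0, and the first queued thread, thread 2, is woken up

  3. At this moment thread 4 arrives and tries to acquire the lock. Because thread 2 has just been woken up, thread 2 and thread 4 now compete for the lock

  4. Thread 4 succeeds in changing state from 0 to 1, thread 2 loses the race and parks again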



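Thread 1 first sets state to 0 and only then wakes thread 2, and there is a time gap between these two steps.

If thread 4 acquires the lock through CAS right when thread 1 sets state to 0, and finishes using the lock before thread 1 has woken thread 2, then thread 2 acquires the lock no later than it otherwise would have. Meanwhile, in the interval between thread 1 setting state to 0 and thread 1 waking thread 2, thread 4 has acquired the lock and completed its task. This increases system throughput: more tasks are processed per unit of time.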
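Choosing between the two modes is done through the ReentrantLock constructor. The sketch below only shows that choice; the class LockModeDemo and the parameter name preferThroughput are illustrative assumptions, while the constructors and isFair() are part of the JDK.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustrative helper; the names here are not from the original article.
class LockModeDemo {

    // Picks the ordering policy for a new lock.
    static ReentrantLock newLock(boolean preferThroughput) {
        // new ReentrantLock() and new ReentrantLock(false) both select nonfair mode;
        // new ReentrantLock(true) selects fair mode.
        return preferThroughput ? new ReentrantLock() : new ReentrantLock(true);
    }

    public static void main(String[] args) {
        System.out.println(newLock(true).isFair());   // prints false
        System.out.println(newLock(false).isFair());  // prints true
    }
}
```

Nonfair is the default, which matches the throughput argument above; fair mode is the option to reach for when waiting threads must not be overtaken.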





Semaphore implementation principle

Semaphore is built on the shared mode of AbstractQueuedSynchronizer. Its Sync base class is:

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);
    }

    final int getPermits() {
        return getState();
    }

    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}


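  1. The getPermits method returns how many permits remain, that is, how many more threads can still acquire the semaphore at the same time

  2. Defines nonfairTryAcquireShared, the logic for acquiring the shared lock in nonfair mode

  3. Defines tryReleaseShared, the release logic, which lives in Sync and is therefore used by both the fair and the nonfair mode. Each release adds releases to state (the no-argument acquire() and release() work in units of 1)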
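The effect of these methods on state can be followed through the public Semaphore API. This is a small single-threaded sketch; the class SemaphorePermitsDemo and the method walkThrough are illustrative names, and the comments describe the state changes implied by the Sync code above.

```java
import java.util.Arrays;
import java.util.concurrent.Semaphore;

// Illustrative class; only Semaphore and its methods come from the JDK.
class SemaphorePermitsDemo {

    // Returns the values observed after each step.
    static int[] walkThrough() {
        Semaphore semaphore = new Semaphore(3);       // state = 3
        int[] seen = new int[4];
        semaphore.acquireUninterruptibly(2);          // state 3 -> 1
        seen[0] = semaphore.availablePermits();
        seen[1] = semaphore.tryAcquire(2) ? 1 : 0;    // only 1 permit left: fails, state unchanged
        semaphore.release(2);                         // state 1 -> 3
        seen[2] = semaphore.availablePermits();
        seen[3] = semaphore.drainPermits();           // returns 3 and sets state to 0
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(walkThrough())); // prints [1, 0, 3, 3]
    }
}
```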


static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}



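Apart from the hasQueuedPredecessors check, the for loop returns in two cases:

  1. remaining is less than 0 after the subtraction, so the negative value is returned and the acquire fails

  2. The CAS succeeds, so the new permit count is returned and the acquire succeeds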
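The read, compute, CAS, retry shape of this loop can be sketched outside of AQS. In the example below an AtomicInteger stands in for the AQS state field; that substitution, the class CasPermitLoop and the method tryTake are assumptions made for illustration only.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch; an AtomicInteger plays the role of the AQS state.
class CasPermitLoop {

    // Same shape as nonfairTryAcquireShared: returns a negative value on failure,
    // or the remaining permit count on success.
    static int tryTake(AtomicInteger permits, int acquires) {
        for (;;) {
            int available = permits.get();
            int remaining = available - acquires;
            // Exit 1: not enough permits, return the negative value without writing.
            // Exit 2: the CAS won, return the new non-negative count.
            if (remaining < 0 || permits.compareAndSet(available, remaining))
                return remaining;
            // Otherwise another thread changed the count in between: re-read and retry.
        }
    }

    public static void main(String[] args) {
        AtomicInteger permits = new AtomicInteger(2);
        System.out.println(tryTake(permits, 1)); // prints 1
        System.out.println(tryTake(permits, 2)); // prints -1
        System.out.println(permits.get());       // prints 1
    }
}
```

A failed CAS is not an error here; it only means the snapshot was stale, which is why the loop retries instead of returning.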



static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}


final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}







CountDownLatch implementation principle

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}


private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}


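  1. A count is passed in and state is set to count. On await, tryAcquireShared checks whether state is 0: if it is, it returns 1, meaning success; otherwise it returns -1, meaning failure, and the FIFO queue is built. With a single waiter, head links to one Node, and the thread in that Node is the thread that called CountDownLatch's await() method

  2. Each countDown subtracts 1 from state. tryReleaseShared only succeeds when state has been reduced to 0, and when it succeeds the suspended threads are woken up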
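The two points above can be seen in a short usage sketch. The class LatchDemo and the method runWorkers are illustrative names; CountDownLatch, countDown() and await() are the JDK API discussed in this section.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative class; not part of the original article.
class LatchDemo {

    // Starts `workers` threads and waits until every one of them has counted down.
    static int runWorkers(int workers) {
        CountDownLatch done = new CountDownLatch(workers);  // state = workers
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();
                done.countDown();   // state - 1; the call that reaches 0 releases the waiter
            }).start();
        }
        try {
            done.await();           // returns once state is 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3)); // prints 3
    }
}
```

Because each worker increments the counter before calling countDown(), the value read after await() returns always equals the number of workers.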

