Maison > Java > javaDidacticiel > Explication détaillée du mode partagé et des instances d'attente/notification basées sur des conditions

Explication détaillée du mode partagé et des instances d'attente/notification basées sur des conditions

零下一度
Libérer: 2017-07-17 14:17:25
original
1669 Les gens l'ont consulté

Processus de mise en œuvre de l'acquisition en mode partagé

Ci-dessus, nous avons expliqué le processus de mise en œuvre de l'acquisition du mode exclusif AbstractQueuedSynchronizer. Cet article frappe pendant que le fer est chaud et. continue d'examiner le mode partagé AbstractQueuedSynchronizer Le processus de mise en œuvre de l'acquisition. Après avoir étudié deux articles consécutifs, vous pouvez également comparer la différence entre l'acquisition en mode exclusif et l'acquisition en mode partagé pour approfondir votre compréhension de AbstractQueuedSynchronizer.

Regardons d'abord l'implémentation de l'acquisition en mode partagé. Les méthodes sont acquireShared et acquireSharedInterruptably. La différence est que cette dernière a un traitement d'interruption. acquireShared à titre d'exemple :

 1 public final void acquireShared(int arg) { 2     if (tryAcquireShared(arg) < 0) 3         doAcquireShared(arg); 4 }
Copier après la connexion

Ici vous pouvez voir la première différence : Lors de l'acquisition en mode exclusif, la méthode tryAcquire est remplacée par la sous-classe renvoie C'est un booléen, c'est-à-dire si tryAcquire réussit ; lors de l'acquisition en mode partagé, une variable int est renvoyée pour déterminer si elle est <0. L'implémentation de la méthode doAcquireShared est :

 1 private void doAcquireShared(int arg) { 2     final Node node = addWaiter(Node.SHARED); 3     boolean failed = true; 4     try { 5         boolean interrupted = false; 6         for (;;) { 7             final Node p = node.predecessor(); 8             if (p == head) { 9                 int r = tryAcquireShared(arg);10                 if (r >= 0) {11                     setHeadAndPropagate(node, r);12                     p.next = null; // help GC13                     if (interrupted)14                         selfInterrupt();15                     failed = false;16                     return;17                 }18             }19             if (shouldParkAfterFailedAcquire(p, node) &&20                 parkAndCheckInterrupt())21                 interrupted = true;22         }23     } finally {24         if (failed)25             cancelAcquire(node);26     }27 }
Copier après la connexion

Analysons ce que fait ce code :

  1. addWaiter, addWaiter All tryAcquireShared Les threads

  2. Obtenir le nœud prédécesseur du nœud actuel, seul le nœud prédécesseur est la tête Le nœud peut essayer AcquireShared, ce qui est identique au verrou exclusif

  3. Le nœud prédécesseur n'est pas la tête, exécutez "shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()", boucle for(;;) , la méthode "shouldParkAfterFailedAcquire()" est exécutée deux fois et le thread actuel est bloqué. C'est la même chose que le verrouillage exclusif

En effet, la plupart de la logique d'acquisition en mode partagé et d'acquisition. en mode exclusif sont similaires. La plus grande différence est qu'une fois tryAcquireShared réussi, l'acquisition en mode exclusif peut directement définir le nœud actuel comme nœud principal, tandis que le mode partagé exécutera la méthode setHeadAndPropagate, . effectue une opération de propagation supplémentaire après avoir réglé la tête. Le code source de la méthode setHeadAndPropagate est :

 1 private void setHeadAndPropagate(Node node, int propagate) { 2     Node h = head; // Record old head for check below 3     setHead(node); 4     /* 5      * Try to signal next queued node if: 6      *   Propagation was indicated by caller, 7      *     or was recorded (as h.waitStatus) by a previous operation 8      *     (note: this uses sign-check of waitStatus because 9      *      PROPAGATE status may transition to SIGNAL.)10      * and11      *   The next node is waiting in shared mode,12      *     or we don't know, because it appears null13      *14      * The conservatism in both of these checks may cause15      * unnecessary wake-ups, but only when there are multiple16      * racing acquires/releases, so most need signals now or soon17      * anyway.18      */19     if (propagate > 0 || h == null || h.waitStatus < 0) {20         Node s = node.next;21         if (s == null || s.isShared())22             doReleaseShared();23     }24 }
Copier après la connexion

Le code de la 3ème ligne définit la tête de réinitialisation, et le code de la 2ème ligne réinitialise la tête car le le code dans la 3ème ligne réinitialise la tête. Définissez d’abord une variable Node h pour obtenir l’adresse de la tête d’origine. Ces deux lignes de code sont très simples.

Le code des lignes 19 à 23 est l'endroit le plus différent entre le verrouillage exclusif et le verrouillage partagé. Regardons le code du verrouillage exclusif acquérirQueued :

 1 final boolean acquireQueued(final Node node, int arg) { 2     boolean failed = true; 3     try { 4         boolean interrupted = false; 5         for (;;) { 6             final Node p = node.predecessor(); 7             if (p == head && tryAcquire(arg)) { 8                 setHead(node); 9                 p.next = null; // help GC10                 failed = false;11                 return interrupted;12             }13             if (shouldParkAfterFailedAcquire(p, node) &&14                 parkAndCheckInterrupt())15                 interrupted = true;16         }17     } finally {18         if (failed)19             cancelAcquire(node);20     }21 }
Copier après la connexion

Cela signifie que après le réveil d'un nœud de verrouillage exclusif, il lui suffit de définir ce nœud comme tête et c'est fait, mais les verrous partagés sont différents, après un nœud est défini comme tête, si son nœud successeur est dans l'état PARTAGÉ, il continuera à essayer de réveiller le nœud via la méthode doReleaseShared, réalisant la propagation vers l'arrière de l'état partagé .

Processus de mise en œuvre de la version en mode partagé

Ce qui précède décrit comment acquérir en mode partagé Implémenté, jetons un coup d'œil au processus d'implémentation de la version. La méthode est releaseShared :

1 public final boolean releaseShared(int arg) {2     if (tryReleaseShared(arg)) {3         doReleaseShared();4         return true;5     }6     return false;7 }
Copier après la connexion

La méthode tryReleaseShared est implémentée par une sous-classe. Si tryReleaseShared réussit, alors exécutez. Méthode doReleaseShared() :

 1 private void doReleaseShared() { 2     /* 3      * Ensure that a release propagates, even if there are other 4      * in-progress acquires/releases.  This proceeds in the usual 5      * way of trying to unparkSuccessor of head if it needs 6      * signal. But if it does not, status is set to PROPAGATE to 7      * ensure that upon release, propagation continues. 8      * Additionally, we must loop in case a new node is added 9      * while we are doing this. Also, unlike other uses of10      * unparkSuccessor, we need to know if CAS to reset status11      * fails, if so rechecking.12      */13     for (;;) {14         Node h = head;15         if (h != null && h != tail) {16             int ws = h.waitStatus;17             if (ws == Node.SIGNAL) {18                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))19                     continue;            // loop to recheck cases20                 unparkSuccessor(h);21             }22             else if (ws == 0 &&23                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))24                 continue;                // loop on failed CAS25         }26         if (h == head)                   // loop if head changed27             break;28     }29 }
Copier après la connexion

主要是两层逻辑:

  1. 头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点

  2. 头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播

Condition的await()方法实现原理----构建等待队列

我们知道,Condition是用于实现通知/等待机制的,和Object的wait()/notify()一样,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很长,加之Condition也是AbstractQueuedSynchronizer的一部分,因此将Condition也放在这里写了。

Condition分为await()和signal()两部分,前者用于等待、后者用于唤醒,首先看一下await()是如何实现的。Condition本身是一个接口,其在AbstractQueuedSynchronizer中的实现为ConditionObject:

1 public class ConditionObject implements Condition, java.io.Serializable {2         private static final long serialVersionUID = 1173984872572414699L;3         /** First node of condition queue. */4         private transient Node firstWaiter;5         /** Last node of condition queue. */6         private transient Node lastWaiter;7         8         ...9 }
Copier après la connexion

这里贴了一些字段定义,后面都是方法就不贴了,会对重点方法进行分析的。从字段定义我们可以看到,ConditionObject全局性地记录了第一个等待的节点与最后一个等待的节点

像ReentrantLock每次要使用ConditionObject,直接new一个ConditionObject出来即可。我们关注一下await()方法的实现:

 1 public final void await() throws InterruptedException { 2     if (Thread.interrupted()) 3         throw new InterruptedException(); 4     Node node = addConditionWaiter(); 5     int savedState = fullyRelease(node); 6     int interruptMode = 0; 7     while (!isOnSyncQueue(node)) { 8         LockSupport.park(this); 9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10             break;11     }12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13         interruptMode = REINTERRUPT;14     if (node.nextWaiter != null) // clean up if cancelled15         unlinkCancelledWaiters();16     if (interruptMode != 0)17         reportInterruptAfterWait(interruptMode);18 }
Copier après la connexion
Copier après la connexion

第2行~第3行的代码用于处理中断,第4行代码比较关键,添加Condition的等待者,看一下实现:

 1 private Node addConditionWaiter() { 2     Node t = lastWaiter; 3     // If lastWaiter is cancelled, clean out. 4     if (t != null && t.waitStatus != Node.CONDITION) { 5         unlinkCancelledWaiters(); 6         t = lastWaiter; 7     } 8     Node node = new Node(Thread.currentThread(), Node.CONDITION); 9     if (t == null)10         firstWaiter = node;11     else12         t.nextWaiter = node;13     lastWaiter = node;14     return node;15 }
Copier après la connexion

首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着第4行的的判断,判断最后一个等待者的waitStatus不是CONDITION的话,执行第5行的代码,解绑取消的等待者,因为通过第8行的代码,我们看到,new出来的Node的状态都是CONDITION的

那么unlinkCancelledWaiters做了什么?里面的流程就不看了,就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。

接着第8行的代码前面说过了,new出来了一个Node,存储了当前线程,waitStatus是CONDITION,接着第9行~第13行的操作很好理解:

  1. 如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node

  2. 如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node

  3. 无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面

用一张图表示一下构建的数据结构就是:

对比学习,我们总结一下Condition构建出来的队列和AbstractQueuedSynchronizer构建出来的队列的差别,主要体现在2点上:

  1. AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点

  2. AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址

整个过程中,我们看到没有使用任何CAS操作,firstWaiter和lastWaiter也没有用volatile修饰,其实原因很简单:要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么

Condition的await()方法实现原理----线程等待

前面我们看了Condition构建等待队列的过程,接下来我们看一下等待的过程,await()方法的代码比较短,再贴一下:

 1 public final void await() throws InterruptedException { 2     if (Thread.interrupted()) 3         throw new InterruptedException(); 4     Node node = addConditionWaiter(); 5     int savedState = fullyRelease(node); 6     int interruptMode = 0; 7     while (!isOnSyncQueue(node)) { 8         LockSupport.park(this); 9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10             break;11     }12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13         interruptMode = REINTERRUPT;14     if (node.nextWaiter != null) // clean up if cancelled15         unlinkCancelledWaiters();16     if (interruptMode != 0)17         reportInterruptAfterWait(interruptMode);18 }
Copier après la connexion
Copier après la connexion

构建完毕队列之后,执行第5行的fullyRelease方法,顾名思义:fullyRelease方法的作用是完全释放Node的状态。方法实现为:

 1 final int fullyRelease(Node node) { 2     boolean failed = true; 3     try { 4         int savedState = getState(); 5         if (release(savedState)) { 6             failed = false; 7             return savedState; 8         } else { 9             throw new IllegalMonitorStateException();10         }11     } finally {12         if (failed)13             node.waitStatus = Node.CANCELLED;14     }15 }
Copier après la connexion

这里第4行获取state,第5行release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了,前面的文章详细讲解过。

接着看await()方法的第7行判断"while(!isOnSyncQueue(node))":

 1 final boolean isOnSyncQueue(Node node) { 2     if (node.waitStatus == Node.CONDITION || node.prev == null) 3         return false; 4     if (node.next != null) // If has successor, it must be on queue 5         return true; 6     /* 7      * node.prev can be non-null, but not yet on queue because 8      * the CAS to place it on queue can fail. So we have to 9      * traverse from tail to make sure it actually made it.  It10      * will always be near the tail in calls to this method, and11      * unless the CAS failed (which is unlikely), it will be12      * there, so we hardly ever traverse much.13      */14     return findNodeFromTail(node);15 }
Copier après la connexion

注意这里的判断是Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。

至此调用await()方法的线程构建Condition等待队列--释放锁--等待的过程已经全部分析完毕。

Condition的signal()实现原理

上面的代码分析了构建Condition等待队列--释放锁--等待的过程,接着看一下signal()方法通知是如何实现的:

1 public final void signal() {2     if (!isHeldExclusively())3         throw new IllegalMonitorStateException();4     Node first = firstWaiter;5     if (first != null)6         doSignal(first);7 }
Copier après la connexion

首先从第2行的代码我们看到,要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。

那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:

1 private void doSignal(Node first) {2     do {3         if ( (firstWaiter = first.nextWaiter) == null)4             lastWaiter = null;5         first.nextWaiter = null;6     } while (!transferForSignal(first) &&7              (first = firstWaiter) != null);8 }
Copier après la connexion

第3行~第5行的代码很好理解:

  1. 重新设置firstWaiter,指向第一个waiter的nextWaiter

  2. 如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空

  3. 因为firstWaiter是要被signal的,因此它没什么用了,nextWaiter置空

接着执行第6行和第7行的代码,这里重点就是第6行的transferForSignal方法:

 1 final boolean transferForSignal(Node node) { 2     /* 3      * If cannot change waitStatus, the node has been cancelled. 4      */ 5     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 6         return false; 7  8     /* 9      * Splice onto queue and try to set waitStatus of predecessor to10      * indicate that thread is (probably) waiting. If cancelled or11      * attempt to set waitStatus fails, wake up to resync (in which12      * case the waitStatus can be transiently and harmlessly wrong).13      */14     Node p = enq(node);15     int ws = p.waitStatus;16     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))17         LockSupport.unpark(node.thread);18     return true;19 }
Copier après la connexion

方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,总结一下方法的实现:

  1. 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false

  2. 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列

  3. 当前节点通过CAS机制将waitStatus置为SIGNAL

最后上面的步骤全部成功,返回true,返回true唤醒等待节点成功。从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它

代码分析到这里,我想类似的signalAll方法也没有必要再分析了,显然signalAll方法的作用就是将所有Condition队列中等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。

 

代码示例

可能大家看了我分析半天代码会有点迷糊,这里最后我贴一段我用于验证上面Condition结论的示例代码,首先建立一个Thread,我将之命名为ConditionThread:

 1 /** 2  * @author 五月的仓颉 3  */ 4 public class ConditionThread implements Runnable { 5  6     private Lock lock; 7      8     private Condition condition; 9     10     public ConditionThread(Lock lock, Condition condition) {11         this.lock = lock;12         this.condition = condition;13     }14     15     @Override16     public void run() {17         18         if ("线程0".equals(JdkUtil.getThreadName())) {19             thread0Process();20         } else if ("线程1".equals(JdkUtil.getThreadName())) {21             thread1Process();22         } else if ("线程2".equals(JdkUtil.getThreadName())) {23             thread2Process();24         }25         26     }27     28     private void thread0Process() {29         try {30             lock.lock();31             System.out.println("线程0休息5秒");32             JdkUtil.sleep(5000);33             condition.signal();34             System.out.println("线程0唤醒等待线程");35         } finally {36             lock.unlock();37         }38     }39     40     private void thread1Process() {41         try {42             lock.lock();43             System.out.println("线程1阻塞");44             condition.await();45             System.out.println("线程1被唤醒");46         } catch (InterruptedException e) {47             48         } finally {49             lock.unlock();50         }51     }52     53     private void thread2Process() {54         try {55             System.out.println("线程2想要获取锁");56             lock.lock();57             System.out.println("线程2获取锁成功");58         } finally {59             lock.unlock();60         }61     }62     63 }
Copier après la connexion

这个类里面的方法就不解释了,反正就三个方法片段,根据线程名判断,每个线层执行的是其中的一个代码片段。写一段测试代码:

 1 /** 2  * @author 五月的仓颉 3  */ 4 @Test 5 public void testCondition() throws Exception { 6     Lock lock = new ReentrantLock(); 7     Condition condition = lock.newCondition(); 8          9     // 线程0的作用是signal10     Runnable runnable0 = new ConditionThread(lock, condition);11     Thread thread0 = new Thread(runnable0);12     thread0.setName("线程0");13     // 线程1的作用是await14     Runnable runnable1 = new ConditionThread(lock, condition);15     Thread thread1 = new Thread(runnable1);16     thread1.setName("线程1");17     // 线程2的作用是lock18     Runnable runnable2 = new ConditionThread(lock, condition);19     Thread thread2 = new Thread(runnable2);20     thread2.setName("线程2");21         22     thread1.start();23     Thread.sleep(1000);24     thread0.start();25     Thread.sleep(1000);26     thread2.start();27         28     thread1.join();29 }
Copier après la connexion

测试代码的意思是:

  1. 线程1先启动,获取锁,调用await()方法等待

  2. 线程0后启动,获取锁,休眠5秒准备signal()

  3. 线程2最后启动,获取锁,由于线程0未使用完毕锁,因此线程2排队,可以此时由于线程0还未signal(),因此线程1在线程0执行signal()后,在AbstractQueuedSynchronizer队列中的顺序是在线程2后面的

代码执行结果为:

<span style="color: #008080"> 1</span> <span style="color: #000000">线程1阻塞</span><span style="color: #008080"> 2</span> <span style="color: #000000">线程0休息5秒</span><span style="color: #008080"> 3</span> <span style="color: #000000">线程2想要获取锁</span><span style="color: #008080"> 4</span> <span style="color: #000000">线程0唤醒等待线程</span><span style="color: #008080"> 5</span> <span style="color: #000000">线程2获取锁成功</span><span style="color: #008080"> 6</span> <span style="color: #000000">线程1被唤醒</span><span style="color: #008080"><br></span>
Copier après la connexion

符合我们的结论:signal()并不意味着被唤醒的线程立即执行。由于线程2先于线程0排队,因此看到第5行打印的内容,线程2先获取锁。

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:php.cn
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal