AQS 的內容太多,所以我們分成了兩個章節,沒有看過AQS 上半章節的同學可以回首看一下哈,上半章節裡面說了很多鎖的基本概念,基本屬性,如何獲得鎖等等,本章我們主要聊下如何釋放鎖和同步隊列兩大部分。
釋放鎖定的觸發時機就是我們常用的Lock.unLock () 方法,目的就是讓執行緒釋放對資源的存取權(流程見整體架構圖紫色路線)。
釋放鎖也是分為兩類,一類是排它鎖的釋放,一類是共享鎖的釋放,我們分別來看下。
排它鎖的釋放就比較簡單了,從隊頭開始,找它的下一個節點,如果下一個節點是空的,就會從尾開始,一直找到狀態不是取消的節點,然後釋放該節點,原始碼如下:
// unlock 的基础方法 public final boolean release(int arg) { // tryRelease 交给实现类去实现,一般就是用当前同步器状态减去 arg,如果返回 true 说明成功释放锁。 if (tryRelease(arg)) { Node h = head; // 头节点不为空,并且非初始化状态 if (h != null && h.waitStatus != 0) // 从头开始唤醒等待锁的节点 unparkSuccessor(h); return true; } return false; } // 很有意思的方法,当线程释放锁成功后,从 node 开始唤醒同步队列中的节点 // 通过唤醒机制,保证线程不会一直在同步队列中阻塞等待 private void unparkSuccessor(Node node) { // node 节点是当前释放锁的节点,也是同步队列的头节点 int ws = node.waitStatus; // 如果节点已经被取消了,把节点的状态置为初始化 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 拿出 node 节点的后面一个节点 Node s = node.next; // s 为空,表示 node 的后一个节点为空 // s.waitStatus 大于0,代表 s 节点已经被取消了 // 遇到以上这两种情况,就从队尾开始,向前遍历,找到第一个 waitStatus 字段不是被取消的 if (s == null || s.waitStatus > 0) { s = null; // 这里从尾迭代,而不是从头开始迭代是有原因的。 // 主要是因为节点被阻塞的时候,是在 acquireQueued 方法里面被阻塞的,唤醒时也一定会在 acquireQueued 方法里面被唤醒,唤醒之后的条件是,判断当前节点的前置节点是否是头节点,这里是判断当前节点的前置节点,所以这里必须使用从尾到头的迭代顺序才行,目的就是为了过滤掉无效的前置节点,不然节点被唤醒时,发现其前置节点还是无效节点,就又会陷入阻塞。 for (Node t = tail; t != null && t != node; t = t.prev) // t.waitStatus <= 0 说明 t 没有被取消,肯定还在等待被唤醒 if (t.waitStatus <= 0) s = t; } // 唤醒以上代码找到的线程 if (s != null) LockSupport.unpark(s.thread); }
釋放共享鎖定的方法是releaseShared,主要分成兩個步驟:
tryReleaseShared 嘗試釋放當前共享鎖定,失敗返回false,成功走2;
喚醒當前節點的後續阻塞節點,這個方法我們之前看過了,線程在獲得共享鎖的時候,就會去喚醒其後面的節點,方法名稱為:doReleaseShared。
我們一起來看下releaseShared 的原始碼:
// 共享模式下,释放当前线程的共享锁 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 这个方法就是线程在获得锁时,唤醒后续节点时调用的方法 doReleaseShared(); return true; } return false; }
在看條件佇列的方法之前,我們先得弄清楚為什麼有了同步隊列,還需要條件隊列?
主要是因為不是所有場景一個同步隊列就可以搞定的,在遇到鎖隊列結合的場景時,就需要Lock Condition 才行,先用Lock 來決定哪些執行緒可以獲得鎖,哪些執行緒需要到同步佇列裡面排隊阻塞;取得鎖的多個執行緒在碰到佇列滿或空的時候,可以使用Condition 來管理這些執行緒,讓這些執行緒阻塞等待,然後在合適的時機後,被正常喚醒。
同步佇列 條件佇列聯手使用的場景,最多被使用到鎖定 佇列的場景。
所以說條件佇列也是不可或缺的一環。
接下來我們來看看條件佇列一些比較重要的方法,以下方法都在 ConditionObject 內部類別中。
// 线程入条件队列 public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 加入到条件队列的队尾 Node node = addConditionWaiter(); // 标记位置 A // 加入条件队列后,会释放 lock 时申请的资源,唤醒同步队列队列头的节点 // 自己马上就要阻塞了,必须马上释放之前 lock 的资源,不然自己不被唤醒的话,别的线程永远得不到该共享资源了 int savedState = fullyRelease(node); int interruptMode = 0; // 确认node不在同步队列上,再阻塞,如果 node 在同步队列上,是不能够上锁的 // 目前想到的只有两种可能: // 1:node 刚被加入到条件队列中,立马就被其他线程 signal 转移到同步队列中去了 // 2:线程之前在条件队列中沉睡,被唤醒后加入到同步队列中去 while (!isOnSyncQueue(node)) { // this = AbstractQueuedSynchronizer$ConditionObject // 阻塞在条件队列上 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 标记位置 B // 其他线程通过 signal 已经把 node 从条件队列中转移到同步队列中的数据结构中去了 // 所以这里节点苏醒了,直接尝试 acquireQueued if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled // 如果状态不是CONDITION,就会自动删除 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
await 方法有幾點需要特別注意:
上述程式碼標記位置A 處,節點在準備進入條件佇列之前,一定會先釋放目前持有的鎖,不然自己進去條件佇列了,其餘的執行緒都無法取得鎖定了;上述程式碼標記位置B 處,此時節點是被Condition.signal 或signalAll 方法喚醒的,此時節點已經成功的被轉移到同步佇列中去了(整體架構圖中藍色流程),所以可以直接執行acquireQueued 方法;Node 在條件佇列中的命名,原始碼喜歡用Waiter 來命名,所以我們在條件佇列中看到Waiter,其實就是Node。
await 方法中有兩個重要方法:addConditionWaiter 和 unlinkCancelledWaiters,我們一一看下去。
addConditionWaiter 方法主要是把節點放到條件佇列中,方法原始碼如下:
// 增加新的 waiter 到队列中,返回新添加的 waiter // 如果尾节点状态不是 CONDITION 状态,删除条件队列中所有状态不是 CONDITION 的节点 // 如果队列为空,新增节点作为队列头节点,否则追加到尾节点上 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. // 如果尾部的 waiter 不是 CONDITION 状态了,删除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 新建条件队列 node Node node = new Node(Thread.currentThread(), Node.CONDITION); // 队列是空的,直接放到队列头 if (t == null) firstWaiter = node; // 队列不为空,直接到队列尾部 else t.nextWaiter = node; lastWaiter = node; return node; }
整體過程比較簡單,就是追加到佇列的尾部,其中有個重要方法叫做unlinkCancelledWaiters,這個方法會刪除掉條件佇列中狀態不是CONDITION 的所有節點,我們來看下unlinkCancelledWaiters 方法的源碼,如下:
// 会检查尾部的 waiter 是不是已经不是CONDITION状态了 // 如果不是,删除这些 waiter private void unlinkCancelledWaiters() { Node t = firstWaiter; // trail 表示上一个状态,这个字段作用非常大,可以把状态都是 CONDITION 的 node 串联起来,即使 node 之间有其他节点都可以 Node trail = null; while (t != null) { Node next = t.nextWaiter; // 当前node的状态不是CONDITION,删除自己 if (t.waitStatus != Node.CONDITION) { //删除当前node t.nextWaiter = null; // 如果 trail 是空的,咱们循环又是从头开始的,说明从头到当前节点的状态都不是 CONDITION // 都已经被删除了,所以移动队列头节点到当前节点的下一个节点 if (trail == null) firstWaiter = next; // 如果找到上次状态是CONDITION的节点的话,先把当前节点删掉,然后把自己挂到上一个状态是 CONDITION 的节点上 else trail.nextWaiter = next; // 遍历结束,最后一次找到的CONDITION节点就是尾节点 if (next == null) lastWaiter = trail; } // 状态是 CONDITION 的 Node else trail = t; // 继续循环,循环顺序从头到尾 t = next; } }
為了方便大家理解這個方法,畫了一個釋義圖,如下:
signal 方法是喚醒的意思,例如之前隊列滿了,有了一些線程因為take 操作而被阻塞進條件隊列中,突然隊列中的元素被線程A 消費了,線程A 就會調用signal 方法,喚醒之前阻塞的線程,會從條件佇列的頭節點開始喚醒(流程見整體架構圖中藍色部分),原始碼如下:
// 唤醒阻塞在条件队列中的节点 public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 从头节点开始唤醒 Node first = firstWaiter; if (first != null) // doSignal 方法会把条件队列中的节点转移到同步队列中去 doSignal(first); } // 把条件队列头节点转移到同步队列去 private void doSignal(Node first) { do { // nextWaiter为空,说明到队尾了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 从队列头部开始唤醒,所以直接把头节点.next 置为 null,这种操作其实就是把 node 从条件队列中移除了 // 这里有个重要的点是,每次唤醒都是从队列头部开始唤醒,所以把 next 置为 null 没有关系,如果唤醒是从任意节点开始唤醒的话,就会有问题,容易造成链表的割裂 first.nextWaiter = null; // transferForSignal 方法会把节点转移到同步队列中去 // 通过 while 保证 transferForSignal 能成功 // 等待队列的 node 不用管他,在 await 的时候,会自动清除状态不是 Condition 的节点(通过 unlinkCancelledWaiters 方法) // (first = firstWaiter) != null = true 的话,表示还可以继续循环, = false 说明队列中的元素已经循环完了 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
我們來看下最關鍵的方法:transferForSignal。
// 返回 true 表示转移成功, false 失败 // 大概思路: // 1. node 追加到同步队列的队尾 // 2. 将 node 的前一个节点状态置为 SIGNAL,成功直接返回,失败直接唤醒 // 可以看出来 node 的状态此时是 0 了 final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ // 将 node 的状态从 CONDITION 修改成初始化,失败返回 false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 当前队列加入到同步队列,返回的 p 是 node 在同步队列中的前一个节点 // 看命名是 p,实际是 pre 单词的缩写 Node p = enq(node); int ws = p.waitStatus; // 状态修改成 SIGNAL,如果成功直接返回 // 把当前节点的前一个节点修改成 SIGNAL 的原因,是因为 SIGNAL 本身就表示当前节点后面的节点都是需要被唤醒的 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 如果 p 节点被取消,或者状态不能修改成SIGNAL,直接唤醒 LockSupport.unpark(node.thread); return true; }
整個原始碼下來,我們可以看到,喚醒條件佇列中的節點,實際上就是把條件佇列中的節點轉移到同步佇列中,並把其前置節點狀態置為 SIGNAL。
signalAll 的作用是喚醒條件佇列中的全部節點,原始碼如下:
public final void signalAll() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); // 拿到头节点 Node first = firstWaiter; if (first != null) // 从头节点开始唤醒条件队列中所有的节点 doSignalAll(first); } // 把条件队列所有节点依次转移到同步队列去 private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; do { // 拿出条件队列队列头节点的下一个节点 Node next = first.nextWaiter; // 把头节点从条件队列中删除 first.nextWaiter = null; // 头节点转移到同步队列中去 transferForSignal(first); // 开始循环头节点的下一个节点 first = next; } while (first != null); }
從原始碼中可以看出,其本質就是for迴圈呼叫transferForSignal 方法,將條件佇列中的節點循環轉移到同步佇列中去。
AQS 原始碼終於說完了,你都懂了麼,可以默默回憶一下 AQS 架構圖,看看這張圖現在能不能看懂了。
#以上是java同步器AQS架構如何釋放鎖定和同步佇列的詳細內容。更多資訊請關注PHP中文網其他相關文章!