ホームページ > Java > &#&チュートリアル > Java AQSの実装原理は何ですか

Java AQSの実装原理は何ですか

PHPz
リリース: 2023-04-23 18:34:08
転載
913 人が閲覧しました

    使用

    AQS の実装原理を理解するために、ReentrantLock を使用します。

    lock

    このメソッドは、ロック操作の取得を開始するためのエントリ ポイントです。このメソッドの実装では、ロックを取得するために sync オブジェクトが渡されます。

    public void lock() {
        sync.acquire(1);
    }
    
    private final Sync sync;
    // Sync对象是一个ReentrantLock实现的内部抽象类,具体的实现又分为了公平版本与非公平两种
    abstract static class Sync extends AbstractQueuedSynchronizer {}
    
    // 在ReentrantLock的无参构造器中,默认使用的实现就是非公平锁的实现
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    // 也可以通过带参数的构造器来使用公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    ログイン後にコピー

    Sync

    フェア ロック FairSyncNonfairSync の違いは主に tryAcquire メソッド、その他のロジックにあるためです。は同じなので、Sync と AQS の実装を直接見ていきます。

    acquire

    メソッドは、AQS の実装から次のように実装されます。

    // 首先会调用 tryAcquire 和 acquireQueued 方法,如果2个方法都返回true的话,
    // 那么才会调用自行中断的逻辑
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    ログイン後にコピー

    tryAcquireメソッドには、公平なロックと不公平なロック 2 つの異なる実装、最初に、ReentrantLock のデフォルトの戦略である不公平なロックの実装を見てみましょう。

    NonfairSync.tryAcquire

    このメソッドは、Sync によって実装された nonfairTryAcquire(acquires) メソッドを直接呼び出して返します。

    Sync クラスでの実装

    // 这里的参数 acquires = 1
    final boolean nonfairTryAcquire(int acquires) {
        // 获取当前调用者的线程对象
        final Thread current = Thread.currentThread();
        // 获取AQS中定义的state值,这个state值是AQS的核心之一
        int c = getState();
        // 在ReentrantLock的实现中,state就表示当前是否有线程持有锁,0代表没有线程持有锁,
        // 当前访问的线程就可以继续执行代码,如果大于0则表示当前持有锁的线程的数量。
        // 由于ReentrantLock属于可重入锁,因此,这个值会>=1
        if (c == 0) {
            // 能进来就表示当前没有线程持有锁,那么尝试用CAS获取锁
            if (compareAndSetState(0, acquires)) {
                // 获取锁成功,那么将当前线程设置到AQS中的当前线程中
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果当前持有锁的就是自己,那么就代表是锁的重入
        else if (current == getExclusiveOwnerThread()) {
            // 累计持有锁的次数
            int nextc = c + acquires;
            // 这里就说明了,state能够设置的最大值就是Int.MAX_VALUE,
            // 当处于MAX_VALUE的时候再加1,那么Int数字的最高位就会变成1,符号位为负
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            // 更新新的state值
            setState(nextc);
            return true;
        }
        return false;
    }
    ログイン後にコピー

    AQS での実装

    private volatile int state;
    // 当前持有锁的线程对象
    private transient Thread exclusiveOwnerThread;
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    ログイン後にコピー

    要約すると、不公平なロック tryAcquire メソッドは、まずロックを保持しているスレッドがあるかどうかを確認します。そうでない場合は、CAS を通じてロックを取得しようとします。ロックが正常に取得されるか、再入力すると、tryAcquire メソッドは次のようになります。 true を返すと、acquire メソッドの条件判定は直接 false を返し、lock メソッドは終了し、スレッドは引き続き次のコードをサポートします。

    FairSync.tryAcquire

    フェア ロックの実装を見てみましょう。一般的なロジックは不公平なロックのロジックと同じです。

    FairSync での実装

    // 这里的参数 acquire = 1
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 判断当前是不是有线程持有锁
        if (c == 0) {
            // 当前没有线程持有锁就进来
            // 由于是公平锁,那么就要保证只有在当前等待队列为空或者队列中等待的线程
            // 都没有到运行的条件的时候,才尝试通过CAS来获取锁。否则就去乖乖排队
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 同非公平锁,锁重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    ログイン後にコピー

    AQS での実装

    // 检查当前AQS等待队列中是否有正在等待的有效线程节点
    public final boolean hasQueuedPredecessors() {
        Node h, s;
        // 首先让参数h指向当前队列的头部
        if ((h = head) != null) {
            // 队列不为空
            // 将临时变量赋值为当前第二个节点
            // 这里需要简单说明一下AQS的等待队列的构成,第一个节点是没有业务含义的,
            // 只是用作唤醒下一个待执行的线程节点
            if ((s = h.next) == null || s.waitStatus > 0) {
                // 能进来就表示当前队列只有一个头结点,或者第二个节点的状态是已取消
                // - > 参考说明1
                // 如果第二个节点不为空,那么就释放这个引用
                s = null; // traverse in case of concurrent cancellation
                // 从后往前遍历,找到距离队列头最近的有效节点
                for (Node p = tail; p != h && p != null; p = p.prev) {
                    if (p.waitStatus <= 0)
                        s = p;
                }
            }
            // 如果找到了正在队列中的排队的有效节点并且不是当前访问的线程,那么就返回true
            if (s != null && s.thread != Thread.currentThread())
                return true;
        }
        // 头结点指向NULL,那么说明队列是空的,直接返回false
        return false;
    }
    ログイン後にコピー

    説明 1: ここでキュー ノードを紹介します。 重要な概念待機状態の ReentrantLock では、CANCELLEDSIGNAL に注意するだけで済みます。 CANCELLED のみが 0 より大きく、新しいノードのデフォルト値は 0 です。したがって、待機ステータスが 0 より大きい限り、ノードはキャンセルされたことを意味します。

    // 等待队列中节点的等待状态
    volatile int waitStatus;
    // 当前节点因为等待超时或者被中断了被取消
    static final int CANCELLED =  1;
    // 接下来有资格被唤醒获得锁的标记,只有获得了这个标记的节点才能被执行完的线程唤醒
    static final int SIGNAL    = -1;
    ログイン後にコピー

    要約すると、公平なロックと比較して、最初にロックを取得する機会がある場合、現在のキューに実行を待機しているスレッドがすでに存在するかどうかが最初にチェックされます。空の場合、または有効なキューイング ノードがない場合は、ロックが取得されます。ロックが正常に取得されるか、ロックが再入力されると、AQS ロジックも終了し、ビジネス コードは引き続き実行されます。

    acquireQueued

    上記の分析後、tryAcquire メソッドでロックの取得に成功すると、AQS のロジックは終了します。ロックを取得する ロジックは、acquire メソッドの条件判定の 2 番目の条件判定です。

    AQS での実装

    // 这个方法就会将当前线程添加到等待队列中,并且返回操作是否成功,arg就是传入的acquire值,为1
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    ログイン後にコピー

    まず、addWaiter メソッドを使用してスレッド オブジェクトを含むノードを構築し、それをキューに追加します

    // 这里的参数为 Node.EXCLUSIVE,表示这是一个排它锁的实现,这里的值为NULL
    private Node addWaiter(Node mode) {
        // 构建一个线程节点
        Node node = new Node(mode);
    
        // 这里就是AQS的核心理念了,通过不断的自旋,将线程节点插入到队列中
        for (;;) {
            // 获取原来的队列的队尾,因为AQS才去的尾插方式
            Node oldTail = tail;
            if (oldTail != null) {
                // 将新插入的节点指向原来的尾结点
                node.setPrevRelaxed(oldTail);
                // 通过CAS的方式将当前节点设置到线程共享队列的尾部去,这里要注意,
                // 凡是涉及到多线程操作的属性,都需要通过CAS保证操作的原子性
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    // 设置成功,就返回插入的节点对象;如若不成功,
                    // 就表示有别的线程也修改了尾结点,那么就要等下一次循环重试
                    return node;
                }
            } else {
                // 没有尾结点说明队列不存在,那么就进行初始化
                initializeSyncQueue();
            }
        }
    }
    
    // 初始化等待队列
    private final void initializeSyncQueue() {
        Node h;
        // 还是通过CAS的方式,给队列初始化一个默认的Node节点,几个重要的属性的初始值如下
        // waitStatus = 0; thread = null
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }
    ログイン後にコピー

    要約すると、addWaiter メソッドは、末尾挿入によってロックを取得していないスレッドを Node ノードにカプセル化し、それを AQS 待機キューに挿入します。キューが初期化されていない場合は、最初にキューを初期化すると、キューには実際には意味のないヘッド ノードが追加されます。

    acquireQueued

    AQS での実装

    // arg就是传入的acquire参数,为1;该方法返回值的含义为,线程在等待过程中是否被中断
    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            // 还是不断的自旋,等待机会进行操作
            for (;;) {
                // 获取新插入节点的上一个节点
                final Node p = node.predecessor();
                // 如果上一个节点已经是头结点,那么说明当前就已经轮到当前线程获取锁执行业务了
                // 那么就再次尝试抢一次锁
                if (p == head && tryAcquire(arg)) {
                    // 能进来就说明获取锁成功了
                    // 那么就将当前线程的节点设置为头结点
                    // 在这个方法中,会去除节点中的信息,做一个纯粹的头结点
                    setHead(node);
                    // 将已经没有指向的原头结点的next指为空,等待回收
                    p.next = null; // help GC
                    // 这里返回的值为false,因为当前线程并未被阻塞就获得了锁
                    return interrupted;
                }
                // 走到这里说明当前线程并没有获取到锁,那么就要考虑是否要将线程阻塞了
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
    
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    // 线程获取锁失败之后,是否需要将线程阻塞,这里2个参数,
    // pred是新插入节点的上一个节点,node是新插入的节点
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取上一个节点的等待状态
        int ws = pred.waitStatus;
        // 判断上一个节点的状态是不是SIGNAL,只有状态为SIGNAL的才是有效的可指向节点
        if (ws == Node.SIGNAL)
            // 如果上一个节点是SIGNAL状态,那就说明当前线程可以连接上该节点,然后被挂起了
            return true;
        // 上面我们提到过,只有节点被取消了,等待状态才会>0
        if (ws > 0) {
            // 一直往前找,直到找到等待状态<=0的,数据规范的话即找到最近的一个等待状态
            // 为SIGNAL的节点,跳过全部被取消的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 此时pred指向的即第一个合法的线程节点,指向当前新插入的节点
            pred.next = node;
        } else {
            // 这里的意思就是上一个节点就是有效节点,那么就将上一个节点的等待状态强制
            // 更新为SIGNAL,即-1。毫无意义的那个头结点也会被设置为SIGNAL状态
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }
    ログイン後にコピー

    要約すると、このメソッドの意味は、現在のスレッドがロック リソースを取得していないため、それをブロックする必要があります。この方法では、同時に待機キューもソートされ、キャンセルされたノードがキューから削除されます。次のステップでは、条件判断で実行メソッドを呼び出し、スレッドを一時停止およびブロックします。

    private final boolean parkAndCheckInterrupt() {
        // 调用了park方法,底层是调用UnSafe类的park方法来实现
        LockSupport.park(this);
        return Thread.interrupted();
    }
    ログイン後にコピー

    この時点で、lock メソッドの状況はすべて明らかです。スレッドがロックを取得できる場合は、ロック フェーズを直接終了します。ロックを取得できない場合は、待機キューに入ります。キューに入る前に、まだロックを取得できる可能性があることが判明した場合は、再度ロックを取得しようとします。それでも取得できない場合は、待機キューに入ります。末尾の挿入によってスレッドをブロックし、LockSupport.park メソッドを呼び出してウェイクアップされるのを待ちます。

    unlock

    このメソッドをアクティブに呼び出した後は、ロックの占有が終了したことを意味します。

    ReentrantLock での実装

    public void unlock() {
        sync.release(1);
    }
    ログイン後にコピー

    AQS での実装

    public final boolean release(int arg) {
        // 调用tryRelease方法真正实现解除锁
        // 只有state加的所有锁被解除了,那么才会唤醒下一个线程
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    ログイン後にコピー

    Sync での実装

    // 这里的releases就是传入的参数1,即如果是重入锁,那么这里需要解锁多次
    protected final boolean tryRelease(int releases) {
        // 计算state的值,这里的含义就是state-1
        int c = getState() - releases;
        // 如果当前线程不是持有锁的线程,那么就报错
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 锁是否完全释放完的标记
        boolean free = false;
        // state减到0说明锁已经完全释放完了
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        // 更新state的值
        setState(c);
        // 只有锁被完全释放完,才返回true
        return free;
    }
    ログイン後にコピー

    AQS で実装

    // 唤醒下一个需要锁的线程,这里的node是头结点
    private void unparkSuccessor(Node node) {
        // 获得头结点的等待状态
        int ws = node.waitStatus;
        // 如果头结点是SIGNAL,那么重置为0,因为这个节点已经没有意义了,会被移除
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        // 获得头结点后面的待唤醒的节点
        Node s = node.next;
        // 如果这个待唤醒的节点为空或者等待状态不正确,在这里就是不等于SIGNAL
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾部开始查询,找到合法的待唤醒节点
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            // 唤醒线程
            LockSupport.unpark(s.thread);
    }
    ログイン後にコピー

    cancelAcquire

    このメソッドは、ロックの取得を待機しているスレッドをキャンセルする機能を提供します

    AQS

    private void cancelAcquire(Node node) {
        // 健壮性判断,节点非空才可以被取消
        if (node == null)
            return;
    
        // 将节点中的线程指向去掉
        node.thread = null;
    
        // 获取到当前节点的上一个节点
        Node pred = node.prev;
        // 通过循环,跳过所有当前被取消节点之前的也已经被标记取消的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        // 通过上面的循环以后pred的值就为等待队列中队尾的一个合法的未被取消的节点
        // 获取到该合法节点下一个指向的节点
        Node predNext = pred.next;
    
        // 标记当前被取消的节点的等待状态为被取消
        node.waitStatus = Node.CANCELLED;
    
        // 如果被取消的节点就是队尾的节点,那么就通过CAS将pred设置为尾结点
        // 就可以抛弃中间那些同样被标记为被取消的节点,如果有的是
        if (node == tail && compareAndSetTail(node, pred)) {
            // 将pred的下一个节点指向NULL,因为pred现在就是队尾
            pred.compareAndSetNext(predNext, null);
        } else {
            // 这说明取消的节点不是尾结点,而是中间的节点
            // 这个值会在下面的条件判断被赋值为上一个节点的等待状态
            int ws;
            // 如果找到的上一个合法节点不是头结点
            // 并且上一个节点的等待状态是SIGNAL,并且将不是SIGNAL状态的负数状态转换为SIGNAL
            // 并且线程不为空
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // 能进来就说明被取消的节点处于中间,那么就要将这个node从队列中跳过
                Node next = node.next;
                // 如果被取消的节点是一个有效的节点,不为空并且状态也是对的
                if (next != null && next.waitStatus <= 0)
                    // 那么就将上一个节点指向被取消节点的下一个节点
                    pred.compareAndSetNext(predNext, next);
            } else {
                // 能进入这里说明上一个合法节点已经是头结点了,
                // 那么就说明被取消的这个节点已经是原本除了头结点以外的最靠前面的节点,
                // 那么被取消的这个节点其实就等价于头结点了,应该唤醒后面还在等待的线程节点
                // 唤醒下一个被挂起的线程,具体已经分析过了,这里就省略了
                unparkSuccessor(node);
            }
    
            node.next = node;
        }
    }
    ログイン後にコピー
    で実装

    以上がJava AQSの実装原理は何ですかの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    関連ラベル:
    ソース:yisu.com
    このウェブサイトの声明
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
    人気のチュートリアル
    詳細>
    最新のダウンロード
    詳細>
    ウェブエフェクト
    公式サイト
    サイト素材
    フロントエンドテンプレート