首页 > Java > java教程 > 正文

怎么掌握Java LinkedBlockingQueue

王林
发布: 2023-04-18 16:04:03
转载
1123 人浏览过

    队列在生活中随处可见,医院缴费需要排队、做核酸需要排队、汽车等红绿灯需要排队等等。

    怎么掌握Java LinkedBlockingQueue

    队列是一个按照先来到就排在前面,后来到排在后面的数据结构,并且出队的时候也是按照先来到先出队。使用数组和链表进行实现。通常用于协调任务的执行和数据的交换。

    介绍

    LinkedBlockingQueue 是一个可选有界阻塞队列,有界指的是队列存在一个最大容量;阻塞指的是如果队列已经满了,想要往队列继续添加元素的话,那么这个操作将会被暂停,直到队列中有空位才会继续完成添加操作。如果队列已经为空,想要从队列中获取元素,那么这个操作将会被暂停,直接队列中存在元素才会继续完成获取操作。

    实现原理

    LinkedBlockingQueue 内部使用链表作为元素的存储结构。内部使用了两个锁,分别使用于存操作和取操作。

    执行存取操作时,都必须先获取锁,才可以执行存取操作,保证 LinkedBlockingQueue 是线程安全。

    LinkedBlockingQueue 通过两个 Condition 条件队列,一个 notFull 条件,一个 notEmpty 条件。在对队列进行插入元素操作时,判断当前队列已经满,则通过 notFull 条件将线程阻塞,直到其他线程通知该线程队列可以继续插入元素。在对队列进行移除元素操作时,判断当前队列已经空,则通过 notEmpty 条件阻塞线程,直到其他线程通过该线程可以继续获取元素。

    这样保证线程的存取操作不会出现错误。避免队列在满时,丢弃插入的元素;也避免在队列空时取到一个 null 值。

    构造函数

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    登录后复制

    无参构造函数中,默认使用 Integer.MAX_VALUE 作为队列的最大容量。

    有参构造函数中,可以自己指定队列的最大容量,并且创建了头节点和尾节点。那么 LinkedBlockingQueue 使用的是有头单向链表

    private final int capacity;
    
    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();
    
    transient Node<E> head;
    
    private transient Node<E> last;
    
    // 取锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    private final Condition notEmpty = takeLock.newCondition();
    
    // 存锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    private final Condition notFull = putLock.newCondition();
    登录后复制

    并且在对象初始化时,创建了两个锁,分别使用于存操作和取操作。创建了两个条件队列,分别用于队列空和满的情况。

    插入函数

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final int c;
        final Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    登录后复制

    1.获取锁

    2.判断当前队列是否已经满了

    • 如果队列已经满了,调用 notFull 条件队列的 await() 方法,将该线程阻塞,暂停该线程的插入操作。避免内部溢出的问题。

    • 如果没有满,则直接调用入队函数 enqueue 插入到队列末尾。

    3.检查此时队列是否已满

    如果未满,则调用 notFull 条件队列的 signal() 方法,唤醒被阻塞在 notFull 条件队列的线程。

    4.解锁

    • 检查插入元素前的队列元素数量是否等于0

    • 等于 0,则调用 notEmpty 条件队列的 signal() 方法,通知其队列现在不为空,可以唤醒阻塞线程获取元素。

    怎么掌握Java LinkedBlockingQueue

    为什么需要调用 notFull 条件队列的 signal() 方法? 因为队列取操作和存操作所使用的锁是不一样的,那么就说明,一个线程执行存入操作时,其他线程是可以执行取出操作的。我们来看下面这个例子:

    怎么掌握Java LinkedBlockingQueue

    • 队列总容量为 5,当前元素数量为5。线程 A 获取了存锁,想要插入了元素。但是因为队列容量已满,释放锁,并且加入到条件队列中,等待被唤醒。

    • 线程 B 获取了存锁,想要插入了元素。但是因为队列容量已满,释放锁,并且加入到条件队列中,等待被唤醒。

    • 线程 C 获取了取锁,取出了元素 1。并且通过 notFull 的 signal 方法唤醒条件队列中被阻塞的线程 A。线程 A 被唤醒后加入到同步队列中,但是此时还没有竞争到锁。

    • 线程 D 获取了取锁,取出了元素 2。但是还没有执行到唤醒阻塞线程的代码。

    • 线程 A 竞争到锁,开始执行插入元素操作。将元素插入之后,检查到队列元素数量为 4,小于队列的总容量,因此会执行 notFull 的 signal 方法唤醒条件队列中被阻塞的线程 B。线程 B 被唤醒后加入到同步队列中,开始竞争锁。

    • 线程 B 竞争到锁,开始执行插入元素操作。将元素插入之后,检查到队列元素数量等于 5,不进行唤醒操作。

    这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。

    获取函数

    public E take() throws InterruptedException {
        final E x;
        final int c;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    登录后复制

    1.获得取锁

    2.判断当前队列是否为空

    • 如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。

    • 如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。

    3.检查此时队列是否为空

    如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。

    4.释放锁

    5.检查获取元素前的队列元素数量是否等于最大容量

    等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。

    怎么掌握Java LinkedBlockingQueue

    步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。

    入队函数

    private void enqueue(Node<E> node) {
        last = last.next = node;
    }
    登录后复制

    入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。

    怎么掌握Java LinkedBlockingQueue

    出队函数

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
    登录后复制

    我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。

    怎么掌握Java LinkedBlockingQueue

    应用场景

    适用场景

    LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:

    线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。

    生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。

    实际应用场景

    • Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。

    • Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。

    • Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。

    以上是怎么掌握Java LinkedBlockingQueue的详细内容。更多信息请关注PHP中文网其他相关文章!

    相关标签:
    来源:yisu.com
    本站声明
    本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
    热门教程
    更多>
    最新下载
    更多>
    网站特效
    网站源码
    网站素材
    前端模板