Heim > Java > javaLernprogramm > Java-Blockierungswarteschlange BlockingQueue-Instanzanalyse

Java-Blockierungswarteschlange BlockingQueue-Instanzanalyse

王林
Freigeben: 2023-04-25 15:13:15
nach vorne
1205 Leute haben es durchsucht

Arten von Warteschlangen

  • Unbegrenzte WarteschlangeKeine Kapazitätsbeschränkung, ändert sich nur mit der Speicherung

  • Begrenzte WarteschlangeDefiniert die maximale Kapazität

Elemente zur unendlichen Warteschlange hinzufügen Alle Vorgänge werden niemals blockiert (auch Thread). -safe), so dass es auf sehr große Kapazitäten anwachsen kann. Verwenden einer unendlichen Blockierungswarteschlange BlockingQueue Das Wichtigste beim Entwerfen eines Produzenten-Konsumenten-Modells ist, dass der Verbraucher in der Lage sein sollte, Nachrichten genauso schnell zu konsumieren, wie der Produzent Nachrichten zur Warteschlange hinzufügt. Andernfalls ist möglicherweise nicht genügend Arbeitsspeicher vorhanden und es wird möglicherweise eine OutOfMemory-Ausnahme ausgelöst.

Datenstruktur

  • 1. Normalerweise mit verknüpften Listen oder Arrays implementiert

  • 2 Im Allgemeinen mit FIFO-Eigenschaften (First In First Out), kann es auch als doppelendige Warteschlange konzipiert werden 3. Die Hauptoperationen der Warteschlange:

    Eintreten und Entfernen aus der Warteschlange
  • Blockieren der Warteschlange Blockierender Warteschlange

  • Definition:
In der Thread-Kommunikation gibt es zu jeder Zeit, egal wie hoch die Parallelität ist, auf einer einzelnen JVM nur einen Thread Sie können die Warteschlange immer gleichzeitig mit Warteschlangen- oder Entnahmevorgängen betreten. Blockingqueue kann zwischen Threads ohne explizite Synchronisation geteilt werden. und Producer-Consumer-Modelle im Thread-Pool

Arbeitsprinzip:

Basierend auf ReentrantLock, um Thread-Sicherheit zu gewährleisten und die Blockierung zu implementieren, wenn die Warteschlange gemäß Bedingung voll ist

Java-Blockierungswarteschlange BlockingQueue-Instanzanalyse

LinkedBlockingQueue: Unbegrenzte Warteschlange basierend auf verknüpfter Liste (theoretisch begrenzt)

  • Priority BlockingQueue: Unbegrenzte Prioritätswarteschlange, unterstützt durch Prioritätsheap

    • DelayQueue:

      Zeitbasierte Planungswarteschlange, unterstützt durch Prioritätsheap, intern implementiert basierend auf unbegrenzter Warteschlange PriorityQueue und unbegrenzte Warteschlange basierend auf der Array-Erweiterungsimplementierung

    • Verwendung: Der Warteschlange hinzugefügte Objekte müssen die Delayed-Schnittstelle implementieren, und Delayed wird über die Comparable-Schnittstelle integriert

Anwendungsszenarien:
    Verkauf von Kinokarten usw.
  • Arbeitsprinzip:
  • Die Warteschlange wird intern nach Zeit verarbeitet. Sortiert nach Priorität. Verzögern Sie die Ausführung des Klassen-Thread-Pool-Zyklus.
  • Die alle implementieren die Blockingqueue -Schnittstelle mit Put () und take () Methoden .

    Bedeutung
  • add()

    Wenn das Einfügen erfolgreich ist, geben Sie true zurück, andernfalls wird eine IllegalStateException-Ausnahme ausgelöst
    • put()Fügen Sie das angegebene Element in die Warteschlange ein. es wird blockiert, bis Platz zum Einfügen vorhanden ist

    • offer()Gibt „true“ zurück, wenn das Einfügen erfolgreich ist, andernfalls wird „false“ zurückgegeben

    • offer(E e, long timeout, TimeUnit unit)Versuchen Sie, Elemente einzufügen in die Warteschlange. Wenn die Warteschlange voll ist, wird sie blockiert, bis eine räumliche Einfügung erfolgt. Das Blockieren hat eine Zeitsteuerung

      Holen Sie sich das Kopfelement der Warteschlange und löschen Sie es, wenn die Warteschlange leer ist. Blockieren Sie es dann und warten Sie, bis das Element verfügbar wird. Warten Sie bei Bedarf die angegebene Wartezeit ab, um das Element verfügbar zu machen, und kehren Sie zurück, wenn das Zeitlimit überschritten wird, null

ArrayBlockingQueue 源码简解

实现:同步等待队列(CLH)+ 条件等待队列满足条件的元素在CLH队列中等待锁,不满足条件的队列挪到条件等待队列,满足条件后再从 tail 插入 CLH 队列

线程获取锁的条件: 在 CLH 队列里等待的 Node 节点,并且 Node 节点的前驱节点是 Singal。条件等待队列里的线程是无法获取锁的。

/**
 * 构造方法
 * 还有两个构造函数,一个无fair参数,一个可传入集合,创建时插入队列
 * @param capacity 固定容量
 * @param fair 默认是false:访问顺序未指定; true:按照FIFO顺序处理
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); // 根据fair创建对应的锁
    // 条件对象,配合容器能满足业务
    notEmpty = lock.newCondition(); // 出队条件对象
    notFull =  lock.newCondition(); // 入队条件对象
}
/**
 * 入队方法
 * 在队列的尾部插入指定的元素,如果队列已满,则等待空间可用
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e); // 检查put对象是否为空,空抛出异常
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 若未被中断尝试获取锁,详见下文
    try {
        // 队列中元素的数量 等于 排队元素的长度
        while (count == items.length)
            notFull.await(); // 见下文
        enqueue(e); // 元素入队
    } finally {
        lock.unlock();
    }
}
/**
 * 出队方法
 * 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 见下文
    try {
        while (count == 0)
            notEmpty.await(); // 见下文
        return dequeue(); // 元素出队
    } finally {
        lock.unlock();
    }
}
Nach dem Login kopieren

令当前线程等待,直到收到信号或被中断详:与此 Condition 关联的锁被自动释放,进入等待,并且处于休眠状态,直到发生以下四种情况之一:

  • ①其他线程调用这个Condition的 signal 方法,当前线程恰好被选为要被唤醒的线程;

  • ②其他线程调用这个条件的 signalAll 方法

  • ③其他线程中断当前线程,支持中断线程挂起;

  • ④一个“虚假的唤醒”发生了。

在这些情况下,在此方法返回之前,当前线程必须重新获得与此条件相关联的锁。当线程返回时,保证它持有这个锁。

如果当前线程有以下两种情况之一:

  • ①在进入该方法时设置中断状态;

  • ②在等待时被中断,支持线程挂起的中断 抛出InterruptedException

生产者消费者模式

BlockingQueue 可以在线程之间共享而无需任何显式同步,在生产者消费者之间,只需要将阻塞队列以参数的形式进行传递即可。它内部的机制会自动保证线程的安全性。

生产者:实现了 Runnable 接口,每个生产者生产100种商品和1个中断标记后完成线程任务

@Slf4j
@Slf4j
public class Producer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> blockingQueue;
    private final int stopTag;
    /**
     * 构造方法
     * @param blockingQueue
     * @param stopTag
     */
    public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
        this.blockingQueue = blockingQueue;
        this.stopTag = stopTag;
    }
    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
   private void generateNumbers() throws InterruptedException {
        // 每个生产者都随机生产10种商品
        for (int i = 0; i < 10; i++) {
            int product = ThreadLocalRandom.current().nextInt(1000,1100);
            log.info("生产者{}号,生产了商品,编号为{}",Thread.currentThread().getId(),product);
            blockingQueue.put(product);
        }
        // 生产终止标记
        blockingQueue.put(stopTag);
        log.info("生产者{}号,生产了第终止标记编号{}",Thread.currentThread().getId(),Thread.currentThread().getId());
    }
}
Nach dem Login kopieren

消费者:消费者拿到终止消费标记终止消费,否则消费商品,拿到终止标记后完成线程任务

@Slf4j
public class Consumer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> queue;
    private final int stopTage;
    public Consumer(BlockingQueue<Integer> queue, int stopTage) {
        this.queue = queue;
        this.stopTage = stopTage;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Integer product = queue.take();
                if (product.equals(stopTage)) {
                    log.info("{}号消费者,停止消费,因为拿到了停止消费标记",Thread.currentThread().getId());
                    return;
                }
                log.info("{}号消费者,拿到的商品编号:{}",Thread.currentThread().getId(),product);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Nach dem Login kopieren

客户端类: 创建与计算机 CPU 核数相同的线程数,与 16个生产者

public class ProductConsumerTest {
    public static void main(String[] args) {
        // 阻塞队列容量
        int blockingQueueSize = 10;
        // 生产者数量
        int producerSize = 16;
        // 消费者数量 = 计算机线程核数 8
        int consumerSize = Runtime.getRuntime().availableProcessors();
        // 终止消费标记
        int stopTag = Integer.MAX_VALUE;
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
        // 创建16个生产者线程
        for (int i = 0; i < producerSize; i++) {
            new Thread(new Producer(blockingQueue, stopTag)).start();
        }
        // 创建8个消费者线程
        for (int j = 0; j < consumerSize; j++) {
            new Thread(new Consumer(blockingQueue, stopTag)).start();
        }
    }
}
Nach dem Login kopieren

延迟队列 DelayQueue

定义: Java 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。

/**
 * 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法
 */
public class MovieTicket implements Delayed {
    //延迟时间
    private final long delay;
    //到期时间
    private final long expire;
    //数据
    private final String msg;
    //创建时间
    private final long now;
    public long getDelay() {
        return delay;
    }
    public long getExpire() {
        return expire;
    }
    public String getMsg() {
        return msg;
    }
    public long getNow() {
        return now;
    }
    /**
     * @param msg 消息
     * @param delay 延期时间
     */
    public MovieTicket(String msg , long delay) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间
        now = System.currentTimeMillis();
    }
    /**
     * @param msg
     */
    public MovieTicket(String msg){
        this(msg,1000);
    }
    public MovieTicket(){
        this(null,1000);
    }
    /**
     * 获得延迟时间   用过期时间-当前时间,时间单位毫秒
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire
                - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }
    /**
     * 用于延迟队列内部比较排序  当前时间的延迟时间 - 比较对象的延迟时间
     * 越早过期的时间在队列中越靠前
     * @param delayed
     * @return
     */
    @Override
    public int compareTo(Delayed delayed) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS)
                - delayed.getDelay(TimeUnit.MILLISECONDS));
    }
}
Nach dem Login kopieren

测试类:

public static void main(String[] args) {
    DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
    MovieTicket ticket = new MovieTicket("电影票1",10000);
    delayQueue.put(ticket);
    MovieTicket ticket1 = new MovieTicket("电影票2",5000);
    delayQueue.put(ticket1);
    MovieTicket ticket2 = new MovieTicket("电影票3",8000);
    delayQueue.put(ticket2);
    log.info("message:--->入队完毕");
    while( delayQueue.size() > 0 ){
        try {
            ticket = delayQueue.take();
            log.info("电影票出队:{}",ticket.getMsg());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Nach dem Login kopieren

从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同

Das obige ist der detaillierte Inhalt vonJava-Blockierungswarteschlange BlockingQueue-Instanzanalyse. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Verwandte Etiketten:
Quelle:yisu.com
Erklärung dieser Website
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Beliebte Tutorials
Mehr>
Neueste Downloads
Mehr>
Web-Effekte
Quellcode der Website
Website-Materialien
Frontend-Vorlage