Unbounded queue No capacity limit, only changes with storage
Bounded queue Defines the maximum capacity
All operations that add elements to the infinite queue will never block (also thread-safe), so It can grow to very large capacities. Using an infinite blocking queue BlockingQueue The most important thing when designing a producer-consumer model is that the consumer should be able to consume messages as fast as the producer adds messages to the queue. Otherwise, there may be insufficient memory and an OutOfMemory exception may be thrown.
Data structure
1. Usually implemented using linked lists or arrays
2. Generally with FIFO (first in, first out) Feature, it can also be designed as a double-ended queue
3. The main operations of the queue:Entering and dequeuing
Blocking QueueBlockingQueue
Definition: In thread communication, at any time, no matter how high the concurrency is, on a single JVM, only one thread can always queue or enqueue the queue at the same time. Dequeue operation. BlockingQueue can be shared between threads without any explicit synchronization
Types of blocking queues:
In JAVA Application scenarios: Thread pool, SpringCloud-Eureka three-level cache, Nacos, MQ, Netty, etc.
Common blocking queues
ArrayBlockingQueue: Bounded queue supported by array
Application scenario: There are many applications and producer-consumer models in the thread pool
Working principle: Based on ReentrantLock to ensure thread safety, and based on Condition to achieve blocking when the queue is full
LinkedBlockingQueue: Unbounded queue based on linked list (theoretically bounded)
##PriorityBlockingQueue: By Unbounded priority queue supported by the priority heap
DelayQueue: A time-based scheduling queue supported by the priority heap, internally implemented based on the unbounded queue PriorityQueue, and Array-based expansion implementation of unbounded queue
Instructions for use: The objects added to the queue must implement the Delayed interface, and Delayed is integrated from the Comparable interface
Application scenarios: Selling movie tickets, etc.
## Working principle:
The queue will be prioritized according to time Sort. Delay class thread pool cycle execution.
They all implement the BlockingQueue interface and have methods such as put() and take()
. The creation method is as follows:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);
Copy after login
BlockingQueue API
Add element:
##Method
Meaning
add()
Returns true if the insertion is successful, otherwise an IllegalStateException exception is thrown
put()
Insert the specified element Queue, if the queue is full, will block until there is space to insert
offer()
Returns true if the insertion is successful, otherwise returns false
offer(E e, long timeout, TimeUnit unit)
Try to insert elements into the queue. If the queue is full, it will block until there is space to insert, and the blocking has time control
Retrieve elements:
Method
Meaning
take()
Get the head element of the queue and delete it. If the queue is empty, block and wait for the element to become available
poll (long timeout, TimeUnit unit)
Retrieve and remove the head of the queue, if necessary, wait the specified wait time for the element to be available, or return null if the timeout occurs
/**
* 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法
*/
public class MovieTicket implements Delayed {
//延迟时间
private final long delay;
//到期时间
private final long expire;
//数据
private final String msg;
//创建时间
private final long now;
public long getDelay() {
return delay;
}
public long getExpire() {
return expire;
}
public String getMsg() {
return msg;
}
public long getNow() {
return now;
}
/**
* @param msg 消息
* @param delay 延期时间
*/
public MovieTicket(String msg , long delay) {
this.delay = delay;
this.msg = msg;
expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间
now = System.currentTimeMillis();
}
/**
* @param msg
*/
public MovieTicket(String msg){
this(msg,1000);
}
public MovieTicket(){
this(null,1000);
}
/**
* 获得延迟时间 用过期时间-当前时间,时间单位毫秒
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire
- System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* 越早过期的时间在队列中越靠前
* @param delayed
* @return
*/
@Override
public int compareTo(Delayed delayed) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS)
- delayed.getDelay(TimeUnit.MILLISECONDS));
}
}
Copy after login
测试类:
public static void main(String[] args) {
DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
MovieTicket ticket = new MovieTicket("电影票1",10000);
delayQueue.put(ticket);
MovieTicket ticket1 = new MovieTicket("电影票2",5000);
delayQueue.put(ticket1);
MovieTicket ticket2 = new MovieTicket("电影票3",8000);
delayQueue.put(ticket2);
log.info("message:--->入队完毕");
while( delayQueue.size() > 0 ){
try {
ticket = delayQueue.take();
log.info("电影票出队:{}",ticket.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Copy after login
从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同
The above is the detailed content of Java blocking queue BlockingQueue instance analysis. For more information, please follow other related articles on the PHP Chinese website!
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn