## ##static final class Node<E> {
E item;//入队元素 Node<E> next;//指向后继节点 Node(E x) {
item = x;
ArrayBlockingQueue is slightly different. 在第12行中获取锁是为了保证可见性,这个的原因我认为是,线程T1是实例化LinkedBlockingQueue对象,T2是对实例化的LinkedBlockingQueue对象做入队操作(当然要保证T1和T2的执行顺序),如果不对它进行加锁操作(加锁会保证其可见性,也就是写回主存),T1的集合c有可能只存在T1线程维护的缓存中,并没有写回主存,T2中实例化的LinkedBlockingQueue维护的缓存以及主存中并没有集合c,此时就因为可见性造成数据不一致的情况,引发线程安全问题。 在了解完LinkedBlockingQueue的构造方法后,我们回过头来看LinkedBlockingQueue的两个成员变量: 可见LinkedBlockingQueue中有两个锁,一个是为了锁住入队操作,一个是为了锁住出队操作。而在ArrayBlockingQueue中则只有一个锁,同时锁住队列的入队、出队操作。 这两个成员变量则是线程等待队列,一个是出队锁上的等待队列,一个是入队锁上的等待队列。在ArrayBlockingQueue也有两个等待队列,一个是非空等待队列,另一个则是非满等待队列,在这一点上两者一致。 #throw an exception Return value (non-blocking)
; when the queue is full, it throws IllegalStateException(“Queue full”)Exception——AbstractQueue false. Non-blocking returns immediately. LinkedBlockingQueue中并没有像ArrayBlockingQueue那样重写了AbstractQueue的add方法而直接调用父类的add方法,所以LinkedBlockingQueue#add方法与ArrayBlockingQueue#add一样,都是直接调用其AbstractQueue。 在第10行是获取插入锁,和ArrayBlockingQueue只有一个锁不同的是,LinkedBlockingQueue分为入队锁和出队锁,也就是说对于ArrayBlockingQueue同时只能有一个线程对它进行入队或者出队操作,而对于LinkedBlockingQueue来说同时能有两个线程对队列进行入队或者出队操作。 前两个add和offer方法都是非阻塞的,对于put方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。 队列插入的最后一个方法来看上面出现的enqueue入队方法。 抛出异常 返回值(非阻塞) 一定时间内返回值 返回值(阻塞) remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。 前两个remove和poll方法都是非阻塞的,对于take方法则是阻塞的,线程会一直阻塞直到线程非空或者非满,但是它在阻塞时能被线程中断返回。 队列出队的最后一个方法来看上面出现的dequeue入队方法。 最后一个方法size。 1 public LinkedBlockingQueue() {
2 this(Integer.MAX_VALUE)//默认构造容量为int型的最大值队列 3 }
4 public LinkedBlockingQueue(int capacity) {
5 if (capacity <= o) throw new IllegalArgumentException();
6 this.capacity = capacity;
7 last = head = new Node<E>(null);//头指针和尾指针指向头节点(null) 8 }
9 public LinkedBlockingQueue(Collection<? extends E> c ) {
10 this(Integer.MAX_VALUE);
11 final ReentrantLock putLock = this.putLock;
12 putLock.lock();//这里和ArrayBlockingQueue也会获取锁,但它同样不是为了互斥操作,同样也是为了保证其可见性。 13 try {
14 int n = 0;
15 for (E e : c) {
16 if (e == null)
17 throw new NullPointerException();
18 if (n == capacity)
19 throw new IllegalStateException("Queue full");
20 enqueue(new Node<E>(e));//入队 21 ++n;
22 }
23 count.set(n);
24 } finally {
25 putLock.unlock();
26 }
27 }
private final ReentrantLock takeLock = new ReentrantLock();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();
add(e)// #offer( e) //When the queue is not full, return Set the waiting time. If data cannot be inserted into the queue within the specified time, false
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) {
if (offer(e))//offer方法由Queue接口定义 return true;
else throw new IllegalStateException();
1 //LinkedBlockingQueue#offer 2 public boolean offer(E e) {
3 if (e == null) throw new NullPointerException();
4 final AtomicInteger count = this.count;//原子型int变量,线程安全,指向队列数据量引用 5 if (count.get() == capacity) //当数据量等于队列容量时,无法入队,返回false 6 return false;
7 int c = -1;
8 Node<E> node = new Node(e);
9 final ReentrantLock putLock = this.putLock;//插入锁 10 putLock.lock();//获得插入锁 11 try {
12 if (count.get() < capacity) {
13 enqueuer(node);//入队 14 c = count.getAndIncrement();//队列数据总数自增+1后返回 15 if (c + 1 < capacity)
16 notFull.signal();//唤醒非满等待队列上的线程 17 }
18 } finally {
19 putLock.unlock();
20 }
21 if (c == 0)
22 signalNotEmpty();//队列中刚好有一个数据,唤醒非空等待队列 23 return c >= 0
24 }
//LinkedBlockingQueue#put public void put(E e) throws InterruptedException {
if (e == null) throws new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterrupted();//能被线程中断地获取锁 try {
while (count.get() == capacity) {//队列数据量等于队列容量 notFull.await();//休眠非满等待队列上的线程 }
enqueuer(node);//入队 c = count.getAndIncrement();//队列数据总数自增+1后返回 if (c + 1 < capacity)//还没有达到队列容量 notFull.signal();//唤醒非满等待队列上的线程 } finally {
if (c == 0)
signalNotEmpty();//唤醒非空等待队列上的线程 }
private void enqueuer(Node<E> node) {
last = last.next = node;//将LinkedBlockingQueue中指向队尾的last.next指向新加入的node节点 }
//AbstractQueue#remove,同样这也是一个模板方法,定义删除队列元素的算法骨架,具体实现由子类来实现poll方法 public E remove() {
E x = poll();//poll方法由Queue接口定义 if (x != null)
return x;
else throw new NoSuchElementException();
AtomicInteger count = (count.get() == 0 = c = -1 ReentrantLock takeLock = (count.get() > 0) { x = dequeuer(); c = count.getAndDecrement(); ( c > 1 (c ==
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
take.lockInterruptibly();//可被线程中断返回地获取锁 try {
while (count.get() == 0) {//队列数据为空 notEmpty.await();//休眠非空等待队列上的线程 }
x = dequeuer();//此时非空等待队列上的线程被唤醒,队列数据不为空,出队 c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();//唤醒非空等待队列上的线程 } finally {
if (c == capacity)
signalNotFull();//唤醒非满等待队列 return x;
private E dequeue() {
Node<E> h = head;//头节点,为空 Node<E> first = h.next;
h.next = h;//此时没有节点指向头节点,便于GC head = first;
E x = first.item;
first.item = null;
return x;
public int size() {
return count.get();//和ArrayBlockingQueue类似,与ConcurrentLinkedQueue不同,没有遍历整个队列,而是直接返回count变量。此处的count是AtomicInteger变量。 }
