jdk1.7.0_79
# 上一節中對並發包中的非阻塞隊列ConcurrentLinkedQueue的入隊、出隊做了一個簡要的分析,本文將對並發包中的阻塞隊列做一個簡要分析。
# Java並發套件中的阻斷佇列一共7個,當然他們都是線程安全的。
# ArrayBlockingQueue:由陣列結構組成的有界阻塞隊列。
# LinkedBlockingQueue:由鍊錶結構組成的有界阻塞佇列。
# PriorityBlockingQueue:支援優先權排序的無界阻塞佇列。
# DealyQueue:一個使用優先權佇列實作的無界阻塞佇列。
# SynchronousQueue:一個不儲存元素的阻塞佇列。
# LinkedTransferQueue:由鍊錶結構組成的無邊界阻塞佇列。
# LinkedBlockingDeque:由鍊錶結構組成的雙向阻塞佇列。 (摘自《Java並發程式設計的藝術》)
在本文對ArrayBlockingQueue阻塞佇列做一個簡單解析
對於ArrayLinkedQueue,放眼看過去其安全性的保證是由ReentrantLock#保證的,有關ReentrantLock的解析可參考《5.Lock介面及其實作ReentrantLock》,下文我也會適當的提及。
首先來查看其建構子:
建構方法 # |
#public ArrayBlockingQueue( |
##建構指定大小的有界佇列 public ArrayBlockingQueue(int capacity, | fair ) |
1 public ArrayBlockingQueue(int capacity) { 2 this(capacity, false);//默认构造非公平锁的阻塞队列 3 } 4 public ArrayBlockingQueue(int capacity, boolean fair) { 5 if (capacity <= 0) 6 throw new IllegalArgumentException(); 7 this.items = new Object[capacity]; 8 lock = new ReentrantLock(fair);//初始化ReentrantLock重入锁,出队入队拥有这同一个锁 9 notEmpty = lock.newCondition;//初始化非空等待队列,有关Condition可参考《6.类似Object监视器方法的Condition接口》10 notFull = lock.newCondition;//初始化非满等待队列 11 } 12 public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 13 this(capacity, fair); 14 final ReentrantLock lock = this.lock; 15 lock.lock();//注意在这个地方需要获得锁,这为什么需要获取锁的操作呢? 16 try { 17 int i = 0; 18 try { 19 for (E e : c) { 20 checkNotNull(e); 21 item[i++] = e;//将集合添加进数组构成的队列中 22 } 23 } catch (ArrayIndexOutOfBoundsException ex) { 24 throw new IllegalArgumentException(); 25 } 26 count = i;//队列中的实际数据数量 27 putIndex = (i == capacity) ? 0 : i; 28 } finally { 29 lock.unlock(); 30 } 31 }
在第15行,原始碼裡給了一個註解: Lock only for visibility, not mutual exclusion。這句話的意思就是給出,這個鎖的操作並不是為了互斥操作,而是保證其可見性。執行緒T1是實例化ArrayBlockingQueue對象,T2是對實例化的ArrayBlockingQueue物件做入隊操作(當然要保證T1和T2的執行順序),如果不對它進行加鎖操作(加鎖會保證其可見性,也就是寫回主記憶體),T1的集合c有可能只存在T1執行緒維護的快取中,並沒有寫回主存,T2中實例化的ArrayBlockingQueue維護的快取以及主記憶體中並沒有集合c,此時就因為可見性造成資料不一致的情況,引發執行緒安全性問題。
##。 的插入
|
|
|||
#插入 |
#add(e)//佇列未滿時,傳回true;佇列滿則拋出IllegalStateException(“Queue full”)例外-AbstractQueue |
offer(e)//佇列未滿時,傳回true;佇列滿時回傳false。非阻塞立即返回。 |
//ArrayBlockingQueue#add public boolean add(E e) { return super.add(e); }
//AbstractQueue#add,这是一个模板方法,只定义add入队算法骨架,成功时返回true,失败时抛出IllegalStateException异常,具体offer实现交给子类实现。 public boolean add(E e) { if (offer(e))//offer方法由Queue接口定义 return true; else throw new IllegalStateException(); }
//ArrayBlockingQueue#offer,队列未满时返回true,满时返回false public boolean offer(E e) { checkNotNull(e);//检查入队元素是否为空 final ReentrantLock lock = this.lock; lock.lock();//获得锁,线程安全 try { if (count == items.length)//队列满时,不阻塞等待,直接返回false return false; else { insert(e);//队列未满,直接插入 return true; } } finally { lock.unlock(); } }
//ArrayBlockingQueue#insert private void insert(E e) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal();//唤醒非空等待队列中的线程,有关Condition可参考《6.类似Object监视器方法的Condition接口》
}
在这里有几个ArrayBlockingQueue成员变量。items即队列的数组引用,putIndex表示等待插入的数组下标位置。当items[putIndex] = x将新元素插入队列中后,调用inc将数组下标向后移动,如果队列满则将putIndex置为0:
//ArrayBlockingQueue#inc private int inc(int i) { return (++i == items.length) ? 0 : i; }
接着解析下put方法,阻塞插入队列,当队列满时不会返回false,也不会抛出异常,而是一直阻塞等待,直到有空位可插入,但它可被中断返回。
//ArrayBlockingQueue#put public void put(E e) throws InterruptedException { checkNotNull(e);//同样检查插入元素是否为空 final ReentrantLock lock = this.lock; lock.lockInterruptibly();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。 try { while (count == items.length) notFull.await();//当队列满时,使非满等待队列休眠 insert(e);//此时表示队列非满,故插入元素,同时在该方法里唤醒非空等待队列 } finally { lock.unlock(); } }
抛出异常 |
返回值(非阻塞) |
一定时间内返回值 |
返回值(阻塞) |
remove()//队列不为空时,返回队首值并移除;队列为空时抛出NoSuchElementException()异常——AbstractQueue |
poll()//队列不为空时返回队首值并移除;队列为空时返回null。非阻塞立即返回。 |
poll(time, unit)//设定等待的时间,如果在指定时间内队列还未孔则返回null,不为空则返回队首值 |
take(e)//队列不为空返回队首值并移除;当队列为空时会阻塞等待,一直等到队列不为空时再返回队首值。
|
//AbstractQueue#remove,这也是一个模板方法,定义删除队列元素的算法骨架,队列中元素时返回具体元素,元素为空时抛出异常,具体实现poll由子类实现, public E remove() { E x = poll();//poll方法由Queue接口定义 if (x != null) return x; else throw new NoSuchElementException(); }
//ArrayBlockingQueue#poll,队列中有元素时返回元素,不为空时返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract(); } finally { lock.unlock(); } }
//ArrayBlockingQueue#extract private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]);//移除队首元素 items[takeIndex] = null;//将队列数组中的第一个元素置为null,便于GC回收 takeIndex = inc(takeIndex); --count; notFull.signal();//唤醒非满等待队列线程 return x; }
对比add和offer方法,理解了上两个方法后remove和poll实际不难理解,同理在理解了put阻塞插入队列后,对比take阻塞删除队列元素同样也很好理解。
//ArrayBlockQueue#take public E take() throws InterruptedException { final ReentrantLock lock = this.lock(); lock.lockInterrupted();//这里并没有调用lock方法,而是调用了可被中断的lockInterruptibly,该方法可被线程中断返回,lock不能被中断返回。 try { while (count == 0)//队列元素为空 notEmpty.await();//非空等待队列休眠 return extract();//此时表示队列非空,故删除元素,同时在里唤醒非满等待队列 } finally { lock.unlock(); } }
最后一个方法size。
public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); } }
可以看到ArrayBlockingQueue队列的size方法,是直接返回的count变量,它不像ConcurrentLinkedQueue,ConcurrentLinkedQueue的size则是每次会遍历这个队列,故ArrayBlockingQueue的size方法比ConcurrentLinkedQueue的size方法效率高。而且ConcurrentLinkedQueue的size方法并没有加锁!也就是说很有可能其size并不准确,这在它的注释中说明了ConcurrentLinkedQueue的size并没有多大的用处。
以上是並發包阻塞隊列之ArrayBlockingQueue的詳細內容。更多資訊請關注PHP中文網其他相關文章!