이 기사에서는 주로 Java 동시성 컬렉션 ConcurrentLinkedQueue를 소개합니다. 필요한 친구는
ConcurrentLinkedQueue
ConcurrentLinkedQueue는 "높은 동시성"에 적합한 스레드 안전 대기열입니다. 시나리오.
링크 노드를 기반으로 하는 무제한 스레드 안전 대기열이며 FIFO(선입선출) 원칙에 따라 요소를 정렬합니다. Null 요소는 대기열 요소에 배치될 수 없습니다(내부적으로 구현된 특수 노드 제외).
ConcurrentLinkedQueue 원리 및 데이터 구조
ConcurrentLinkedQueue의 데이터 구조는 아래 그림과 같습니다.
설명:
1 ConcurrentLinkedQueue는 AbstractQueue에서 상속됩니다.
2. ConcurrentLinkedQueue는 연결 목록을 통해 내부적으로 구현됩니다. 여기에는 연결된 목록의 헤드 노드 헤드와 테일 노드 테일이 모두 포함됩니다. ConcurrentLinkedQueue는 FIFO(선입 선출) 원칙에 따라 요소를 정렬합니다. 요소는 꼬리에서 연결 목록에 삽입되고 머리에서 반환됩니다.
3. ConcurrentLinkedQueue의 연결 목록 노드에서 next 유형은 휘발성이며 연결 목록 데이터 항목의 유형도 휘발성입니다. 휘발성과 관련하여 우리는 그 의미에 다음이 포함된다는 것을 알고 있습니다. "즉, 휘발성 변수를 읽으면 항상 이 휘발성 변수에 마지막으로 쓴 모든 스레드를 볼 수 있습니다." ConcurrentLinkedQueue는 휘발성을 사용하여 여러 스레드가 경쟁하는 리소스에 상호 배타적으로 액세스할 수 있도록 합니다.
ConcurrentLinkedQueue 함수 목록
// 创建一个最初为空的 ConcurrentLinkedQueue。 ConcurrentLinkedQueue() // 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。 ConcurrentLinkedQueue(Collection<? extends E> c) // 将指定元素插入此队列的尾部。 boolean add(E e) // 如果此队列包含指定元素,则返回 true。 boolean contains(Object o) // 如果此队列不包含任何元素,则返回 true。 boolean isEmpty() // 返回在此队列元素上以恰当顺序进行迭代的迭代器。 Iterator<E> iterator() // 将指定元素插入此队列的尾部。 boolean offer(E e) // 获取但不移除此队列的头;如果此队列为空,则返回 null。 E peek() // 获取并移除此队列的头,如果此队列为空,则返回 null。 E poll() // 从队列中移除指定元素的单个实例(如果存在)。 boolean remove(Object o) // 返回此队列中的元素数量。 int size() // 返回以恰当顺序包含此队列所有元素的数组。 Object[] toArray() // 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。 <T> T[] toArray(T[] a)
다음은 ConcurrentLinkedQueue를 생성, 추가, 삭제 측면에서 분석한 것입니다.
1 Create
다음은 ConcurrentLinkedQueue()에 대해 설명합니다.
public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }
설명: constructor에서 새로운 "Null 콘텐츠가 있는 노드"가 생성되고 head와 tail의 값이 새 노드로 설정됩니다.
head와 tail은 다음과 같이 정의됩니다.
private transient volatile Node<E> head; private transient volatile Node<E> tail;
head와 tail은 휘발성 유형이며 휘발성이 제공하는 의미를 갖습니다. "즉, 휘발성 변수를 읽으면 항상 (모든 스레드)를 볼 수 있습니다. "마지막 쓰기 휘발성 변수로".
Node의 선언은 다음과 같습니다.
private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }
설명:
Node는 단방향 연결 목록 노드이며 next는 다음 Node를 가리키는 데 사용되고 item은 데이터를 저장하는 데 사용됩니다. Node에서 노드 데이터를 조작하기 위한 API는 모두 Unsafe 메커니즘의 CAS 기능을 통해 구현됩니다. 예를 들어 casNext()는 CAS 기능을 통해 "노드의 다음 노드를 비교하고 설정합니다".
2. 추가
다음은 ConcurrentLinkedQueue의 추가를 설명하기 위해 add(E e)를 사용합니다.
public boolean add(E e) { return offer(e); }
설명: add()는 실제로 추가 작업을 완료하기 위해 Offer()를 호출합니다.
offer()의 소스 코드는 다음과 같습니다.
public boolean offer(E e) { // 检查e是不是null,是的话抛出NullPointerException异常。 checkNotNull(e); // 创建新的节点 final Node<E> newNode = new Node<E>(e); // 将“新的节点”添加到链表的末尾。 for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; // 情况1:q为空 if (q == null) { // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。 // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。 // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。 if (p.casNext(null, newNode)) { if (p != t) // hop two nodes at a time casTail(t, newNode); // Failure is OK. return true; } } // 情况2:p和q相等 else if (p == q) p = (t != (t = tail)) ? t : head; // 情况3:其它 else p = (p != t && t != (t = tail)) ? t : q; } }
설명: Offer(E e)의 기능은 연결 목록의 끝에 요소 e를 추가하는 것입니다. Offer()의 비교는 for 루프를 이해하기 위해 3가지 상황을 구분하여 분석해 보겠습니다.
사례 1 - q가 비어 있습니다. 이는 q가 꼬리 노드 옆의 노드임을 의미합니다. 이때 p.casNext(null, newNode)를 통해 "p의 다음 노드를 newNode"로 설정한다. 설정에 성공하면 "p와 t"를 비교한다(p가 t와 같지 않으면 newNode를 새로운 tail 노드로 설정한다). ), true를 반환합니다. 그렇지 않으면("다른 스레드가 꼬리 노드를 수정했습니다"를 의미) 아무것도 하지 않고 for 루프를 계속합니다.
p.casNext(null, newNode)는 CAS를 호출하여 p. "p의 다음 노드가 null과 같다"면 "p의 다음 노드가 newNode와 같습니다"로 설정합니다. 설정이 성공하면 true를 반환하고, 실패하면 false를 반환합니다.
사례 2 - p와 q는 동일합니다. 언제 이런 일이 일어날까요? "Case 3"을 통해 우리는 "Case 3"을 처리한 후 p의 값이 q와 같을 수 있음을 알 수 있습니다.
이때 tail 노드가 변경되지 않았다면 헤드 노드가 변경되어야 하고, p를 헤드 노드로 설정한 다음 연결 리스트를 다시 순회합니다. 그렇지 않으면(tail 노드가 변경된 경우) p를 다음과 같이 설정합니다. 꼬리 노드.
사례 3 - 기타.
p = (p != t && t != (t = tail)) ? t : q;
if (p==t) { p = q; } else { Node<E> tmp=t; t = tail; if (tmp==t) { p=q; } else { p=t; } }
p와 t가 같으면 p를 q로 설정합니다. 그렇지 않으면 "꼬리 노드가 변경되었는지 여부"를 확인합니다. 변경 사항이 없으면 p를 q로 설정하고, 그렇지 않으면 p를 꼬리 노드로 설정합니다.
checkNotNull()의 소스 코드는 다음과 같습니다.
private static void checkNotNull(Object v) { if (v == null) throw new NullPointerException(); }
3. 다음은 ConcurrentLinkedQueue에서 삭제를 설명하기 위해 poll()을 예로 사용합니다.
public E poll() { // 设置“标记” restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; // 情况1 // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话; // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。 if (item != null && p.casItem(item, null)) { if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 情况2 // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。 else if ((q = p.next) == null) { updateHead(h, p); return null; } // 情况3 // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。 else if (p == q) continue restartFromHead; // 情况4 // 设置p为q else p = q; } } }
说明:poll()的作用就是删除链表的表头节点,并返回被删节点对应的值。poll()的实现原理和offer()比较类似,下面根将or循环划分为4种情况进行分析。
情况1:“表头节点的数据”不为null,并且“设置表头节点的数据为null”这个操作成功。
p.casItem(item, null) -- 调用CAS函数,比较“节点p的数据值”与item是否相等,是的话,设置节点p的数据值为null。
在情况1发生时,先比较“p和h”,若p!=h,即表头发生了变化,则调用updateHead()更新表头;然后返回删除节点的item值。
updateHead()的源码如下:
final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
说明:updateHead()的最终目的是更新表头为p,并设置h的下一个节点为h本身。
casHead(h,p)是通过CAS函数设置表头,若表头等于h的话,则设置表头为p。
lazySetNext()的源码如下:
void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
putOrderedObject()函数,我们在前面一章“TODO”中介绍过。h.lazySetNext(h)的作用是通过CAS函数设置h的下一个节点为h自身,该设置可能会延迟执行。
情况2:如果表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。
则调用updateHead(h, p),将表头更新p;然后返回null。
情况3:p=q
在“情况4”的发生后,会导致p=q;此时,“情况3”就会发生。当“情况3”发生后,它会跳转到restartFromHead标记重新操作。
情况4:其它情况。
设置p=q。
ConcurrentLinkedQueue示例
import java.util.*; import java.util.concurrent.*; /* * ConcurrentLinkedQueue是“线程安全”的队列,而LinkedList是非线程安全的。 * * 下面是“多个线程同时操作并且遍历queue”的示例 * (01) 当queue是ConcurrentLinkedQueue对象时,程序能正常运行。 * (02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。 * * */ public class ConcurrentLinkedQueueDemo1 { // TODO: queue是LinkedList对象时,程序会出错。 //private static Queue<String> queue = new LinkedList<String>(); private static Queue<String> queue = new ConcurrentLinkedQueue<String>(); public static void main(String[] args) { // 同时启动两个线程对queue进行操作! new MyThread("ta").start(); new MyThread("tb").start(); } private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = (String)iter.next(); System.out.print(value+", "); } System.out.println(); } private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 6) { // “线程名” + "-" + "序号" String val = Thread.currentThread().getName()+i; queue.add(val); // 通过“Iterator”遍历queue。 printAll(); } } } }
(某一次)运行结果:
ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, ta3, ta1, tb3, tb1, ta4, ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, tb3, ta1, ta4, tb1, tb4, ta2, ta5, tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, tb4, ta5, tb5, ta6, tb6,
结果说明:如果将源码中的queue改成LinkedList对象时,程序会产生ConcurrentModificationException异常。
위 내용은 Java 동시성 수집에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!