차단 대기열의 주요 요구 사항은 다음과 같습니다.
큐의 기본 기능은 대기열에 데이터를 넣고 대기열에서 데이터를 가져오는 것입니다.
모든 대기열 작업은 동시성 안전이어야 합니다.
큐가 가득 차서 데이터가 큐에 들어가면 스레드를 일시 중지해야 합니다. 큐에 있는 데이터를 빼낼 때 큐에 공간이 있을 때 스레드를 깨워야 합니다.
큐가 비어 있고 큐에서 데이터를 가져오면 스레드를 일시 중지해야 합니다. 스레드가 큐에 데이터를 추가하면 일시 중지된 스레드를 깨워야 합니다.
우리가 구현한 큐에서는 배열을 사용하여 데이터를 저장하므로 생성자에서 배열의 초기 크기를 제공하고 배열의 크기를 설정해야 합니다.
위에서 우리는 블로킹 큐가 동시성 안전인 것에 대해 이야기했으며, 스레드를 깨우고 차단해야 할 필요성도 있으므로 다음을 선택할 수 있습니다. ReentrantLock
잠금은 동시성 안전을 보장하지만 스레드를 깨우고 차단해야 하므로 조건 변수 Condition
을 선택하여 스레드를 깨우고 차단할 수 있습니다. . Condition
에서는 주로 다음 두 가지 함수를 사용합니다. ReentrantLock
保证并发安全,但是我们还需要将线程唤醒和阻塞,因此我们可以选择条件变量Condition
进行线程的唤醒和阻塞操作,在Condition
当中我们将会使用到的,主要有以下两个函数:
signal
用于唤醒线程,当一个线程调用Condition
的signal
函数的时候就可以唤醒一个被await
函数阻塞的线程。
await
用于阻塞线程,当一个线程调用Condition
的await
函数的时候这个线程就会阻塞。
因为队列是一端进一端出,因此队列肯定有头有尾。
当我们往队列当中加入一些数据之后,队列的情况可能如下:
在上图的基础之上我们在进行四次出队操作,结果如下:
在上面的状态下,我们继续加入8个数据,那么布局情况如下:
我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。
为了保证数组的循环使用,我们需要用一个变量记录队列头在数组当中的位置,用一个变量记录队列尾部在数组当中的位置,还需要有一个变量记录队列当中有多少个数据。
根据上面的分析我们可以知道,在我们自己实现的类当中我们需要有如下的类成员变量:
// 用于保护临界区的锁 private final ReentrantLock lock; // 用于唤醒取数据的时候被阻塞的线程 private final Condition notEmpty; // 用于唤醒放数据的时候被阻塞的线程 private final Condition notFull; // 用于记录从数组当中取数据的位置 也就是队列头部的位置 private int takeIndex; // 用于记录从数组当中放数据的位置 也就是队列尾部的位置 private int putIndex; // 记录队列当中有多少个数据 private int count; // 用于存放具体数据的数组 private Object[] items;
我们的构造函数也很简单,最核心的就是传入一个数组大小的参数,并且给上面的变量进行初始化赋值。
@SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其实可以不用初始化 类会有默认初始化 默认初始化为0 takeIndex = 0; putIndex = 0; count = 0; // 数组的长度肯定不能够小于0 if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; }
这是一个比较重要的函数了,在这个函数当中如果队列没有满,则直接将数据放入到数组当中即可,如果数组满了,则需要将线程挂起。
public void put(E x){ // put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程 // 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作 // 因此这里需要上锁 lock.lock(); try { // 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了 // 这个时候需要将线程挂起 while (count == items.length) notFull.await(); // 将调用 await的线程挂起 // 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了 // 这个时候需要将数组入队 // 调用入队函数将数据入队 enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 解锁 lock.unlock(); } } // 将数据入队 private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒 }
offer函数和put函数一样,但是与put函数不同的是,当数组当中数据填满之后offer函数返回false
,而不是被阻塞。
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 如果数组满了 则直接返回false 而不是被阻塞 if (count == items.length) return false; else { // 如果数组没有满则直接入队 并且返回 true enqueue(e); return true; } } finally { lock.unlock(); } }
这个函数和上面两个函数作用一样,也是往队列当中加入数据,但当单队列满了之后这个函数会抛出异常。
public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); }
这个函数主要是从队列当中取出一个数据,但是当队列为空的时候,这个函数会阻塞调用该函数的线程:
public E take() throws InterruptedException { // 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据 // 进行加锁操作 lock.lock(); try { // 当 count 等于0 说明队列为空 // 需要将线程挂起等待 while (count == 0) notEmpty.await(); // 当被唤醒之后进行出队操作 return dequeue(); }finally { lock.unlock(); } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了 if (++takeIndex == items.length) takeIndex = 0; count--; // 队列当中数据少一个了 // 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程 // 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒 notFull.signal(); // 唤醒一个被 put 函数阻塞的线程 return x; }
因为我们在后面的测试函数当中会打印我们这个类,而打印这个类的时候会调用对象的toString
signal
은 스레드가 Condition
을 호출할 때 사용됩니다. 's code>signal 함수를 호출하면 await
함수에 의해 차단된 스레드를 깨울 수 있습니다. await
는 스레드를 차단하는 데 사용됩니다. 스레드가 Condition
의 await
함수를 호출하면 스레드가 차단됩니다. 어레이 재활용🎜🎜큐는 한쪽 끝에서 들어가고 다른 쪽 끝에서 나가기 때문에 큐에는 머리와 꼬리가 있어야 합니다. 🎜🎜🎜🎜우리가 대기열로 이동 일부 데이터를 추가한 후 대기열은 다음과 같이 보일 수 있습니다: 🎜🎜@Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("["); // 这里需要上锁 因为我们在打印的时候需要打印所有的数据 // 打印所有的数据就需要对数组进行遍历操作 而在进行遍历 // 操作的时候是不能进行插入和删除操作的 因为打印的是某 // 个时刻的数据 lock.lock(); try { if (count == 0) stringBuilder.append("]"); else { int cur = 0; // 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count // 个数据 while (cur != count) { // 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的 stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", "); cur += 1; } // 删除掉最后一次没用的 ", " stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(']'); } }finally { lock.unlock(); } return stringBuilder.toString(); }
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class MyArrayBlockingQueue<E> { // 用于保护临界区的锁 private final ReentrantLock lock; // 用于唤醒取数据的时候被阻塞的线程 private final Condition notEmpty; // 用于唤醒放数据的时候被阻塞的线程 private final Condition notFull; // 用于记录从数组当中取数据的位置 也就是队列头部的位置 private int takeIndex; // 用于记录从数组当中放数据的位置 也就是队列尾部的位置 private int putIndex; // 记录队列当中有多少个数据 private int count; // 用于存放具体数据的数组 private Object[] items; @SuppressWarnings("unchecked") public MyArrayBlockingQueue(int size) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.notFull = lock.newCondition(); // 其实可以不用初始化 类会有默认初始化 默认初始化为0 takeIndex = 0; putIndex = 0; count = 0; if (size <= 0) throw new RuntimeException("size can not be less than 1"); items = (E[])new Object[size]; } public void put(E x){ lock.lock(); try { while (count == items.length) notFull.await(); enqueue(x); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void enqueue(E x) { this.items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; notFull.signal(); return x; } public boolean add(E e) { if (offer(e)) return true; else throw new RuntimeException("Queue full"); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } public E take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); return dequeue(); }finally { lock.unlock(); } } @Override public String toString() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("["); lock.lock(); try { if (count == 0) stringBuilder.append("]"); else { int cur = 0; while (cur != count) { stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", "); cur += 1; } stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length()); stringBuilder.append(']'); } }finally { lock.unlock(); } return stringBuilder.toString(); } }
import java.util.concurrent.TimeUnit; public class Test { public static void main(String[] args) throws InterruptedException { MyArrayBlockingQueue<Integer> queue = new MyArrayBlockingQueue<>(5); Thread thread = new Thread(() -> { for (int i = 0; i < 10; i++) { System.out.println(Thread.currentThread().getName() + " 往队列当中加入数据:" + i); queue.put(i); } }, "生产者"); Thread thread1 = new Thread(() -> { for (int i = 0; i < 10; i++) { try { System.out.println(Thread.currentThread().getName() + " 从队列当中取出数据:" + queue.take()); System.out.println(Thread.currentThread().getName() + " 当前队列当中的数据:" + queue); } catch (InterruptedException e) { e.printStackTrace(); } } }, "消费者"); thread.start(); TimeUnit.SECONDS.sleep(3); thread1.start(); } }
false
를 반환한다는 점입니다. 막힌. 🎜rrreee🎜add function🎜🎜이 함수는 위의 두 함수와 동일한 기능을 가지고 있습니다. 또한 큐에 데이터를 추가하지만 단일 큐가 가득 차면 이 함수는 예외를 발생시킵니다. 🎜rrreee🎜take 함수🎜🎜이 함수는 주로 대기열에서 데이터를 가져오지만 대기열이 비어 있으면 이 함수는 함수를 호출하는 스레드를 차단합니다. 🎜rrreee🎜toString 함수를 다시 작성하세요🎜🎜왜냐하면 우리는 클래스가 테스트 함수에 출력되고, 이 클래스를 출력할 때 객체의 toString
메서드가 호출되어 문자열을 얻어 최종적으로 문자열이 출력됩니다. 🎜rrreee🎜전체 코드🎜🎜우리가 직접 완성한 전체 차단 큐 코드는 다음과 같습니다. 🎜rrreee🎜이제 위 코드를 테스트해 보세요. 🎜🎜이제 차단 큐를 사용하여 생산자-소비자 모델을 시뮬레이션하고 크기를 설정합니다. 차단 대기열을 5로 지정합니다. 생산자 스레드는 대기열에 데이터를 추가합니다. 데이터는 0부터 9까지의 10개 숫자입니다. 소비자 스레드는 이를 총 10회 소비합니다. 🎜rrreee🎜위 코드의 출력은 다음과 같습니다. 🎜생산자가 대기열에 데이터를 추가합니다: 0
생산자가 대기열에 데이터를 추가합니다: 1
생산자가 대기열에 데이터를 추가합니다: 2
생산자가 대기열에 데이터를 추가합니다: 3
생산자가 대기열에 데이터를 추가합니다: 4
생산자가 대기열에 데이터를 추가합니다: 5
소비자가 대기열에서 데이터를 가져옵니다: 0
생산자가 대기열에 데이터를 추가합니다: 6
소비자의 현재 대기열에 있는 데이터: [1, 2, 3, 4, 5 ]
소비자가 대기열에서 데이터를 가져옵니다: 1
소비자의 현재 대기열에 있는 데이터: [2, 3, 4, 5]
소비자가 대기열에서 데이터를 가져옵니다: 2
소비자의 현재 대기열에 있는 데이터 현재 대기열: [3 , 4, 5, 6]
생산자가 대기열에 데이터를 추가합니다: 7
소비자가 대기열에서 데이터를 제거합니다: 3
소비자의 현재 대기열에 있는 데이터: [4, 5, 6, 7]
소비자는 대기열에서 데이터를 가져옵니다. 가져오는 데이터: 4
소비자의 현재 대기열에 있는 데이터: [5, 6, 7]
소비자가 대기열에서 가져오는 데이터: 5
소비자의 현재 대기열에 있는 데이터 소비자: [6, 7]
생산자가 대기열로 이동합니다. 데이터 추가: 8
소비자는 대기열에서 데이터를 가져옵니다. 6
소비자의 현재 대기열에 있는 데이터: [7, 8]
소비자는 대기열에서 데이터를 가져옵니다. 대기열: 7
소비자의 현재 대기열에 있는 데이터: [8]
소비자가 대기열에서 데이터를 가져옵니다: 8
소비자의 현재 대기열에 있는 데이터: []
생산자가 대기열에 데이터를 추가합니다: 9
소비자는 대기열에서 데이터를 가져옵니다. 9
소비자의 현재 대기열에 있는 데이터:[]
위의 출력 결과에서 생산자 스레드가 5를 인쇄한 후 일시 중단되었음을 알 수 있습니다. 소비자 스레드가 3초 동안 차단되었으므로 생산자 스레드는 확실히 한 번에 출력을 완료할 수 있습니다. 블로킹 큐가 가득 차서 숫자 5를 인쇄한 후 출력을 완료하지 않아 생산자 스레드가 일시 중단되었습니다. 소비자가 소비를 시작하면 차단 대기열에서 공간을 사용할 수 있게 되고 생산자 스레드는 계속해서 생산할 수 있습니다.
위 내용은 Java 필기 차단 대기열 사용 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!