一:基本特性
二:实现原理
三:实现代码
一:基本特性
先进先出、写入队列空间不足时会阻塞、获取队列数据是队列为空会阻塞
二:实现原理
初始化队列、写入队列、消费队列
初始化队列
private String[] items = new String[5];private Object full = new Object();private Object empty = new Object();private static int count;private static int putIndex = 0;private static int getIndex = 0;
写入队列
/** * 写入锁put */ public void put(String t) { synchronized (full) { while (count == items.length) { try { full.wait(); } catch (InterruptedException e) { break; } } } synchronized (empty) { items[putIndex] = t; putIndex++; count++; if (putIndex == items.length) { putIndex = 0; } empty.notify(); } }
消费队列
/** * 消费锁 * @return */ public String get() { synchronized (empty) { while (count == 0) { try { empty.wait(); } catch (InterruptedException e) { return null; } } } synchronized (full) { Object result = items[getIndex]; items[getIndex] = null; getIndex++; count--; if (getIndex == items.length) { getIndex = 0; } full.notify(); return (String) result; } }
这样就可以实现一个队列先进先出的功能。
原理很简单:写入队列,在写入队列是先判断队列中元素是否已满,如果满则进入等待状态;消费队列,在消费前先看下队列中是否有元素,如果有就消费,没有进入等待。和消费者生产者模式类似。
三:实现代码:
我们了解了基本的原理,现在看下jdk是如何实现的
package com.block;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/** * 重写ArrayBlockingQueue* @author l * * @param */public class ArrayQueueryTest3 { private final E[] items;//数组 private final ReentrantLock lock;//锁 private final Condition notEmpty;// private final Condition notFull;// private int count;//总数 private int takeIndex;// private int putIndex;// public ArrayQueueryTest3(int capacity){ this(capacity,false); } public ArrayQueueryTest3(int capacity,boolean fair){ if(capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException{ if(e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while(count == items.length){ notFull.await();//等待 } } catch (Exception e2) { notFull.signal(); throw e2; } insert(e);//插入队列 } finally{ lock.unlock(); } } private void insert(E x){ items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try{ try{ while(count == 0) notEmpty.await();//等待 }catch(InterruptedException ie){ notEmpty.signal(); throw ie; } E x = extract(); return x; }finally{ lock.unlock(); } } public E poll(){ final ReentrantLock lock = this.lock; lock.lock(); try { if(count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } private E extract(){ final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } final int inc(int i){ return (++i == items.length)? 0: i; } private boolean isEmpty(){ final ReentrantLock lock = this.lock; lock.lock(); try{ return items.length == 0; }finally{ lock.unlock(); } } public int size(){ final ReentrantLock lock = this.lock; lock.lock(); try{ return count; }finally{ lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws Exception{ long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if(count != 0){ E x = extract(); return x; } if(nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos);//等待时长 } catch (Exception e) { notEmpty.signal(); throw e; } } } finally { lock.unlock(); } } public static void main(String[] args) throws Exception { ArrayQueueryTest3 at = new ArrayQueueryTest3<>(4, false); at.put("123"); at.put("123"); at.put("123"); System.out.println(at.size()); while(!at.isEmpty()) //如果生产者没有数据,则会一直进入等待状态,即使生产出数据,依旧会处在等待状态 System.out.println(at.take()); //每隔3s会进行一次get即使没有也会去get,当生产数据后可以依旧可以正常执行 System.out.println(at.poll(3000,TimeUnit.MILLISECONDS)); }}