博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
实现ArrayBlockingQueue-阻塞队列
阅读量:6785 次
发布时间:2019-06-26

本文共 4415 字,大约阅读时间需要 14 分钟。

hot3.png

一:基本特性

二:实现原理

三:实现代码

一:基本特性

    先进先出、写入队列空间不足时会阻塞、获取队列数据是队列为空会阻塞

二:实现原理

    初始化队列、写入队列、消费队列

初始化队列

private String[] items = new String[5];private Object full = new Object();private Object empty = new Object();private static int count;private static int putIndex = 0;private static int getIndex = 0;

写入队列

/**	 * 写入锁put	 */	public void put(String t) {		synchronized (full) {			while (count == items.length) {				try {					full.wait();				} catch (InterruptedException e) {					break;				}			}		}		synchronized (empty) {			items[putIndex] = t;			putIndex++;			count++;			if (putIndex == items.length) {				putIndex = 0;			}			empty.notify();		}	}

消费队列

/**	 * 消费锁	 * @return	 */	public String get() {		synchronized (empty) {			while (count == 0) {				try {					empty.wait();				} catch (InterruptedException e) {					return null;				}			}		}		synchronized (full) {			Object result = items[getIndex];			items[getIndex] = null;			getIndex++;			count--;			if (getIndex == items.length) {				getIndex = 0;			}			full.notify();			return (String) result;		}	}

这样就可以实现一个队列先进先出的功能。

原理很简单:写入队列,在写入队列是先判断队列中元素是否已满,如果满则进入等待状态;消费队列,在消费前先看下队列中是否有元素,如果有就消费,没有进入等待。和消费者生产者模式类似。

三:实现代码:

我们了解了基本的原理,现在看下jdk是如何实现的

package com.block;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/** * 重写ArrayBlockingQueue
* @author l * * @param
*/public class ArrayQueueryTest3
{ private final E[] items;//数组 private final ReentrantLock lock;//锁 private final Condition notEmpty;// private final Condition notFull;// private int count;//总数 private int takeIndex;// private int putIndex;// public ArrayQueueryTest3(int capacity){ this(capacity,false); } public ArrayQueueryTest3(int capacity,boolean fair){ if(capacity <= 0) throw new IllegalArgumentException(); this.items = (E[]) new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } public void put(E e) throws InterruptedException{ if(e == null) throw new NullPointerException(); final E[] items = this.items; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while(count == items.length){ notFull.await();//等待 } } catch (Exception e2) { notFull.signal(); throw e2; } insert(e);//插入队列 } finally{ lock.unlock(); } } private void insert(E x){ items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal(); } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try{ try{ while(count == 0) notEmpty.await();//等待 }catch(InterruptedException ie){ notEmpty.signal(); throw ie; } E x = extract(); return x; }finally{ lock.unlock(); } } public E poll(){ final ReentrantLock lock = this.lock; lock.lock(); try { if(count == 0) return null; E x = extract(); return x; } finally { lock.unlock(); } } private E extract(){ final E[] items = this.items; E x = items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal(); return x; } final int inc(int i){ return (++i == items.length)? 0: i; } private boolean isEmpty(){ final ReentrantLock lock = this.lock; lock.lock(); try{ return items.length == 0; }finally{ lock.unlock(); } } public int size(){ final ReentrantLock lock = this.lock; lock.lock(); try{ return count; }finally{ lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws Exception{ long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if(count != 0){ E x = extract(); return x; } if(nanos <= 0) return null; try { nanos = notEmpty.awaitNanos(nanos);//等待时长 } catch (Exception e) { notEmpty.signal(); throw e; } } } finally { lock.unlock(); } } public static void main(String[] args) throws Exception { ArrayQueueryTest3
at = new ArrayQueueryTest3<>(4, false); at.put("123"); at.put("123"); at.put("123"); System.out.println(at.size()); while(!at.isEmpty()) //如果生产者没有数据,则会一直进入等待状态,即使生产出数据,依旧会处在等待状态 System.out.println(at.take()); //每隔3s会进行一次get即使没有也会去get,当生产数据后可以依旧可以正常执行 System.out.println(at.poll(3000,TimeUnit.MILLISECONDS)); }}

 

转载于:https://my.oschina.net/869088067/blog/3037089

你可能感兴趣的文章
TEMP表空间之Ogg复制进程占用
查看>>
java中的构造函数总结
查看>>
windows下kangle虚拟主机-安装mysql教程及心得
查看>>
我的友情链接
查看>>
ios中SQLite的重构封装
查看>>
centos 搭建 nagios 监控系统.
查看>>
管理禁忌小记录(一)
查看>>
遍历接口信息
查看>>
Dell R710 服务器更新windows server 2012的相关问题
查看>>
编程中最神奇的数字,你知道吗?
查看>>
数据可视化:柱状图、雷达图等六种基本图表的特点和适用场合
查看>>
选择器 :gt(index)
查看>>
notes on python
查看>>
kafa
查看>>
资源 | Feature Tools:可自动构造机器学习特征的Python库
查看>>
linux Shell 中常用的条件判断
查看>>
angular 动态设置blob链接给 ng-href时遇到unsafe 解决方案
查看>>
Java与Highcharts实例(四) - Hello Highcharts (后台Java传递数
查看>>
连接数据库的操作 总结
查看>>
Android 小米手机开发APP图标更换后还显示原来的图标
查看>>