0

0

怎么利用Java手写阻塞队列

WBOY

WBOY

发布时间:2023-05-20 09:28:20

|

1186人浏览过

|

来源于亿速云

转载

    需求分析

    阻塞队列的主要的需求如下:

    • 队列基础的功能需要有,往队列当中放数据,从队列当中取数据。

    • 所有的队列操作都要是并发安全的。

    • 当队列满了之后再往队列当中放数据的时候,线程需要被挂起,当队列当中的数据被取出,让队列当中有空间的时候线程需要被唤醒。

    • 当队列空了之后再往队列当中取数据的时候,线程需要被挂起,当有线程往队列当中加入数据的时候被挂起的线程需要被唤醒。

      立即学习Java免费学习笔记(深入)”;

    • 在我们实现的队列当中我们使用数组去存储数据,因此在构造函数当中需要提供数组的初始大小,设置用多大的数组。

    阻塞队列实现原理

    线程阻塞和唤醒

    在上面我们已经谈到了阻塞队列是并发安全的,而且我们还有将线程唤醒和阻塞的需求,因此我们可以选择可重入锁ReentrantLock保证并发安全,但是我们还需要将线程唤醒和阻塞,因此我们可以选择条件变量Condition进行线程的唤醒和阻塞操作,在Condition当中我们将会使用到的,主要有以下两个函数:

    • signal用于唤醒线程,当一个线程调用Conditionsignal函数的时候就可以唤醒一个被await函数阻塞的线程。

    • await用于阻塞线程,当一个线程调用Conditionawait函数的时候这个线程就会阻塞。

    数组循环使用

    因为队列是一端进一端出,因此队列肯定有头有尾。

    怎么利用Java手写阻塞队列

    当我们往队列当中加入一些数据之后,队列的情况可能如下:

    怎么利用Java手写阻塞队列

    在上图的基础之上我们在进行四次出队操作,结果如下:

    怎么利用Java手写阻塞队列

    在上面的状态下,我们继续加入8个数据,那么布局情况如下:

    微信 WeLM
    微信 WeLM

    WeLM不是一个直接的对话机器人,而是一个补全用户输入信息的生成模型。

    下载

    怎么利用Java手写阻塞队列

    我们知道上图在加入数据的时候不仅将数组后半部分的空间使用完了,而且可以继续使用前半部分没有使用过的空间,也就是说在队列内部实现了一个循环使用的过程。

    为了保证数组的循环使用,我们需要用一个变量记录队列头在数组当中的位置,用一个变量记录队列尾部在数组当中的位置,还需要有一个变量记录队列当中有多少个数据。

    代码实现

    成员变量定义

    根据上面的分析我们可以知道,在我们自己实现的类当中我们需要有如下的类成员变量:

    // 用于保护临界区的锁
    private final ReentrantLock lock;
    // 用于唤醒取数据的时候被阻塞的线程
    private final Condition notEmpty;
    // 用于唤醒放数据的时候被阻塞的线程
    private final Condition notFull;
    // 用于记录从数组当中取数据的位置 也就是队列头部的位置
    private int takeIndex;
    // 用于记录从数组当中放数据的位置 也就是队列尾部的位置
    private int putIndex;
    // 记录队列当中有多少个数据
    private int count;
    // 用于存放具体数据的数组
    private Object[] items;

    构造函数

    我们的构造函数也很简单,最核心的就是传入一个数组大小的参数,并且给上面的变量进行初始化赋值。

    @SuppressWarnings("unchecked")
    public MyArrayBlockingQueue(int size) {
      this.lock = new ReentrantLock();
      this.notEmpty = lock.newCondition();
      this.notFull = lock.newCondition();
      // 其实可以不用初始化 类会有默认初始化 默认初始化为0
      takeIndex = 0;
      putIndex = 0;
      count = 0;
      // 数组的长度肯定不能够小于0
      if (size <= 0)
        throw new RuntimeException("size can not be less than 1");
      items = (E[])new Object[size];
    }

    put函数

    这是一个比较重要的函数了,在这个函数当中如果队列没有满,则直接将数据放入到数组当中即可,如果数组满了,则需要将线程挂起。

    public void put(E x){
      // put 函数可能多个线程调用 但是我们需要保证在给变量赋值的时候只能够有一个线程
      // 因为如果多个线程同时进行赋值的话 那么可能后一个线程的赋值操作覆盖了前一个线程的赋值操作
      // 因此这里需要上锁
      lock.lock();
     
      try {
        // 如果队列当中的数据个数等于数组的长度的话 说明数组已经满了
        // 这个时候需要将线程挂起
        while (count == items.length)
          notFull.await(); // 将调用 await的线程挂起
        // 当数组没有满 或者在挂起之后再次唤醒的话说明数组当中有空间了
        // 这个时候需要将数组入队 
        // 调用入队函数将数据入队
        enqueue(x);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 解锁
        lock.unlock();
      }
    }
     
    // 将数据入队
    private void enqueue(E x) {
      this.items[putIndex] = x;
      if (++putIndex == items.length)
        putIndex = 0;
      count++;
      notEmpty.signal(); // 唤醒一个被 take 函数阻塞的线程唤醒
    }

    offer函数

    offer函数和put函数一样,但是与put函数不同的是,当数组当中数据填满之后offer函数返回false,而不是被阻塞。

    public boolean offer(E e) {
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
        // 如果数组满了 则直接返回false 而不是被阻塞
        if (count == items.length)
          return false;
        else {
          // 如果数组没有满则直接入队 并且返回 true
          enqueue(e);
          return true;
        }
      } finally {
        lock.unlock();
      }
    }

    add函数

    这个函数和上面两个函数作用一样,也是往队列当中加入数据,但当单队列满了之后这个函数会抛出异常。

    public boolean add(E e) {
      if (offer(e))
        return true;
      else
        throw new RuntimeException("Queue full");
    }

    take函数

    这个函数主要是从队列当中取出一个数据,但是当队列为空的时候,这个函数会阻塞调用该函数的线程:

    public E take() throws InterruptedException {
      // 这个函数也是不能够并发的 否则可能不同的线程取出的是同一个位置的数据
      // 进行加锁操作
      lock.lock();
      try {
        // 当 count 等于0 说明队列为空
        // 需要将线程挂起等待
        while (count == 0)
          notEmpty.await();
        // 当被唤醒之后进行出队操作
        return dequeue();
      }finally {
        lock.unlock();
      }
    }
     
    private E  dequeue() {
      final Object[] items = this.items;
      @SuppressWarnings("unchecked")
      E x = (E) items[takeIndex];
      items[takeIndex] = null; // 将对应的位置设置为 null GC就可以回收了
      if (++takeIndex == items.length)
        takeIndex = 0;
      count--; // 队列当中数据少一个了
      // 因为出队了一个数据 可以唤醒一个被 put 函数阻塞的线程 如果这个时候没有被阻塞的线程
      // 这个函数就不会起作用 也就说在这个函数调用之后被 put 函数挂起的线程也不会被唤醒
      notFull.signal(); // 唤醒一个被 put 函数阻塞的线程
      return x;
    }

    重写toString函数

    因为我们在后面的测试函数当中会打印我们这个类,而打印这个类的时候会调用对象的toString方法得到一个字符串,最后打印这个字符串。

    @Override
    public String toString() {
      StringBuilder stringBuilder = new StringBuilder();
      stringBuilder.append("[");
      // 这里需要上锁 因为我们在打印的时候需要打印所有的数据
      // 打印所有的数据就需要对数组进行遍历操作 而在进行遍历
      // 操作的时候是不能进行插入和删除操作的 因为打印的是某
      // 个时刻的数据
      lock.lock();
      try {
        if (count == 0)
          stringBuilder.append("]");
        else {
          int cur = 0;
          // 对数据进行遍历 一共遍历 count 次 因为数组当中一共有 count
          // 个数据
          while (cur != count) {
            // 从 takeIndex 位置开始进行遍历 因为数据是从这个位置开始的
            stringBuilder.append(items[(cur + takeIndex) % items.length].toString() + ", ");
            cur += 1;
          }
          // 删除掉最后一次没用的 ", "
          stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
          stringBuilder.append(']');
        }
      }finally {
        lock.unlock();
      }
      return stringBuilder.toString();
    }

    完整代码

    整个我们自己完成的阻塞队列的代码如下:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
     
    public class MyArrayBlockingQueue {
     
      // 用于保护临界区的锁
      private final ReentrantLock lock;
      // 用于唤醒取数据的时候被阻塞的线程
      private final Condition notEmpty;
      // 用于唤醒放数据的时候被阻塞的线程
      private final Condition notFull;
      // 用于记录从数组当中取数据的位置 也就是队列头部的位置
      private int takeIndex;
      // 用于记录从数组当中放数据的位置 也就是队列尾部的位置
      private int putIndex;
      // 记录队列当中有多少个数据
      private int count;
      // 用于存放具体数据的数组
      private Object[] items;
     
     
      @SuppressWarnings("unchecked")
      public MyArrayBlockingQueue(int size) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.notFull = lock.newCondition();
        // 其实可以不用初始化 类会有默认初始化 默认初始化为0
        takeIndex = 0;
        putIndex = 0;
        count = 0;
        if (size <= 0)
          throw new RuntimeException("size can not be less than 1");
        items = (E[])new Object[size];
      }
     
      public void put(E x){
        lock.lock();
     
        try {
          while (count == items.length)
            notFull.await();
          enqueue(x);
        } catch (InterruptedException e) {
          e.printStackTrace();
        } finally {
          lock.unlock();
        }
      }
     
      private void enqueue(E x) {
        this.items[putIndex] = x;
        if (++putIndex == items.length)
          putIndex = 0;
        count++;
        notEmpty.signal();
      }
     
      private E  dequeue() {
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
          takeIndex = 0;
        count--;
        notFull.signal();
        return x;
      }
     
      public boolean add(E e) {
        if (offer(e))
          return true;
        else
          throw new RuntimeException("Queue full");
      }
     
      public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          if (count == items.length)
            return false;
          else {
            enqueue(e);
            return true;
          }
        } finally {
          lock.unlock();
        }
      }
     
      public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
          return (count == 0) ? null : dequeue();
        } finally {
          lock.unlock();
        }
      }
     
      public E take() throws InterruptedException {
        lock.lock();
        try {
          while (count == 0)
            notEmpty.await();
          return dequeue();
        }finally {
          lock.unlock();
        }
      }
     
      @Override
      public String toString() {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("[");
        lock.lock();
        try {
          if (count == 0)
            stringBuilder.append("]");
          else {
            int cur = 0;
            while (cur != count) {
              stringBuilder.append(items[(cur + takeIndex) % items.length].toString()).append(", ");
              cur += 1;
            }
            stringBuilder.delete(stringBuilder.length() - 2, stringBuilder.length());
            stringBuilder.append(']');
          }
        }finally {
          lock.unlock();
        }
        return stringBuilder.toString();
      }
     
    }

    现在对上面的代码进行测试:

    我们现在使用阻塞队列模拟一个生产者消费者模型,设置阻塞队列的大小为5,生产者线程会往队列当中加入数据,数据为0-9的10个数字,消费者线程一共会消费10次。

    import java.util.concurrent.TimeUnit;
     
    public class Test {
     
      public static void main(String[] args) throws InterruptedException {
        MyArrayBlockingQueue queue = new MyArrayBlockingQueue<>(5);
        Thread thread = new Thread(() -> {
          for (int i = 0; i < 10; i++) {
            System.out.println(Thread.currentThread().getName() + " 往队列当中加入数据:" + i);
            queue.put(i);
          }
        }, "生产者");
     
     
        Thread thread1 = new Thread(() -> {
          for (int i = 0; i < 10; i++) {
            try {
              System.out.println(Thread.currentThread().getName() + " 从队列当中取出数据:" + queue.take());
              System.out.println(Thread.currentThread().getName() + " 当前队列当中的数据:" + queue);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          }
        }, "消费者");
        thread.start();
        TimeUnit.SECONDS.sleep(3);
        thread1.start();
     
      }
    }

    上面代码的输出如下所示:

    生产者 往队列当中加入数据:0生产者 往队列当中加入数据:1生产者 往队列当中加入数据:2生产者 往队列当中加入数据:3生产者 往队列当中加入数据:4生产者 往队列当中加入数据:5消费者 从队列当中取出数据:0生产者 往队列当中加入数据:6消费者 当前队列当中的数据:[1, 2, 3, 4, 5]消费者 从队列当中取出数据:1消费者 当前队列当中的数据:[2, 3, 4, 5]消费者 从队列当中取出数据:2消费者 当前队列当中的数据:[3, 4, 5, 6]生产者 往队列当中加入数据:7消费者 从队列当中取出数据:3消费者 当前队列当中的数据:[4, 5, 6, 7]消费者 从队列当中取出数据:4消费者 当前队列当中的数据:[5, 6, 7]消费者 从队列当中取出数据:5消费者 当前队列当中的数据:[6, 7]生产者 往队列当中加入数据:8消费者 从队列当中取出数据:6消费者 当前队列当中的数据:[7, 8]消费者 从队列当中取出数据:7消费者 当前队列当中的数据:[8]消费者 从队列当中取出数据:8消费者 当前队列当中的数据:[]生产者 往队列当中加入数据:9消费者 从队列当中取出数据:9消费者 当前队列当中的数据:[]

    从上面的输出结果我们知道,生产者线程打印5之后被挂起了,因为如果没有被挂起,生产者线程肯定可以一次性输出完成,因为消费者线程阻塞了3秒。由于阻塞队列已满,他在打印数字5后就未完成输出,导致生产者线程被挂起。一旦消费者开始消费,阻塞队列中就会腾出空间,生产者线程就可以继续生产。

    相关文章

    java速学教程(入门到精通)
    java速学教程(入门到精通)

    java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

    下载

    相关标签:

    本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

    相关专题

    更多
    Golang gRPC 服务开发与Protobuf实战
    Golang gRPC 服务开发与Protobuf实战

    本专题系统讲解 Golang 在 gRPC 服务开发中的完整实践,涵盖 Protobuf 定义与代码生成、gRPC 服务端与客户端实现、流式 RPC(Unary/Server/Client/Bidirectional)、错误处理、拦截器、中间件以及与 HTTP/REST 的对接方案。通过实际案例,帮助学习者掌握 使用 Go 构建高性能、强类型、可扩展的 RPC 服务体系,适用于微服务与内部系统通信场景。

    8

    2026.01.15

    公务员递补名单公布时间 公务员递补要求
    公务员递补名单公布时间 公务员递补要求

    公务员递补名单公布时间不固定,通常在面试前,由招录单位(如国家知识产权局、海关等)发布,依据是原入围考生放弃资格,会按笔试成绩从高到低递补,递补考生需按公告要求限时确认并提交材料,及时参加面试/体检等后续环节。要求核心是按招录单位公告及时响应、提交材料(确认书、资格复审材料)并准时参加面试。

    44

    2026.01.15

    公务员调剂条件 2026调剂公告时间
    公务员调剂条件 2026调剂公告时间

    (一)符合拟调剂职位所要求的资格条件。 (二)公共科目笔试成绩同时达到拟调剂职位和原报考职位的合格分数线,且考试类别相同。 拟调剂职位设置了专业科目笔试条件的,专业科目笔试成绩还须同时达到合格分数线,且考试类别相同。 (三)未进入原报考职位面试人员名单。

    58

    2026.01.15

    国考成绩查询入口 国考分数公布时间2026
    国考成绩查询入口 国考分数公布时间2026

    笔试成绩查询入口已开通,考生可登录国家公务员局中央机关及其直属机构2026年度考试录用公务员专题网站http://bm.scs.gov.cn/pp/gkweb/core/web/ui/business/examResult/written_result.html,查询笔试成绩和合格分数线,点击“笔试成绩查询”按钮,凭借身份证及准考证进行查询。

    11

    2026.01.15

    Java 桌面应用开发(JavaFX 实战)
    Java 桌面应用开发(JavaFX 实战)

    本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

    65

    2026.01.14

    php与html混编教程大全
    php与html混编教程大全

    本专题整合了php和html混编相关教程,阅读专题下面的文章了解更多详细内容。

    36

    2026.01.13

    PHP 高性能
    PHP 高性能

    本专题整合了PHP高性能相关教程大全,阅读专题下面的文章了解更多详细内容。

    75

    2026.01.13

    MySQL数据库报错常见问题及解决方法大全
    MySQL数据库报错常见问题及解决方法大全

    本专题整合了MySQL数据库报错常见问题及解决方法,阅读专题下面的文章了解更多详细内容。

    21

    2026.01.13

    PHP 文件上传
    PHP 文件上传

    本专题整合了PHP实现文件上传相关教程,阅读专题下面的文章了解更多详细内容。

    35

    2026.01.13

    热门下载

    更多
    网站特效
    /
    网站源码
    /
    网站素材
    /
    前端模板

    精品课程

    更多
    相关推荐
    /
    热门推荐
    /
    最新课程
    Kotlin 教程
    Kotlin 教程

    共23课时 | 2.5万人学习

    C# 教程
    C# 教程

    共94课时 | 6.8万人学习

    Java 教程
    Java 教程

    共578课时 | 46.2万人学习

    关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
    php中文网:公益在线php培训,帮助PHP学习者快速成长!
    关注服务号 技术交流群
    PHP中文网订阅号
    每天精选资源文章推送

    Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号