DelayQueue实现原理

news/2025/2/22 5:38:12

文章目录

  • 类图结构及概要
  • 核心方法
    • offer方法
    • take方法
    • poll方法
    • size方法
  • 总结

DelayQueue 并发队列是 一个无界阻塞延迟队 列 ,队列中的每个元素都有个过期时 间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最快要过期的元素。

类图结构及概要

在这里插入图片描述
DelayQueue 内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步 。另外,队列里面的元素要实现 Delayed 接口,由于每个元素都有一个过期时间 ,所以要实现获知当前元素还剩下多少时 间就过期了的接口,由于内部使用优先级队列来实现,所以要实现元素之间相互比较的接口。

条件变量 available 与 lock 锁是对应的,其目的是为了实现线程问同步 。其中 leader 变量的使用基于 Leader-Follower 模式的变体,用于尽量减少不必要的线程等待。当一个线程调用队列的 take 方法变为 leader 线程后,它会调用条件变量 available.awaitNanos(delay) 等待 delay 时间,但是其他线程 (follwer 线程) 则 会调用 available.await()进行无限等待 。 leader 线程延迟时间过期后,会退出 take 方法 , 并通过调用available.signal()方法唤醒一个 follwer 线程 ,被唤醒的 follwer 线程被选举为新的 leader 线程 。

核心方法

offer方法

插入元素到队列,如果插入元素为 null 则抛 出 NullPointerException 异常 , 否则由于是无界队列 , 所以一直返回 true。插入元素要实现 Delayed 接口。

``

public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

``

如上代码首先获取独占锁,然后添加元素到优先级队列,由于 q 是优先级队列,所以添加元素后,调用 q.peek() 方法返回的并不一定是当前添加的元素。如果代码( 2 )判断结果为 true,则说明当前元素 e 是最先将过期的 , 那么重置 leader 线程为 null , 这时候激活 avaliab\e 变量条件队列里面的一个线程,告诉它队列里面有元素了。

take方法

获取并移除队列里面延迟时间过期的元素,如果队列里面没有过期元素则等待。

``

public E take() throws InterruptedException {

    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
        //  获取但不移除队首元素( 1)
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }

}

``

如上代码首先获取独占锁 lock 。 假设线程 A 第一次调用 队列的 take() 方法时队列为空 ,则 执行代码(1)后 first==null ,所以会执行代码 (2) 把当前线程放入 available 的条件队列里阻塞等待。

当有另 外一个线程 B 执行 offer ( item )方法并且添加元素到队列时 , 假设此时没有其他线程执行入队操作 ,则线程 B 添加 的元素是队首元素 , 那么执行 qpeek() 。

e 这时候就会重置 leader 线程为 null ,并且激活条件变量的条件队列里面的一个线程。此时线程 A 就会被激活。

线程 A 被撤活并循环后重新获取 队首元素,这时候 first 就是线程 B 新增的元素,可知这时候 first 不为 null , 则调用 first. getDelay(TimeUnit.NANOSECONDS) 方法查看该元素还剩余多 少时间就要过期,如 果 de lay<=O 则说明 己经过期,那么直接出队返回。否则查看 leader 是否为 null ,不为 null 则说明其他线程也在执行 take ,则把该线程放入条件队列。如果这时候 leader 为 null,则 :izt取当前线程 A 为 leader 线程 , 然后执行代码(5 ) 等待 delay时间(这期间该线程会释放锁,所以其他线程可 以 offer 添加元素,也可以 take 阻塞自己) ,剩余过期时间到后,线程 A 会重新竞争得到锁,然后重置 leader 线程为 null , 重新进入循环,这时候就会发现队头的元素己经过期了,则会直接返回队头元素 。

在返回前会执行 finally 块里面的代码( 7 ),代码 (7 )执行结果为 true 则说明 当前线程从队列移除过期元素后,又有其他线程执行了入队操作,那么这时候调用条件变量的singa l 方法,激活条件队列里面的等待线程。

poll方法

获取并移除队头过期元素,如果没有过期元素则返回 null 。首先获取独占锁,然后获取队头元素,如果队头元素为 null 或者还没过期则返回 null ,否则返回队头元素。

size方法

计算队列元素个数,包含过期的和没有过期的。首先获取独占锁,然后调用优先级队列 的 size 方法。

总结

DelayQueue 队列,其内部使用 PriorityQueue 存放数据,使用 ReentrantLock 实现线程同步 。另 外队列里面的 元素要实现 Delayed 接口,其中一个是获取当前元素到过期时间剩余时间的接 口,在出队时判断元素是否过期 了,一个是元素之间比较的接口 ,因为这是一个有优先级的队列 。


http://www.niftyadmin.cn/n/5858210.html

相关文章

Jenkins 调用 Shell 脚本,在Shell脚本中调用 Unity 类方法,传递参数给Unity

Jenkins 调用 Shell 脚本&#xff0c;在Shell脚本中调用 Unity 类方法&#xff0c;传递参数给Unity 还是以 Jenkins 调用 Pipeline 方式 调用 Shell Pipeline 脚本如下 pipeline {agent anystages {stage(Test Parameter) {steps {script {// shell 脚本目录SHELL_UNITY_PAT…

dl学习笔记(11):VGG,NIN,GooleNet经典架构pytorch实现

&#xff08;1&#xff09;VGG16&#xff1a; 先给出架构图&#xff1a; 实现&#xff1a; class VGG16(nn.Module):def __init__(self):super().__init__()self.feature_part nn.Sequential(nn.Conv2d(3,64,3,padding1),nn.ReLU(inplaceTrue),nn.Conv2d(64,64,3,padding1),…

高效率:转换效率高达 96%,可有效减少能源损耗

WD5030 的特点 高效率&#xff1a;转换效率高达 96%&#xff0c;可有效减少能源损耗&#xff0c;降低设备发热&#xff0c;提高能源利用效率&#xff0c;延长电池供电设备的续航时间135。 精准输出电压&#xff1a;内置可调线路补偿和可调输出电压功能&#xff0c;输出电压精度…

SpringAI系列 - RAG篇(三) - ETL

目录 一、引言二、组件说明三、集成示例一、引言 接下来我们介绍ETL框架,该框架对应我们之前提到的阶段1:ETL,主要负责知识的提取和管理。ETL 框架是检索增强生成(RAG)数据处理的核心,其将原始数据源转换为结构化向量并进行存储,确保数据以最佳格式供 AI 模型检索。 …

使用GitLab和GitLab-Runner建立CICD流水线

1.安装部署 使用docker-compose来部署gitlab系统,创建一个用于存放gitlab的目录: # 创建gitlab存储目录 mkdir -p /opt/docker/gitlab # 进入到存储目录中 cd /opt/docker/gitlab # 创建docker-compose.yml文件 touch docker-compose.yml在docker-compose.yml中加入以下配…

Spring Boot 自动装配原理深度剖析

一、引言 在 Java 开发领域&#xff0c;Spring 框架无疑是中流砥柱。而 Spring Boot 的出现&#xff0c;更是极大地简化了 Spring 应用的搭建和开发过程。其中&#xff0c;自动装配原理是 Spring Boot 的核心亮点之一&#xff0c;它让开发者无需手动编写大量繁琐的配置代码&am…

基于微信小程序的宿舍报修管理系统设计与实现,SpringBoot(15500字)+Vue+毕业论文+指导搭建视频

运行环境 jdkmysqlIntelliJ IDEAmaven3微信开发者工具 项目技术SpringBoothtmlcssjsjqueryvue2uni-app 宿舍报修小程序是一个集中管理宿舍维修请求的在线平台&#xff0c;为学生、维修人员和管理员提供了一个便捷、高效的交互界面。以下是关于这些功能的简单介绍&#xff1a; …

汽车迷你Fakra连接器市场报告:未来几年年复合增长率CAGR为21.3%

汽车微型 Fakra 连接器是汽车行业的专用连接器&#xff0c;用于连接汽车内的各种电子控制单元 (ECU)、传感器和通信系统。Fakra 连接器以其坚固耐用的设计而著称&#xff0c;这对于抵御汽车环境中的高温、振动、潮湿和化学物质等恶劣环境至关重要。 据QYResearch调研团队最新报…