【RocketMQ 存储】- 同步刷盘和异步刷盘

news/2025/2/9 5:41:06 标签: java-rocketmq, rocketmq, java, 刷盘

文章目录

  • 1. 前言
  • 2. 概述
  • 3. submitFlushRequest 提交刷盘请求
  • 4. FlushDiskWatcher 同步刷盘监视器
  • 5. 同步刷盘但是不需要等待刷盘结果
  • 6. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

RocketMQ 存储部分系列文章:

  • 【RocketMQ 存储】- RocketMQ存储类 MappedFile
  • 【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
  • 【RocketMQ 存储】- broker 端存储单条消息的逻辑
  • 【RocketMQ 存储】- broker 端存储批量消息的逻辑

上一篇文章中,我们解析了 RocketMQ 是如何存储单条消息和批量消息的,但是消息也只是存储到了 CommitLog#MappedFile#ByteBuffer,也就是内容还是没有刷盘,那么这篇文章就来介绍下刷盘的逻辑。


2. 概述

RocketMQ 的刷盘有两种策略,同步刷盘异步刷盘

  • 同步刷盘只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
  • 异步刷盘能够充分利用 OS 的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

消息写入 CommitLog 的 ByteBuffer 后,通过 submitFlushRequest 方法添加了一个刷盘请求,下面我们就来看下这个方法。


3. submitFlushRequest 提交刷盘请求

java">/**
 * 提交刷盘请求
 * @param result
 * @param messageExt
 * @return
 */
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
    // 同步刷盘,默认是异步刷盘,也就是 FlushDiskType.ASYNC_FLUSH
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // 同步刷盘服务 GroupCommitService
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // 消息是否需要等待存储完成后才返回
        if (messageExt.isWaitStoreMsgOK()) {
            // 创建刷盘请求,这里偏移量 nextOffset = 消息写入位置 + 写入的消息长度,意思是刷完盘之后偏移量应该设置为 nextOffset
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    // 同步刷盘的等待时间是 5s
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            // 添加请求到 flushDiskWatcher 中
            flushDiskWatcher.add(request);
            // 把请求添加到刷盘请求集合 requestsWrite 中,等待刷盘服务 GroupCommitService 处理
            service.putRequest(request);
            // 返回刷盘请求的 flushOKFuture,但是并没有填充结果
            return request.future();
        } else {
            // 如果不需要等待刷盘结果,那么唤醒刷盘服务就可以直接返回添加刷盘请求成功的状态码(PUT_OK)了
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    // 异步刷盘
    else {
        // 看看是否开启了堆外缓存
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // 如果没有启动堆外缓存,那么唤醒异步刷盘服务 FlushRealTimeService
            flushCommitLogService.wakeup();
        } else  {
            // 这里就是开启了堆外缓存,那么唤醒异步提交服务 CommitRealTimeService
            commitLogService.wakeup();
        }
        // 返回结果
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
}

下面就以这个方法引出同步刷盘和异步刷盘,在解析之前先说下消息刷盘的标记,分别是同步刷盘(FlushDiskType.SYNC_FLUSH ),异步刷盘(FlushDiskType.ASYNC_FLUSH) ,这个方法逻辑如下:

  • 如果刷盘服务设定是同步刷盘,也就是 FlushDiskType.SYNC_FLUSH

    • 获取同步刷盘服务 GroupCommitService。
    • 接着判断消息的配置是否需要等待存储完成后才返回,如果需要存储完成才返回,那么会等待刷盘请求被处理之后才返回结果,当然不是在这里等待,后面会解释。
    • 如果不需要等待刷盘结果,那么唤醒同步刷盘线程,随后直接返回PUT_OK。
  • 如果刷盘服务设定是异步刷盘,也就是 FlushDiskType.ASYNC_FLUSH

    • 判断是否开启了堆外缓存,如果开启了,那么说明消息是写入的 writeBuffer,我们上一篇文章就已经解释过了,writeBuffer 不是使用 mmap 映射到 page cache 的,所以写入这里面的消息得先 commit 提交到 page cache,所以开启了堆外缓存那么会唤醒 CommitRealTimeService
    • 如果没有开启了堆外缓存,证明消息是写入了 mappedByteBuffer,这时候直接唤醒异步刷盘服务 FlushRealTimeService

好了,上面就是这个方法的全部逻辑,那么顺着上面的代码,我提出两个问题:

  1. FlushDiskWatcher 这个 Watcher 是干什么用的?
  2. 同步刷盘服务不需要等待刷盘结果的情况下,为什么不需要添加一个 request 到同步刷盘请求集合中?

其实上面第二个问题可以从同步刷盘服务 GroupCommitService 得到解答,当然我这里只是做一个总结,具体逻辑会在后续慢慢看,下面还是先看下第一个问题,也就是 FlushDiskWatcher 是干什么的?


4. FlushDiskWatcher 同步刷盘监视器

FlushDiskWatcher 是同步刷盘请求的监视器,还记得前面创建的刷盘请求呢?

java">// 创建刷盘请求,这里偏移量 nextOffset = 消息写入位置 + 写入的消息长度,意思是刷完盘之后偏移量应该设置为 nextOffset
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
        // 同步刷盘的等待时间是 5s
        this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());

/**
 * 同步刷盘请求
 */
public static class GroupCommitRequest {
    // 最新刷盘位置,意思就是这个刷盘请求处理完成后最新的刷盘位置 flushedWhere >= nextOffset
    private final long nextOffset;
    // 同步刷盘的阻塞等待结果
    private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
    // 超时时间
    private final long deadLine;

    public GroupCommitRequest(long nextOffset, long timeoutMillis) {
        this.nextOffset = nextOffset;
        this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
    }

    public long getDeadLine() {
        return deadLine;
    }

    public long getNextOffset() {
        return nextOffset;
    }

    public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
        this.flushOKFuture.complete(putMessageStatus);
    }

    public CompletableFuture<PutMessageStatus> future() {
        return flushOKFuture;
    }

}

这里设置的刷盘等待时间是 5 s,所以 deadLine = System.nanoTime() + (5 * 1_000_000)。那么下面来看下 FlushDiskWatcher 这个类里面的方法和属性。

java">// 同步刷盘的请求集合
private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();

这个就是同步刷盘的请求集合,FlushDiskWatcher 里面只有这一个属性参数。

java">public class FlushDiskWatcher extends ServiceThread {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    // 同步刷盘的请求集合
    private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();

    @Override
    public String getServiceName() {
        return FlushDiskWatcher.class.getSimpleName();
    }

    @Override
    public void run() {
        ...
    }

    public void add(GroupCommitRequest request) {
        commitRequests.add(request);
    }

    public int queueSize() {
        return commitRequests.size();
    }
}

上面就是 FlushDiskWatcher 里面的方法,比较简单,核心逻辑还是在 run 方法中,下面就来看下 run 方法的逻辑。

首先就是一个 while 循环,FlushDiskWatcher 只要不 shutdown 都会一直循环,就算 while 循环里面出异常也不会退出。

java">// 如果服务没有停止
while (!isStopped()) {
	...
}

在 while 循环中,首先从 commitRequests 集合中阻塞获取刷盘请求,当消息同步刷盘的时候,就会把请求往 commitRequests 里面添加,这里就能获取到了。

java">try {
     // 从 commitRequests 集合中阻塞获取刷盘请求,当消息同步刷盘的时候,就会把请求往 commitRequests 里面添加,这里就能获取到了
     request = commitRequests.take();
 } catch (InterruptedException e) {
     // 中断异常,继续执行
     log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
     continue;
 }

到这里就是能获取到请求,那么就会阻塞等待这个请求到时间,下面又是一个 while 循环,这个 while 循环就是专门处理这个 request 的。

java">// 下面就是 while 循环判断这个 request 有没有返回结果了
while (!request.future().isDone()) {
    // 当前时间
    long now = System.nanoTime();
    // 看看是不是已经超时了
    if (now - request.getDeadLine() >= 0) {
        // 刷盘请求超时了,就是往请求里面写入 FLUSH_DISK_TIMEOUT 的结果
        request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        // 跳出煦暖
        break;
    }
    // 这里的截至时间是 nacos,所以求睡眠时间是 / 1 000 000
    long sleepTime = (request.getDeadLine() - now) / 1_000_000;
    // 最小不能低于 10 ms
    sleepTime = Math.min(10, sleepTime);
    if (sleepTime == 0) {
        // 如果是 0,就是到点了,返回结果超时,因为下面就是 sleep,所以这里不超时下一次 while 循环也会超时
        request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        break;
    }
    try {
        // 睡眠
        Thread.sleep(sleepTime);
    } catch (InterruptedException e) {
        // 睡眠被中断就跳出循环
        log.warn(
                "An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
        break;
    }
}

上面就是阻塞等待的核心逻辑了,里面的逻辑和 Timer 的有点像,但是吧看上面的逻辑,如果说里面同时添加了 10000 个 request 请求,那么第 10000 个请求就得遍历 10000 次才能判断到,这时候就有可能已经超时了。所以这里感觉使用 ScheduledExecutorService 来处理会好一点。

我倒是想不明白这里为什么不直接在同步处理请求的时候用当前时间和 deadline 作比较,如果超时直接返回,为什么还要单独写一个监视器呢?如果有知道的朋友可以评论下。

所以 FlushDiskWatcher 这个类的作用就是不断检测同步刷盘请求看看有没有超时,如果超时了就直接返回结果 FLUSH_DISK_TIMEOUT


5. 同步刷盘但是不需要等待刷盘结果

好了,上面第四小结就解释了第三小节给出的问题:FlushDiskWatcher 这个 Watcher 是干什么用的?,下面这一小节就来回答下最后一个问题:同步刷盘服务不需要等待刷盘结果的情况下,为什么不需要添加一个 request 到同步刷盘请求集合中?

我们重新看下为什么会提出这个问题,还是回到 submitFlushRequest 方法,那么我们来截下图:
在这里插入图片描述
刷盘请求是同步刷盘但是不需要返回结果的时候,这里只是唤醒同步刷盘服务 GroupCommitService,然后直接返回 PUT_OK。上两篇文章就解释过了,这里消息提交只是提交到 writeBuffer 或者 mappedByteBuffer,还没有刷盘的。

要解答这个问题,就得先去 GroupCommitService 里面看下刷盘逻辑,里面刷盘的核心逻辑在 doCommit,不过这篇文章我先不详细解释如何刷盘的,主要是整体看下里面的逻辑。

java">/**
 * 同步刷盘
 */
private void doCommit() {
	// 从读队列中获取刷盘请求
    if (!this.requestsRead.isEmpty()) {
		...
	} else {
		// 有些消息是同步刷盘不等待,就不需要走上面的流程去读取 requestsRead 处理刷盘请求,这类的也不会往 requestsWrite 里面设置刷盘请求
        CommitLog.this.mappedFileQueue.flush(0);
	}
}

上面同步刷盘服务不需要等待刷盘结果的情况下,请求不会添加到 requestsRead 里面,所以就会走 else 的逻辑,在里面直接进行刷盘,这个方法我们后面也会解析。

但是又有问题了,我们知道如果开启了堆外缓存,消息先写入 writeBuffer,要知道这部分数据是不能直接通过 fileChannel 刷盘的,看刷盘逻辑,就是 MappedFile#flush 方法,就看这里:
在这里插入图片描述
finalChannel 初始化的时候是通过 mmap 映射出 mappedByteBuffer 的。在这里插入图片描述
要知道这个 finalChannel 是当前用户空间到 Page Cache 中的区域的通道,跟 writeBuffer 这个堆外缓存可没什么关系,所以上面的 MappedFile#flush 方法在通过 fileChannel.force 的时候其实是把 Page Cache 里面的数据给刷盘,但是此时 GroupCommitService 通过 flush 刷盘的时候,writeBuffer 里面的消息可还没有提交到 Page Cache,所以这部分数据是怎么刷盘的呢?

这里后面说到 GroupCommitService 源码的时候会解析,这里就先不说了。


6. 小结

好了,本文介绍了刷盘里面同步刷盘和异步刷盘,同时也解析了添加同步刷盘请求的逻辑,下一篇文章就开始介绍这几种刷盘服务。





如有错误,欢迎指出!!!


http://www.niftyadmin.cn/n/5845596.html

相关文章

React 生命周期函数详解

React 组件在其生命周期中有多个阶段&#xff0c;每个阶段都有特定的生命周期函数&#xff08;Lifecycle Methods&#xff09;。这些函数允许你在组件的不同阶段执行特定的操作。以下是 React 组件生命周期的主要阶段及其对应的生命周期函数&#xff0c;并结合了 React 16.3 的…

【ROS2】【2025】Simulate a 6DoF Robotic Arm in Gazebo and ROS2

在本教程中&#xff0c;将学习如何从头开始模拟机械臂。我们将使用 Doosan Robotics 的 6DoF 机械臂。Gazebo 和 ROS2 是执行此模拟的软件。所有代码、URDF 和配置文件都可以在我的 gitee 存储库中找到和下载。 https://gitee.com/kong-yue1/robotic_arm_environment.githttps…

FreeRTOS的事件组

1 创建事件组 xEventGroupCreate EventGroupHandle_t xEventGroupCreate( void ) { EventGroup_t *pxEventBits;/* 分配事件组内存。*/pxEventBits ( EventGroup_t * ) pvPortMalloc( sizeof( EventGroup_t ) );if( pxEventBits ! NULL ){pxEventBits->uxEventBits 0; …

uniapp中使用uCharts折线图X轴数据间隔显示

1、先看官网 https://www.ucharts.cn/ 2、设置代码 "xAxisDemo3":function(val, index, opts){if(index % 2 0){return val}else {return }}, 再在数据中引入设置好样式

Django开发入门 – 1.搭建基于Python Web框架Django的IDE开发环境

Django开发入门 – 1.搭建基于Python Web框架Django的IDE开发环境 Build A Integrated Development Environment(IDE) for Python Web Framework - django By JacksonML 1. 获取及安装最新版Python 打开Chrome浏览器&#xff0c;访问Python官网链接&#xff1a;https://www…

Windows下AMD显卡在本地运行大语言模型(deepseek-r1)

Windows下AMD显卡在本地运行大语言模型 本人电脑配置第一步先在官网确认自己的 AMD 显卡是否支持 ROCm下载Ollama安装程序模型下载位置更改下载 ROCmLibs先确认自己显卡的gfx型号下载解压 替换替换rocblas.dll替换library文件夹下的所有 重启Ollama下载模型运行效果 本人电脑配…

《Java核心技术 卷II》本地化的数字格式

数字格式 数字和货币的格式高度依赖locale。 格式化对象的集合&#xff0c;可以对java.text包中的数字进行格式化和解析。 格式化数字值 对特定locale的数字进行格式化的步骤&#xff1a; 得到Locale对象使用工厂方法得到一个格式器对象。使用这个格式器对象来完成格式化解析工…

AI知识库和全文检索的区别

1、AI知识库的作用 AI知识库是基于人工智能技术构建的智能系统&#xff0c;能够理解、推理和生成信息。它的核心作用包括&#xff1a; 1.1 语义理解 自然语言处理&#xff08;NLP&#xff09;&#xff1a;AI知识库能够理解用户查询的语义&#xff0c;而不仅仅是关键词匹配。 …