本文章基于 RocketMQ 4.9.3
1. 前言
RocketMQ 存储部分系列文章:
- 【RocketMQ 存储】- RocketMQ存储类 MappedFile
- 【RocketMQ 存储】- 一文总结 RocketMQ 的存储结构-基础
- 【RocketMQ 存储】- broker 端存储单条消息的逻辑
- 【RocketMQ 存储】- broker 端存储批量消息的逻辑
上一篇文章中,我们解析了 RocketMQ 是如何存储单条消息和批量消息的,但是消息也只是存储到了 CommitLog#MappedFile#ByteBuffer,也就是内容还是没有刷盘,那么这篇文章就来介绍下刷盘的逻辑。
2. 概述
RocketMQ 的刷盘有两种策略,同步刷盘
和 异步刷盘
。
- 同步刷盘:只有在消息真正持久化至磁盘后 RocketMQ 的 Broker 端才会真正返回给 Producer 端一个成功的 ACK 响应。同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。
- 异步刷盘:能够充分利用 OS 的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。
消息写入 CommitLog 的 ByteBuffer 后,通过 submitFlushRequest
方法添加了一个刷盘请求,下面我们就来看下这个方法。
3. submitFlushRequest 提交刷盘请求
java">/**
* 提交刷盘请求
* @param result
* @param messageExt
* @return
*/
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
// 同步刷盘,默认是异步刷盘,也就是 FlushDiskType.ASYNC_FLUSH
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
// 同步刷盘服务 GroupCommitService
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
// 消息是否需要等待存储完成后才返回
if (messageExt.isWaitStoreMsgOK()) {
// 创建刷盘请求,这里偏移量 nextOffset = 消息写入位置 + 写入的消息长度,意思是刷完盘之后偏移量应该设置为 nextOffset
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
// 同步刷盘的等待时间是 5s
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
// 添加请求到 flushDiskWatcher 中
flushDiskWatcher.add(request);
// 把请求添加到刷盘请求集合 requestsWrite 中,等待刷盘服务 GroupCommitService 处理
service.putRequest(request);
// 返回刷盘请求的 flushOKFuture,但是并没有填充结果
return request.future();
} else {
// 如果不需要等待刷盘结果,那么唤醒刷盘服务就可以直接返回添加刷盘请求成功的状态码(PUT_OK)了
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// 异步刷盘
else {
// 看看是否开启了堆外缓存
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
// 如果没有启动堆外缓存,那么唤醒异步刷盘服务 FlushRealTimeService
flushCommitLogService.wakeup();
} else {
// 这里就是开启了堆外缓存,那么唤醒异步提交服务 CommitRealTimeService
commitLogService.wakeup();
}
// 返回结果
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
下面就以这个方法引出同步刷盘和异步刷盘,在解析之前先说下消息刷盘的标记,分别是同步刷盘(FlushDiskType.SYNC_FLUSH ),异步刷盘(FlushDiskType.ASYNC_FLUSH) ,这个方法逻辑如下:
-
如果刷盘服务设定是异步刷盘,也就是 FlushDiskType.ASYNC_FLUSH
- 判断是否开启了堆外缓存,如果开启了,那么说明消息是写入的 writeBuffer,我们上一篇文章就已经解释过了,writeBuffer 不是使用 mmap 映射到 page cache 的,所以写入这里面的消息得先 commit 提交到 page cache,所以开启了堆外缓存那么会唤醒
CommitRealTimeService
。 - 如果没有开启了堆外缓存,证明消息是写入了 mappedByteBuffer,这时候直接唤醒异步刷盘服务
FlushRealTimeService
。
- 判断是否开启了堆外缓存,如果开启了,那么说明消息是写入的 writeBuffer,我们上一篇文章就已经解释过了,writeBuffer 不是使用 mmap 映射到 page cache 的,所以写入这里面的消息得先 commit 提交到 page cache,所以开启了堆外缓存那么会唤醒
好了,上面就是这个方法的全部逻辑,那么顺着上面的代码,我提出两个问题:
其实上面第二个问题可以从同步刷盘服务 GroupCommitService
得到解答,当然我这里只是做一个总结,具体逻辑会在后续慢慢看,下面还是先看下第一个问题,也就是 FlushDiskWatcher
是干什么的?
4. FlushDiskWatcher 同步刷盘监视器
FlushDiskWatcher 是同步刷盘请求的监视器,还记得前面创建的刷盘请求呢?
java">// 创建刷盘请求,这里偏移量 nextOffset = 消息写入位置 + 写入的消息长度,意思是刷完盘之后偏移量应该设置为 nextOffset
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
// 同步刷盘的等待时间是 5s
this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
/**
* 同步刷盘请求
*/
public static class GroupCommitRequest {
// 最新刷盘位置,意思就是这个刷盘请求处理完成后最新的刷盘位置 flushedWhere >= nextOffset
private final long nextOffset;
// 同步刷盘的阻塞等待结果
private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
// 超时时间
private final long deadLine;
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
}
public long getDeadLine() {
return deadLine;
}
public long getNextOffset() {
return nextOffset;
}
public void wakeupCustomer(final PutMessageStatus putMessageStatus) {
this.flushOKFuture.complete(putMessageStatus);
}
public CompletableFuture<PutMessageStatus> future() {
return flushOKFuture;
}
}
这里设置的刷盘等待时间是 5 s,所以 deadLine = System.nanoTime() + (5 * 1_000_000)
。那么下面来看下 FlushDiskWatcher 这个类里面的方法和属性。
java">// 同步刷盘的请求集合
private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();
这个就是同步刷盘的请求集合,FlushDiskWatcher 里面只有这一个属性参数。
java">public class FlushDiskWatcher extends ServiceThread {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// 同步刷盘的请求集合
private final LinkedBlockingQueue<GroupCommitRequest> commitRequests = new LinkedBlockingQueue<>();
@Override
public String getServiceName() {
return FlushDiskWatcher.class.getSimpleName();
}
@Override
public void run() {
...
}
public void add(GroupCommitRequest request) {
commitRequests.add(request);
}
public int queueSize() {
return commitRequests.size();
}
}
上面就是 FlushDiskWatcher 里面的方法,比较简单,核心逻辑还是在 run 方法中,下面就来看下 run 方法的逻辑。
首先就是一个 while 循环,FlushDiskWatcher 只要不 shutdown 都会一直循环,就算 while 循环里面出异常也不会退出。
java">// 如果服务没有停止
while (!isStopped()) {
...
}
在 while 循环中,首先从 commitRequests
集合中阻塞获取刷盘请求,当消息同步刷盘的时候,就会把请求往 commitRequests
里面添加,这里就能获取到了。
java">try {
// 从 commitRequests 集合中阻塞获取刷盘请求,当消息同步刷盘的时候,就会把请求往 commitRequests 里面添加,这里就能获取到了
request = commitRequests.take();
} catch (InterruptedException e) {
// 中断异常,继续执行
log.warn("take flush disk commit request, but interrupted, this may caused by shutdown");
continue;
}
到这里就是能获取到请求,那么就会阻塞等待这个请求到时间,下面又是一个 while 循环,这个 while 循环就是专门处理这个 request 的。
java">// 下面就是 while 循环判断这个 request 有没有返回结果了
while (!request.future().isDone()) {
// 当前时间
long now = System.nanoTime();
// 看看是不是已经超时了
if (now - request.getDeadLine() >= 0) {
// 刷盘请求超时了,就是往请求里面写入 FLUSH_DISK_TIMEOUT 的结果
request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
// 跳出煦暖
break;
}
// 这里的截至时间是 nacos,所以求睡眠时间是 / 1 000 000
long sleepTime = (request.getDeadLine() - now) / 1_000_000;
// 最小不能低于 10 ms
sleepTime = Math.min(10, sleepTime);
if (sleepTime == 0) {
// 如果是 0,就是到点了,返回结果超时,因为下面就是 sleep,所以这里不超时下一次 while 循环也会超时
request.wakeupCustomer(PutMessageStatus.FLUSH_DISK_TIMEOUT);
break;
}
try {
// 睡眠
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// 睡眠被中断就跳出循环
log.warn(
"An exception occurred while waiting for flushing disk to complete. this may caused by shutdown");
break;
}
}
上面就是阻塞等待的核心逻辑了,里面的逻辑和 Timer 的有点像,但是吧看上面的逻辑,如果说里面同时添加了 10000 个 request 请求,那么第 10000 个请求就得遍历 10000 次才能判断到,这时候就有可能已经超时了。所以这里感觉使用 ScheduledExecutorService
来处理会好一点。
我倒是想不明白这里为什么不直接在同步处理请求的时候用当前时间和 deadline
作比较,如果超时直接返回,为什么还要单独写一个监视器呢?如果有知道的朋友可以评论下。
所以 FlushDiskWatcher
这个类的作用就是不断检测同步刷盘请求看看有没有超时,如果超时了就直接返回结果 FLUSH_DISK_TIMEOUT
。
5. 同步刷盘但是不需要等待刷盘结果
好了,上面第四小结就解释了第三小节给出的问题:FlushDiskWatcher
这个 Watcher 是干什么用的?,下面这一小节就来回答下最后一个问题:同步刷盘服务不需要等待刷盘结果的情况下,为什么不需要添加一个 request 到同步刷盘请求集合中?
。
我们重新看下为什么会提出这个问题,还是回到 submitFlushRequest
方法,那么我们来截下图:
当刷盘请求是同步刷盘但是不需要返回结果的时候,这里只是唤醒同步刷盘服务 GroupCommitService
,然后直接返回 PUT_OK
。上两篇文章就解释过了,这里消息提交只是提交到 writeBuffer 或者 mappedByteBuffer,还没有刷盘的。
要解答这个问题,就得先去 GroupCommitService
里面看下刷盘逻辑,里面刷盘的核心逻辑在 doCommit,不过这篇文章我先不详细解释如何刷盘的,主要是整体看下里面的逻辑。
java">/**
* 同步刷盘
*/
private void doCommit() {
// 从读队列中获取刷盘请求
if (!this.requestsRead.isEmpty()) {
...
} else {
// 有些消息是同步刷盘不等待,就不需要走上面的流程去读取 requestsRead 处理刷盘请求,这类的也不会往 requestsWrite 里面设置刷盘请求
CommitLog.this.mappedFileQueue.flush(0);
}
}
上面同步刷盘服务不需要等待刷盘结果的情况下,请求不会添加到 requestsRead
里面,所以就会走 else 的逻辑,在里面直接进行刷盘,这个方法我们后面也会解析。
但是又有问题了,我们知道如果开启了堆外缓存,消息先写入 writeBuffer,要知道这部分数据是不能直接通过 fileChannel 刷盘的,看刷盘逻辑,就是 MappedFile#flush 方法,就看这里:
finalChannel 初始化的时候是通过 mmap 映射出 mappedByteBuffer 的。
要知道这个 finalChannel 是当前用户空间到 Page Cache 中的区域的通道,跟 writeBuffer 这个堆外缓存可没什么关系,所以上面的 MappedFile#flush
方法在通过 fileChannel.force 的时候其实是把 Page Cache 里面的数据给刷盘,但是此时 GroupCommitService
通过 flush 刷盘的时候,writeBuffer
里面的消息可还没有提交到 Page Cache,所以这部分数据是怎么刷盘的呢?
这里后面说到 GroupCommitService
源码的时候会解析,这里就先不说了。
6. 小结
好了,本文介绍了刷盘里面同步刷盘和异步刷盘,同时也解析了添加同步刷盘请求的逻辑,下一篇文章就开始介绍这几种刷盘服务。
如有错误,欢迎指出!!!