This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d60198f621 [ISSUE #8239] Fix the issue of potential message loss after
a crash under synchronous disk flushing configuration. (#8240)
d60198f621 is described below
commit d60198f621baac54bffb981d7a797a04dfa0bb5a
Author: rongtong <[email protected]>
AuthorDate: Fri Jun 7 13:44:13 2024 +0800
[ISSUE #8239] Fix the issue of potential message loss after a crash under
synchronous disk flushing configuration. (#8240)
---
.../java/org/apache/rocketmq/store/CommitLog.java | 2 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 12 +-
.../rocketmq/store/config/MessageStoreConfig.java | 15 ++-
.../rocketmq/store/ReputMessageServiceTest.java | 148 +++++++++++++++++++++
4 files changed, 171 insertions(+), 6 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 1174eca1ba..c2150d7a32 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -651,7 +651,7 @@ public class CommitLog implements Swappable {
} else if
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
return this.confirmOffset;
} else {
- return getMaxOffset();
+ return this.defaultMessageStore.isSyncDiskFlush() ?
getFlushedWhere() : getMaxOffset();
}
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 97833351d1..a901e850ed 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -2814,7 +2814,11 @@ public class DefaultMessageStore implements MessageStore
{
}
public boolean isCommitLogAvailable() {
- return this.reputFromOffset <
DefaultMessageStore.this.getConfirmOffset();
+ return this.reputFromOffset < getReputEndOffset();
+ }
+
+ protected long getReputEndOffset() {
+ return
DefaultMessageStore.this.getMessageStoreConfig().isReadUnCommitted() ?
DefaultMessageStore.this.commitLog.getMaxOffset() :
DefaultMessageStore.this.commitLog.getConfirmOffset();
}
public void doReput() {
@@ -2834,12 +2838,12 @@ public class DefaultMessageStore implements
MessageStore {
try {
this.reputFromOffset = result.getStartOffset();
- for (int readSize = 0; readSize < result.getSize() &&
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+ for (int readSize = 0; readSize < result.getSize() &&
reputFromOffset < getReputEndOffset() && doNext; ) {
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(),
false, false, false);
int size = dispatchRequest.getBufferSize() == -1 ?
dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
- if (reputFromOffset + size >
DefaultMessageStore.this.getConfirmOffset()) {
+ if (reputFromOffset + size > getReputEndOffset()) {
doNext = false;
break;
}
@@ -3127,7 +3131,7 @@ public class DefaultMessageStore implements MessageStore {
try {
this.reputFromOffset = result.getStartOffset();
- for (int readSize = 0; readSize < result.getSize() &&
reputFromOffset < DefaultMessageStore.this.getConfirmOffset() && doNext; ) {
+ for (int readSize = 0; readSize < result.getSize() &&
reputFromOffset < getReputEndOffset() && doNext; ) {
ByteBuffer byteBuffer = result.getByteBuffer();
int totalSize =
preCheckMessageAndReturnSize(byteBuffer);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 9afc02a0c9..0060b144cf 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -413,6 +413,12 @@ public class MessageStoreConfig {
private int topicQueueLockNum = 32;
+ /**
+ * If readUnCommitted is true, the dispatch of the consume queue will
exceed the confirmOffset, which may cause the client to read uncommitted
messages.
+ * For example, reput offset exceeding the flush offset during synchronous
disk flushing.
+ */
+ private boolean readUnCommitted = false;
+
public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
}
@@ -672,7 +678,6 @@ public class MessageStoreConfig {
this.forceVerifyPropCRC = forceVerifyPropCRC;
}
-
public String getStorePathCommitLog() {
if (storePathCommitLog == null) {
return storePathRootDir + File.separator + "commitlog";
@@ -1819,4 +1824,12 @@ public class MessageStoreConfig {
public void setTopicQueueLockNum(int topicQueueLockNum) {
this.topicQueueLockNum = topicQueueLockNum;
}
+
+ public boolean isReadUnCommitted() {
+ return readUnCommitted;
+ }
+
+ public void setReadUnCommitted(boolean readUnCommitted) {
+ this.readUnCommitted = readUnCommitted;
+ }
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ReputMessageServiceTest.java
b/store/src/test/java/org/apache/rocketmq/store/ReputMessageServiceTest.java
new file mode 100644
index 0000000000..d1ce095333
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/ReputMessageServiceTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.UUID;
+import java.io.File;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.any;
+
+public class ReputMessageServiceTest {
+ private DefaultMessageStore syncFlushMessageStore;
+ private DefaultMessageStore asyncFlushMessageStore;
+ private final String topic = "FooBar";
+ private final String tmpdir = System.getProperty("java.io.tmpdir");
+ private final String storePathRootParentDir =
(StringUtils.endsWith(tmpdir, File.separator) ? tmpdir : tmpdir +
File.separator) + UUID.randomUUID();
+ private SocketAddress bornHost;
+ private SocketAddress storeHost;
+
+ @Before
+ public void init() throws Exception {
+ File file = new File(storePathRootParentDir);
+ UtilAll.deleteFile(file);
+ syncFlushMessageStore = buildMessageStore(FlushDiskType.SYNC_FLUSH);
+ asyncFlushMessageStore = buildMessageStore(FlushDiskType.ASYNC_FLUSH);
+ assertTrue(syncFlushMessageStore.load());
+ assertTrue(asyncFlushMessageStore.load());
+ syncFlushMessageStore.start();
+ asyncFlushMessageStore.start();
+ storeHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
+ bornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"),
0);
+ }
+
+ private DefaultMessageStore buildMessageStore(FlushDiskType flushDiskType)
throws Exception {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setHaListenPort(0);
+ messageStoreConfig.setFlushDiskType(flushDiskType);
+ messageStoreConfig.setStorePathRootDir(storePathRootParentDir +
File.separator + flushDiskType);
+ messageStoreConfig.setStorePathCommitLog(storePathRootParentDir +
File.separator + flushDiskType + File.separator + "commitlog");
+ BrokerConfig brokerConfig = new BrokerConfig();
+ brokerConfig.setLongPollingEnable(false);
+ DefaultMessageStore messageStore = new
DefaultMessageStore(messageStoreConfig, mock(BrokerStatsManager.class), null,
brokerConfig, null);
+ // Mock flush disk service
+ Field field = CommitLog.class.getDeclaredField("flushManager");
+ field.setAccessible(true);
+ FlushManager flushManager = mock(FlushManager.class);
+ CompletableFuture<PutMessageStatus> completableFuture = new
CompletableFuture<>();
+ completableFuture.complete(PutMessageStatus.PUT_OK);
+ when(flushManager.handleDiskFlush(any(AppendMessageResult.class),
any(MessageExt.class))).thenReturn(completableFuture);
+ field.set(messageStore.getCommitLog(), flushManager);
+ return messageStore;
+ }
+
+ @Test
+ public void testReputEndOffset_whenSyncFlush() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ assertEquals(PutMessageStatus.PUT_OK,
syncFlushMessageStore.putMessage(buildMessage()).getPutMessageStatus());
+ }
+ assertEquals(1580, syncFlushMessageStore.getMaxPhyOffset());
+ assertEquals(0,
syncFlushMessageStore.getCommitLog().getFlushedWhere());
+ // wait for cq dispatch
+ Thread.sleep(3000);
+ assertEquals(0,
syncFlushMessageStore.getCommitLog().getFlushedWhere());
+ assertEquals(0, syncFlushMessageStore.getMaxOffsetInQueue(topic, 0));
+ GetMessageResult getMessageResult =
syncFlushMessageStore.getMessage("testGroup", topic, 0, 0, 32, null);
+ assertEquals(GetMessageStatus.NO_MESSAGE_IN_QUEUE,
getMessageResult.getStatus());
+ }
+
+ @Test
+ public void testReputEndOffset_whenAsyncFlush() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ assertEquals(PutMessageStatus.PUT_OK,
asyncFlushMessageStore.putMessage(buildMessage()).getPutMessageStatus());
+ }
+ assertEquals(1580, asyncFlushMessageStore.getMaxPhyOffset());
+ assertEquals(0,
asyncFlushMessageStore.getCommitLog().getFlushedWhere());
+ // wait for cq dispatch
+ Thread.sleep(3000);
+ assertEquals(0,
asyncFlushMessageStore.getCommitLog().getFlushedWhere());
+ assertEquals(10, asyncFlushMessageStore.getMaxOffsetInQueue(topic, 0));
+ GetMessageResult getMessageResult =
asyncFlushMessageStore.getMessage("testGroup", topic, 0, 0, 32, null);
+ assertEquals(10, getMessageResult.getMessageCount());
+ }
+
+ private MessageExtBrokerInner buildMessage() {
+ MessageExtBrokerInner msg = new MessageExtBrokerInner();
+ msg.setTopic(topic);
+ msg.setTags("TAG1");
+ msg.setBody("Once, there was a chance for me!".getBytes());
+ msg.setKeys(String.valueOf(System.currentTimeMillis()));
+ msg.setQueueId(0);
+ msg.setSysFlag(0);
+ msg.setBornTimestamp(System.currentTimeMillis());
+ msg.setStoreHost(storeHost);
+ msg.setBornHost(bornHost);
+
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+ return msg;
+ }
+
+ @After
+ public void destroy() throws Exception {
+ if (this.syncFlushMessageStore != null) {
+ syncFlushMessageStore.shutdown();
+ syncFlushMessageStore.destroy();
+ }
+ if (this.asyncFlushMessageStore != null) {
+ asyncFlushMessageStore.shutdown();
+ asyncFlushMessageStore.destroy();
+ }
+ File file = new File(storePathRootParentDir);
+ UtilAll.deleteFile(file);
+ }
+}