This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c66e758a1c [ISSUE #9379] Fix timeStoreTable delete logic in IndexService (#9384)
c66e758a1c is described below
commit c66e758a1cb35f6af9f81c0b919aa156a44fc3df
Author: lizhimins <[email protected]>
AuthorDate: Tue May 6 15:31:03 2025 +0800
[ISSUE #9379] Fix timeStoreTable delete logic in IndexService (#9384)
* [ISSUE #9379] Fix timeStoreTable delete logic in IndexService
* [ISSUE #9379] Fix delete logic from TimeStoreTable in IndexService
---
.../rocketmq/tieredstore/file/FlatAppendFile.java | 2 +-
.../tieredstore/index/IndexStoreService.java | 21 ++++++++-----
.../tieredstore/index/IndexStoreServiceTest.java | 36 ++++++++++++++++++++++
3 files changed, 50 insertions(+), 9 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index 377341d950..38e451d3ff 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -253,7 +253,7 @@ public class FlatAppendFile {
FileSegment fileSegment = fileSegmentTable.get(0);
if (fileSegment.getMaxTimestamp() != Long.MAX_VALUE &&
- fileSegment.getMaxTimestamp() > expireTimestamp) {
+ fileSegment.getMaxTimestamp() >= expireTimestamp) {
log.debug("FileSegment has not expired, filePath={}, fileType={}, " +
"offset={}, expireTimestamp={}, maxTimestamp={}",
filePath, fileType,
fileSegment.getBaseOffset(), expireTimestamp,
fileSegment.getMaxTimestamp());
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
index 7fe645da0f..f4f602a105 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java
@@ -254,12 +254,12 @@ public class IndexStoreService extends ServiceThread implements IndexService {
.whenComplete((v, t) -> {
// Try to return the query results as much as possible here
// rather than directly throwing exceptions
- if (result.isEmpty() && t != null) {
- future.completeExceptionally(t);
- } else {
- List<IndexItem> resultList = new
ArrayList<>(result.values());
- future.complete(resultList.subList(0,
Math.min(resultList.size(), maxCount)));
+ if (t != null) {
+ log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}",
+ topic, key, maxCount, beginTime, endTime, t);
}
+ List<IndexItem> resultList = new
ArrayList<>(result.values());
+ future.complete(resultList.subList(0,
Math.min(resultList.size(), maxCount)));
});
} catch (Exception e) {
log.error("IndexStoreService#queryAsync, topicId={}, key={}, maxCount={}, timestamp={}-{}",
@@ -344,10 +344,15 @@ public class IndexStoreService extends ServiceThread implements IndexService {
// delete file in time store table
readWriteLock.writeLock().lock();
try {
- timeStoreTable.entrySet().removeIf(entry ->
- entry.getKey() < expireTimestamp &&
-
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()));
flatAppendFile.destroyExpiredFile(expireTimestamp);
+ timeStoreTable.entrySet().removeIf(entry ->
+
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) &&
+ entry.getKey() < flatAppendFile.getMinTimestamp());
+ int tableSize = (int) timeStoreTable.entrySet().stream()
+ .filter(entry ->
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
+ .count();
+ log.info("IndexStoreService delete file, timestamp={}, remote={}, table={}, all={}",
+ expireTimestamp, flatAppendFile.getFileSegmentList().size(), tableSize, timeStoreTable.size());
} finally {
readWriteLock.writeLock().unlock();
}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
index 83b407e73b..7b881ddd44 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -33,15 +33,18 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.AppendResult;
+import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtilTest;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -206,6 +209,39 @@ public class IndexStoreServiceTest {
});
}
+ @Test
+ public void deleteFileTest() throws InterruptedException,
IllegalAccessException {
+ indexService = new IndexStoreService(fileAllocator, filePath);
+ indexService.start();
+
+ for (int i = 0; i < 2 * 20; i++) {
+ AppendResult result = indexService.putKey(
+ TOPIC_NAME, TOPIC_ID, QUEUE_ID,
Collections.singleton(String.valueOf(i)),
+ i * 100L, MESSAGE_SIZE, System.currentTimeMillis());
+ Assert.assertEquals(AppendResult.SUCCESS, result);
+ TimeUnit.MILLISECONDS.sleep(1);
+ }
+
+ indexService.wakeup();
+ Awaitility.await().until(() -> {
+ int tableSize = (int)
indexService.getTimeStoreTable().entrySet().stream()
+ .filter(entry ->
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
+ .count();
+ return tableSize == 2;
+ });
+
+ long timestamp =
indexService.getTimeStoreTable().firstEntry().getValue().getEndTimestamp();
+ FlatAppendFile flatAppendFile = (FlatAppendFile)
+ FieldUtils.readField(indexService, "flatAppendFile", true);
+
+ indexService.destroyExpiredFile(timestamp);
+ Assert.assertEquals(2, flatAppendFile.getFileSegmentList().size());
+ Assert.assertEquals(3, indexService.getTimeStoreTable().size());
+ indexService.destroyExpiredFile(timestamp + 1);
+ Assert.assertEquals(1, flatAppendFile.getFileSegmentList().size());
+ Assert.assertEquals(2, indexService.getTimeStoreTable().size());
+ }
+
@Test
public void restartServiceTest() throws InterruptedException {
indexService = new IndexStoreService(fileAllocator, filePath);