This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d8fa28c472 [ISSUE #9544] Must correct file size when init the file
segment (#9545)
d8fa28c472 is described below
commit d8fa28c47238b2a37df4243cf53cf460b1a94565
Author: lizhimins <[email protected]>
AuthorDate: Mon Jul 14 14:08:35 2025 +0800
[ISSUE #9544] Must correct file size when init the file segment (#9545)
---
.../rocketmq/tieredstore/file/FlatAppendFile.java | 37 ++++++++++++++++++----
1 file changed, 31 insertions(+), 6 deletions(-)
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
index 38e451d3ff..a7586e0b9f 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.tieredstore.common.AppendResult;
@@ -70,16 +71,40 @@ public class FlatAppendFile {
this.fileSegmentTable.addAll(fileSegmentList.stream().sorted().collect(Collectors.toList()));
}
+ /**
+ * Retrieves the correct file size when initializing the file segment.
+ * <p>
+ * Calls {@code fileSegment.getSize()} in a loop. While the result equals
+ * {@code GET_FILE_SIZE_ERROR}, a warning is logged, the thread sleeps 50 ms
+ * and the call is repeated. There is no retry limit, so this method blocks
+ * the calling thread until a non-error size is obtained.
+ *
+ * @param fileSegment The file segment to get the size for.
+ * @return The size reported by {@code fileSegment.getSize()} once it differs
+ * from {@code GET_FILE_SIZE_ERROR}: according to the commit author, the
+ * correct length if the remote file exists, or 0 if it does not exist.
+ * The error value (presumably -1, signalling an RPC failure; confirm
+ * against the constant's definition) is never returned by this method;
+ * it triggers another attempt instead.
+ * @see <a href="https://github.com/apache/rocketmq/issues/9544">Related GitHub Issue</a>
+ */
+ public long getFileCorrectSize(FileSegment fileSegment) {
+ while (true) { // unbounded retry: the only exit is the return below
+ long fileSize = fileSegment.getSize();
+ if (fileSize != GET_FILE_SIZE_ERROR) {
+ log.debug("FlatAppendFile get file correct size, filePath={}
fileType={}, fileSize={}",
+ fileSegment.getPath(), fileSegment.getFileType(),
fileSize);
+ return fileSize;
+ } else {
+ log.warn("FlatAppendFile get file correct size error,
filePath={}, fileType={}",
+ fileSegment.getPath(), fileSegment.getFileType());
+ try {
+ TimeUnit.MILLISECONDS.sleep(50); // fixed 50 ms pause before the next attempt
+ } catch (InterruptedException e) { // NOTE(review): the interrupt is only logged; the thread's interrupt flag is not restored and the loop keeps retrying. Confirm this is intended.
+ log.warn("FlatAppendFile get file correct size
interrupted", e);
+ }
+ }
+ }
+ }
+
public void recoverFileSize() {
if (fileSegmentTable.isEmpty() ||
FileSegmentType.INDEX.equals(fileType)) {
return;
}
FileSegment fileSegment = fileSegmentTable.get(fileSegmentTable.size()
- 1);
- long fileSize = fileSegment.getSize();
- if (fileSize == GET_FILE_SIZE_ERROR) {
- log.warn("FlatAppendFile get last file size error, filePath: {}",
this.filePath);
- return;
- }
+ long fileSize = this.getFileCorrectSize(fileSegment);
if (fileSegment.getCommitPosition() != fileSize) {
fileSegment.initPosition(fileSize);
flushFileSegmentMeta(fileSegment);
@@ -90,7 +115,7 @@ public class FlatAppendFile {
public void initOffset(long offset) {
if (this.fileSegmentTable.isEmpty()) {
FileSegment fileSegment =
fileSegmentFactory.createSegment(fileType, filePath, offset);
- fileSegment.initPosition(fileSegment.getSize());
+ fileSegment.initPosition(this.getFileCorrectSize(fileSegment));
this.flushFileSegmentMeta(fileSegment);
this.fileSegmentTable.add(fileSegment);
}