This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a674d8c RATIS-2507. Fix java.lang.IllegalStateException: gap between entries (#1439)
37a674d8c is described below

commit 37a674d8c9331bea7bc0b98fc8fced95bf368943
Author: Sammi Chen <[email protected]>
AuthorDate: Sat May 2 04:48:31 2026 +0800

    RATIS-2507. Fix java.lang.IllegalStateException: gap between entries (#1439)
---
 .../ratis/server/raftlog/segmented/LogSegment.java | 31 ++++----
 .../server/raftlog/segmented/SegmentedRaftLog.java |  1 +
 .../raftlog/segmented/SegmentedRaftLogCache.java   |  8 ++-
 .../server/raftlog/segmented/TestLogSegment.java   | 10 +--
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 83 +++++++++++++++++-----
 .../segmented/TestSegmentedRaftLogCache.java       |  2 +-
 6 files changed, 96 insertions(+), 39 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index e9cb2e50f..bb2bde7ed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -202,7 +202,7 @@ public final class LogSegment {
     final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, 
RaftStorage::getLogCorruptionPolicy);
     final boolean isOpen = startEnd.isOpen();
     final int entryCount = readSegmentFile(file, startEnd, maxOpSize, 
corruptionPolicy, raftLogMetrics, entry -> {
-      segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
+      segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE, 
true);
       if (logConsumer != null) {
         logConsumer.accept(entry);
       }
@@ -353,24 +353,17 @@ public final class LogSegment {
     return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
   }
 
-  void appendToOpenSegment(LogEntryProto entry, Op op) {
+  void appendToOpenSegment(LogEntryProto entry, Op op, boolean 
verifyEntryIndex) {
     Preconditions.assertTrue(isOpen(), "The log segment %s is not open for 
append", this);
-    append(true, entry, op);
+    append(true, entry, op, verifyEntryIndex);
   }
 
   public static final String APPEND_RECORD = LogSegment.class.getSimpleName() 
+ ".append";
-  private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
+  private void append(boolean keepEntryInCache, LogEntryProto entry, Op op, 
boolean verifyEntryIndex) {
     Objects.requireNonNull(entry, "entry == null");
-    final LogRecord currentLast = records.getLast();
-    if (currentLast == null) {
-      Preconditions.assertTrue(entry.getIndex() == startIndex,
-          "gap between start index %s and first entry to append %s",
-          startIndex, entry.getIndex());
-    } else {
-      Preconditions.assertTrue(entry.getIndex() == 
currentLast.getTermIndex().getIndex() + 1,
-          "gap between entries %s and %s", entry.getIndex(), 
currentLast.getTermIndex().getIndex());
+    if (verifyEntryIndex) {
+      verifyEntryIndex(entry.getIndex());
     }
-
     final LogRecord record = new LogRecord(totalFileSize, entry);
     if (keepEntryInCache) {
       // It is important to put the entry into the cache before appending the
@@ -385,6 +378,18 @@ public final class LogSegment {
     endIndex = entry.getIndex();
   }
 
+  void verifyEntryIndex(long entryIndex) {
+    final LogRecord currentLast = records.getLast();
+    if (currentLast == null) {
+      Preconditions.assertTrue(entryIndex == startIndex,
+          "gap between start index %s and first entry to append %s",
+          startIndex, entryIndex);
+    } else {
+      Preconditions.assertTrue(entryIndex == 
currentLast.getTermIndex().getIndex() + 1,
+          "gap between entries %s and %s", entryIndex, 
currentLast.getTermIndex().getIndex());
+    }
+  }
+
   LogEntryProto getEntryFromCache(TermIndex ti) {
     return entryCache.get(ti);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 6bcc3f8e1..a6ea6e3ca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -430,6 +430,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
       // If the entry has state machine data, then the entry should be inserted
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
+      cache.verifyAppendEntryIndex(entry);
       CompletableFuture<Long> writeFuture =
           fileLogWorker.writeLogEntry(entry, context).getFuture();
       if (stateMachineCachingEnabled) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 46acbcc3d..714943c49 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -622,11 +622,17 @@ public class SegmentedRaftLogCache {
     }
   }
 
+  void verifyAppendEntryIndex(LogEntryProto entry) {
+    // SegmentedRaftLog does the segment creation/rolling work.
+    Objects.requireNonNull(openSegment, "openSegment == null");
+    openSegment.verifyEntryIndex(entry.getIndex());
+  }
+
   void appendEntry(LogEntryProto entry, LogSegment.Op op) {
     // SegmentedRaftLog does the segment creation/rolling work. Here we just
     // simply append the entry into the open segment.
     Objects.requireNonNull(openSegment, "openSegment == null");
-    openSegment.appendToOpenSegment(entry, op);
+    openSegment.appendToOpenSegment(entry, op, false);
   }
 
   /**
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index a95c683c8..6a75dfb36 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -206,7 +206,7 @@ public class TestLogSegment extends BaseTest {
       SimpleOperation op = new SimpleOperation("m" + i);
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
       size += getEntrySize(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
-      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
     }
 
     Assertions.assertTrue(segment.getTotalFileSize() >= max);
@@ -238,18 +238,18 @@ public class TestLogSegment extends BaseTest {
     final StateMachineLogEntryProto m = op.getLogEntryContent();
     try {
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1001);
-      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
       Assertions.fail("should fail since the entry's index needs to be 1000");
     } catch (IllegalStateException e) {
       // the exception is expected.
     }
 
     LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1000);
-    segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+    segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
 
     try {
       entry = LogProtoUtils.toLogEntryProto(m, 0, 1002);
-      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
       Assertions.fail("should fail since the entry's index needs to be 1001");
     } catch (IllegalStateException e) {
       // the exception is expected.
@@ -264,7 +264,7 @@ public class TestLogSegment extends BaseTest {
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = LogProtoUtils.toLogEntryProto(
           new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
-      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      segment.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
     }
 
     // truncate an open segment (remove 1080~1099)
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 43aafc896..181d1fa43 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -67,7 +67,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -83,6 +82,8 @@ import org.slf4j.event.Level;
 import static java.lang.Boolean.FALSE;
 import static java.lang.Boolean.TRUE;
 import static 
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
@@ -231,7 +232,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       // check if log entries are loaded correctly
       for (LogEntryProto e : entries) {
         LogEntryProto entry = raftLog.get(e.getIndex());
-        Assertions.assertEquals(e, entry);
+        assertEquals(e, entry);
       }
 
       final LogEntryHeader[] termIndices = raftLog.getEntries(0, 500);
@@ -245,7 +246,7 @@ public class TestSegmentedRaftLog extends BaseTest {
           })
           .toArray(LogEntryProto[]::new);
       Assertions.assertArrayEquals(entries, entriesFromLog);
-      Assertions.assertEquals(entries[entries.length - 1], 
getLastEntry(raftLog));
+      assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
 
       final RatisMetricRegistry metricRegistryForLogWorker = 
RaftLogMetricsBase.createRegistry(MEMBER_ID);
 
@@ -400,7 +401,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       checkEntries(raftLog, entries, 0, entries.size());
-      Assertions.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
+      assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
     }
   }
 
@@ -466,12 +467,12 @@ public class TestSegmentedRaftLog extends BaseTest {
       if(!tasksAdded.await(FIVE_SECONDS.getDuration(), 
FIVE_SECONDS.getUnit())) {
         throw new TimeoutException();
       }
-      Assertions.assertEquals(entries.size() + 1, tasksCount.get());
+      assertEquals(entries.size() + 1, tasksCount.get());
 
       // check if the purge task is executed
       final Long purged = purgeFuture.get().get();
       LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment, 
purged);
-      Assertions.assertEquals(endIndexOfClosedSegment, 
raftLog.getRaftLogCache().getStartIndex());
+      assertEquals(endIndexOfClosedSegment, 
raftLog.getRaftLogCache().getStartIndex());
 
       // check if the appendEntry futures are done
       JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), 
FIVE_SECONDS.getUnit());
@@ -515,7 +516,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // check if the raft log is correct
       if (fromIndex > 0) {
-        Assertions.assertEquals(entries.get((int) (fromIndex - 1)),
+        assertEquals(entries.get((int) (fromIndex - 1)),
             getLastEntry(raftLog));
       } else {
         Assertions.assertNull(raftLog.getLastEntryTermIndex());
@@ -529,7 +530,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     if (size > 0) {
       for (int i = offset; i < size + offset; i++) {
         LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
-        Assertions.assertEquals(expected.get(i), entry);
+        assertEquals(expected.get(i), entry);
       }
       final LogEntryHeader[] termIndices = raftLog.getEntries(
           expected.get(offset).getIndex(),
@@ -637,7 +638,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
       final Long purged = f.get();
       LOG.info("purgeIndex = {}, purged = {}", purgeIndex, purged);
-      Assertions.assertEquals(expectedIndex, 
raftLog.getRaftLogCache().getStartIndex());
+      assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex());
     }
   }
 
@@ -681,9 +682,9 @@ public class TestSegmentedRaftLog extends BaseTest {
       checkFailedEntries(entries, 650, retryCache);
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);
-      Assertions.assertEquals(newEntries.get(newEntries.size() - 1),
+      assertEquals(newEntries.get(newEntries.size() - 1),
           getLastEntry(raftLog));
-      Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
+      assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
           raftLog.getFlushIndex());
     }
 
@@ -693,13 +694,57 @@ public class TestSegmentedRaftLog extends BaseTest {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);
-      Assertions.assertEquals(newEntries.get(newEntries.size() - 1),
+      assertEquals(newEntries.get(newEntries.size() - 1),
           getLastEntry(raftLog));
-      Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
+      assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
           raftLog.getFlushIndex());
 
       SegmentedRaftLogCache cache = raftLog.getRaftLogCache();
-      Assertions.assertEquals(5, cache.getNumOfSegments());
+      assertEquals(5, cache.getNumOfSegments());
+    }
+  }
+
+  @ParameterizedTest
+  @MethodSource("data")
+  public void testAppendEntriesWithGap(Boolean useAsyncFlush, Boolean 
smSyncFlush) throws Exception {
+    RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
+    RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
+    // prepare the log for truncation
+    List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
+    List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+    final RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
+    try (SegmentedRaftLog raftLog =
+             RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, 
storage, properties)) {
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, 
entry));
+      // append entries to the raftlog
+      
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+    }
+
+    long lastIndex = ranges.get(ranges.size() - 1).end;
+    long snapshotIndex = lastIndex + 100;
+    LogEntryProto entryProto = prepareLogEntry(4, snapshotIndex + 1, null, 
false);
+    final LongSupplier getSnapshotIndexFromStateMachine = new LongSupplier() {
+      @Override
+      public long getAsLong() {
+        return snapshotIndex;
+      }
+    };
+    try (SegmentedRaftLog raftLog = 
newSegmentedRaftLog(getSnapshotIndexFromStateMachine)) {
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      // Assert the wrapped exception
+      IllegalStateException exception = 
assertThrows(IllegalStateException.class,
+          () -> raftLog.appendEntry(entryProto));
+      // Assert the original cause
+      assertTrue(exception.getMessage().contains("gap between entries"));
+    }
+
+    // load the raftlog again and check
+    try (SegmentedRaftLog raftLog =
+             RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, 
storage, properties)) {
+      raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+      Assertions.assertEquals(lastIndex, 
raftLog.getRaftLogCache().getEndIndex());
     }
   }
 
@@ -795,7 +840,7 @@ public class TestSegmentedRaftLog extends BaseTest {
       // SegmentedRaftLogWorker should catch TimeoutIOException
       CompletableFuture<Long> f = raftLog.appendEntry(entry);
       // Wait for async writeStateMachineData to finish
-      ex = Assertions.assertThrows(ExecutionException.class, f::get);
+      ex = assertThrows(ExecutionException.class, f::get);
     }
     Assertions.assertSame(LifeCycle.State.PAUSED, sm.getLifeCycleState());
     Assertions.assertInstanceOf(TimeoutIOException.class, ex.getCause());
@@ -815,9 +860,9 @@ public class TestSegmentedRaftLog extends BaseTest {
 
   void assertIndices(RaftLog raftLog, long expectedFlushIndex, long 
expectedNextIndex) {
     LOG.info("assert expectedFlushIndex={}", expectedFlushIndex);
-    Assertions.assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
+    assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
     LOG.info("assert expectedNextIndex={}", expectedNextIndex);
-    Assertions.assertEquals(expectedNextIndex, raftLog.getNextIndex());
+    assertEquals(expectedNextIndex, raftLog.getNextIndex());
   }
 
   void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex, 
long expectedNextIndex)
@@ -938,10 +983,10 @@ public class TestSegmentedRaftLog extends BaseTest {
 
       // When the reader's get() call completed, the append was fully finished,
       // so it should have returned the correct entry.
-      Assertions.assertEquals(newEntry.getIndex(), 
raftLog.getLastEntryTermIndex().getIndex());
+      assertEquals(newEntry.getIndex(), 
raftLog.getLastEntryTermIndex().getIndex());
       readEntry.set(raftLog.get(newEntry.getIndex()));
       Assertions.assertNotNull(readEntry.get());
-      Assertions.assertEquals(newEntry, readEntry.get());
+      assertEquals(newEntry, readEntry.get());
     } finally {
       CodeInjectionForTesting.remove(LogSegment.APPEND_RECORD);
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 532e32c87..3133fb36f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -63,7 +63,7 @@ public class TestSegmentedRaftLogCache {
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = 
LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
-      s.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+      s.appendToOpenSegment(entry, 
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
     }
     if (!isOpen) {
       s.close();

Reply via email to