This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 37a674d8c RATIS-2507. Fix java.lang.IllegalStateException: gap between entries (#1439)
37a674d8c is described below
commit 37a674d8c9331bea7bc0b98fc8fced95bf368943
Author: Sammi Chen <[email protected]>
AuthorDate: Sat May 2 04:48:31 2026 +0800
RATIS-2507. Fix java.lang.IllegalStateException: gap between entries (#1439)
---
.../ratis/server/raftlog/segmented/LogSegment.java | 31 ++++----
.../server/raftlog/segmented/SegmentedRaftLog.java | 1 +
.../raftlog/segmented/SegmentedRaftLogCache.java | 8 ++-
.../server/raftlog/segmented/TestLogSegment.java | 10 +--
.../raftlog/segmented/TestSegmentedRaftLog.java | 83 +++++++++++++++++-----
.../segmented/TestSegmentedRaftLogCache.java | 2 +-
6 files changed, 96 insertions(+), 39 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index e9cb2e50f..bb2bde7ed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -202,7 +202,7 @@ public final class LogSegment {
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage,
RaftStorage::getLogCorruptionPolicy);
final boolean isOpen = startEnd.isOpen();
final int entryCount = readSegmentFile(file, startEnd, maxOpSize,
corruptionPolicy, raftLogMetrics, entry -> {
- segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
+ segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE, true);
if (logConsumer != null) {
logConsumer.accept(entry);
}
@@ -353,24 +353,17 @@ public final class LogSegment {
return CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
}
- void appendToOpenSegment(LogEntryProto entry, Op op) {
+ void appendToOpenSegment(LogEntryProto entry, Op op, boolean verifyEntryIndex) {
Preconditions.assertTrue(isOpen(), "The log segment %s is not open for append", this);
- append(true, entry, op);
+ append(true, entry, op, verifyEntryIndex);
}
public static final String APPEND_RECORD = LogSegment.class.getSimpleName()
+ ".append";
- private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
+ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op, boolean verifyEntryIndex) {
Objects.requireNonNull(entry, "entry == null");
- final LogRecord currentLast = records.getLast();
- if (currentLast == null) {
- Preconditions.assertTrue(entry.getIndex() == startIndex,
- "gap between start index %s and first entry to append %s",
- startIndex, entry.getIndex());
- } else {
- Preconditions.assertTrue(entry.getIndex() == currentLast.getTermIndex().getIndex() + 1,
- "gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
+ if (verifyEntryIndex) {
+ verifyEntryIndex(entry.getIndex());
}
-
final LogRecord record = new LogRecord(totalFileSize, entry);
if (keepEntryInCache) {
// It is important to put the entry into the cache before appending the
@@ -385,6 +378,18 @@ public final class LogSegment {
endIndex = entry.getIndex();
}
+ void verifyEntryIndex(long entryIndex) {
+ final LogRecord currentLast = records.getLast();
+ if (currentLast == null) {
+ Preconditions.assertTrue(entryIndex == startIndex,
+ "gap between start index %s and first entry to append %s",
+ startIndex, entryIndex);
+ } else {
+ Preconditions.assertTrue(entryIndex == currentLast.getTermIndex().getIndex() + 1,
+ "gap between entries %s and %s", entryIndex, currentLast.getTermIndex().getIndex());
+ }
+ }
+
LogEntryProto getEntryFromCache(TermIndex ti) {
return entryCache.get(ti);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 6bcc3f8e1..a6ea6e3ca 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -430,6 +430,7 @@ public final class SegmentedRaftLog extends RaftLogBase {
// If the entry has state machine data, then the entry should be inserted
// to statemachine first and then to the cache. Not following the order
// will leave a spurious entry in the cache.
+ cache.verifyAppendEntryIndex(entry);
CompletableFuture<Long> writeFuture =
fileLogWorker.writeLogEntry(entry, context).getFuture();
if (stateMachineCachingEnabled) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 46acbcc3d..714943c49 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -622,11 +622,17 @@ public class SegmentedRaftLogCache {
}
}
+ void verifyAppendEntryIndex(LogEntryProto entry) {
+ // SegmentedRaftLog does the segment creation/rolling work.
+ Objects.requireNonNull(openSegment, "openSegment == null");
+ openSegment.verifyEntryIndex(entry.getIndex());
+ }
+
void appendEntry(LogEntryProto entry, LogSegment.Op op) {
// SegmentedRaftLog does the segment creation/rolling work. Here we just
// simply append the entry into the open segment.
Objects.requireNonNull(openSegment, "openSegment == null");
- openSegment.appendToOpenSegment(entry, op);
+ openSegment.appendToOpenSegment(entry, op, false);
}
/**
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index a95c683c8..6a75dfb36 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -206,7 +206,7 @@ public class TestLogSegment extends BaseTest {
SimpleOperation op = new SimpleOperation("m" + i);
LogEntryProto entry =
LogProtoUtils.toLogEntryProto(op.getLogEntryContent(), term, i++ + start);
size += getEntrySize(entry,
LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
- segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
}
Assertions.assertTrue(segment.getTotalFileSize() >= max);
@@ -238,18 +238,18 @@ public class TestLogSegment extends BaseTest {
final StateMachineLogEntryProto m = op.getLogEntryContent();
try {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1001);
- segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
Assertions.fail("should fail since the entry's index needs to be 1000");
} catch (IllegalStateException e) {
// the exception is expected.
}
LogEntryProto entry = LogProtoUtils.toLogEntryProto(m, 0, 1000);
- segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
try {
entry = LogProtoUtils.toLogEntryProto(m, 0, 1002);
- segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
Assertions.fail("should fail since the entry's index needs to be 1001");
} catch (IllegalStateException e) {
// the exception is expected.
@@ -264,7 +264,7 @@ public class TestLogSegment extends BaseTest {
for (int i = 0; i < 100; i++) {
LogEntryProto entry = LogProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
- segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+ segment.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
}
// truncate an open segment (remove 1080~1099)
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 43aafc896..181d1fa43 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -67,7 +67,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Stream;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -83,6 +82,8 @@ import org.slf4j.event.Level;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static
org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -231,7 +232,7 @@ public class TestSegmentedRaftLog extends BaseTest {
// check if log entries are loaded correctly
for (LogEntryProto e : entries) {
LogEntryProto entry = raftLog.get(e.getIndex());
- Assertions.assertEquals(e, entry);
+ assertEquals(e, entry);
}
final LogEntryHeader[] termIndices = raftLog.getEntries(0, 500);
@@ -245,7 +246,7 @@ public class TestSegmentedRaftLog extends BaseTest {
})
.toArray(LogEntryProto[]::new);
Assertions.assertArrayEquals(entries, entriesFromLog);
- Assertions.assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
+ assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
final RatisMetricRegistry metricRegistryForLogWorker =
RaftLogMetricsBase.createRegistry(MEMBER_ID);
@@ -400,7 +401,7 @@ public class TestSegmentedRaftLog extends BaseTest {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
checkEntries(raftLog, entries, 0, entries.size());
- Assertions.assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
+ assertEquals(9, raftLog.getRaftLogCache().getNumOfSegments());
}
}
@@ -466,12 +467,12 @@ public class TestSegmentedRaftLog extends BaseTest {
if(!tasksAdded.await(FIVE_SECONDS.getDuration(),
FIVE_SECONDS.getUnit())) {
throw new TimeoutException();
}
- Assertions.assertEquals(entries.size() + 1, tasksCount.get());
+ assertEquals(entries.size() + 1, tasksCount.get());
// check if the purge task is executed
final Long purged = purgeFuture.get().get();
LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment,
purged);
- Assertions.assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex());
+ assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex());
// check if the appendEntry futures are done
JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(),
FIVE_SECONDS.getUnit());
@@ -515,7 +516,7 @@ public class TestSegmentedRaftLog extends BaseTest {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// check if the raft log is correct
if (fromIndex > 0) {
- Assertions.assertEquals(entries.get((int) (fromIndex - 1)),
+ assertEquals(entries.get((int) (fromIndex - 1)),
getLastEntry(raftLog));
} else {
Assertions.assertNull(raftLog.getLastEntryTermIndex());
@@ -529,7 +530,7 @@ public class TestSegmentedRaftLog extends BaseTest {
if (size > 0) {
for (int i = offset; i < size + offset; i++) {
LogEntryProto entry = raftLog.get(expected.get(i).getIndex());
- Assertions.assertEquals(expected.get(i), entry);
+ assertEquals(expected.get(i), entry);
}
final LogEntryHeader[] termIndices = raftLog.getEntries(
expected.get(offset).getIndex(),
@@ -637,7 +638,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
final Long purged = f.get();
LOG.info("purgeIndex = {}, purged = {}", purgeIndex, purged);
- Assertions.assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex());
+ assertEquals(expectedIndex, raftLog.getRaftLogCache().getStartIndex());
}
}
@@ -681,9 +682,9 @@ public class TestSegmentedRaftLog extends BaseTest {
checkFailedEntries(entries, 650, retryCache);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);
- Assertions.assertEquals(newEntries.get(newEntries.size() - 1),
+ assertEquals(newEntries.get(newEntries.size() - 1),
getLastEntry(raftLog));
- Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
+ assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
raftLog.getFlushIndex());
}
@@ -693,13 +694,57 @@ public class TestSegmentedRaftLog extends BaseTest {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);
- Assertions.assertEquals(newEntries.get(newEntries.size() - 1),
+ assertEquals(newEntries.get(newEntries.size() - 1),
getLastEntry(raftLog));
- Assertions.assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
+ assertEquals(newEntries.get(newEntries.size() - 1).getIndex(),
raftLog.getFlushIndex());
SegmentedRaftLogCache cache = raftLog.getRaftLogCache();
- Assertions.assertEquals(5, cache.getNumOfSegments());
+ assertEquals(5, cache.getNumOfSegments());
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("data")
+ public void testAppendEntriesWithGap(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {
+ RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush);
+ RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush);
+ // prepare the log for truncation
+ List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
+ List<LogEntryProto> entries = prepareLogEntries(ranges, null);
+
+ final RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
+ try (SegmentedRaftLog raftLog =
+ RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, storage, properties)) {
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
+ // append entries to the raftlog
+ entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
+ }
+
+ long lastIndex = ranges.get(ranges.size() - 1).end;
+ long snapshotIndex = lastIndex + 100;
+ LogEntryProto entryProto = prepareLogEntry(4, snapshotIndex + 1, null, false);
+ final LongSupplier getSnapshotIndexFromStateMachine = new LongSupplier() {
+ @Override
+ public long getAsLong() {
+ return snapshotIndex;
+ }
+ };
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog(getSnapshotIndexFromStateMachine)) {
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ // Assert the wrapped exception
+ IllegalStateException exception = assertThrows(IllegalStateException.class,
+ () -> raftLog.appendEntry(entryProto));
+ // Assert the original cause
+ assertTrue(exception.getMessage().contains("gap between entries"));
+ }
+
+ // load the raftlog again and check
+ try (SegmentedRaftLog raftLog =
+ RetryCacheTestUtil.newSegmentedRaftLog(MEMBER_ID, retryCache, storage, properties)) {
+ raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
+ Assertions.assertEquals(lastIndex, raftLog.getRaftLogCache().getEndIndex());
}
}
@@ -795,7 +840,7 @@ public class TestSegmentedRaftLog extends BaseTest {
// SegmentedRaftLogWorker should catch TimeoutIOException
CompletableFuture<Long> f = raftLog.appendEntry(entry);
// Wait for async writeStateMachineData to finish
- ex = Assertions.assertThrows(ExecutionException.class, f::get);
+ ex = assertThrows(ExecutionException.class, f::get);
}
Assertions.assertSame(LifeCycle.State.PAUSED, sm.getLifeCycleState());
Assertions.assertInstanceOf(TimeoutIOException.class, ex.getCause());
@@ -815,9 +860,9 @@ public class TestSegmentedRaftLog extends BaseTest {
void assertIndices(RaftLog raftLog, long expectedFlushIndex, long
expectedNextIndex) {
LOG.info("assert expectedFlushIndex={}", expectedFlushIndex);
- Assertions.assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
+ assertEquals(expectedFlushIndex, raftLog.getFlushIndex());
LOG.info("assert expectedNextIndex={}", expectedNextIndex);
- Assertions.assertEquals(expectedNextIndex, raftLog.getNextIndex());
+ assertEquals(expectedNextIndex, raftLog.getNextIndex());
}
void assertIndicesMultipleAttempts(RaftLog raftLog, long expectedFlushIndex,
long expectedNextIndex)
@@ -938,10 +983,10 @@ public class TestSegmentedRaftLog extends BaseTest {
// When the reader's get() call completed, the append was fully finished,
// so it should have returned the correct entry.
- Assertions.assertEquals(newEntry.getIndex(), raftLog.getLastEntryTermIndex().getIndex());
+ assertEquals(newEntry.getIndex(), raftLog.getLastEntryTermIndex().getIndex());
readEntry.set(raftLog.get(newEntry.getIndex()));
Assertions.assertNotNull(readEntry.get());
- Assertions.assertEquals(newEntry, readEntry.get());
+ assertEquals(newEntry, readEntry.get());
} finally {
CodeInjectionForTesting.remove(LogSegment.APPEND_RECORD);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 532e32c87..3133fb36f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -63,7 +63,7 @@ public class TestSegmentedRaftLogCache {
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry =
LogProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);
- s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE);
+ s.appendToOpenSegment(entry, LogSegment.Op.WRITE_CACHE_WITHOUT_STATE_MACHINE_CACHE, true);
}
if (!isOpen) {
s.close();