This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8ab57e1c0 RATIS-2278. Follower Fails to Append Entries Due to Index
Validation Race Condition in NavigableIndices (#1248)
8ab57e1c0 is described below
commit 8ab57e1c064c6ccfec504597289590ac7ce7b106
Author: GewuNewOne <[email protected]>
AuthorDate: Wed Apr 23 00:23:24 2025 +0800
RATIS-2278. Follower Fails to Append Entries Due to Index Validation Race
Condition in NavigableIndices (#1248)
---
.../org/apache/ratis/server/impl/RaftServerImpl.java | 5 ++++-
.../org/apache/ratis/server/impl/ServerImplUtils.java | 19 ++++++++++++-------
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a6798c48b..a3652f497 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1694,7 +1694,10 @@ class RaftServerImpl implements RaftServer.Division,
final List<ConsecutiveIndices> entriesTermIndices;
try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries =
entriesRef.retainAndReleaseOnClose()) {
entriesTermIndices = ConsecutiveIndices.convert(entries.get());
- appendLogTermIndices.append(entriesTermIndices);
+ if (!appendLogTermIndices.append(entriesTermIndices)) {
+ // index already exists, return the last future
+ return appendLogFuture.get();
+ }
}
entriesRef.retain();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index c5010a534..ce4702d95 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -136,15 +136,20 @@ public final class ServerImplUtils {
return floorEntry.getValue().getTerm(index);
}
- synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
- for(ConsecutiveIndices indices : entriesTermIndices) {
- // validate startIndex
- final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
- if (lastEntry != null) {
- Preconditions.assertSame(lastEntry.getValue().getNextIndex(),
indices.startIndex, "startIndex");
+ synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) {
+ for(int i = 0; i < entriesTermIndices.size(); i++) {
+ final ConsecutiveIndices indices = entriesTermIndices.get(i);
+ final ConsecutiveIndices previous = map.put(indices.startIndex,
indices);
+ if (previous != null) {
+ // index already exists, revert this append
+ map.put(previous.startIndex, previous);
+ for(int j = 0; j < i; j++) {
+ map.remove(entriesTermIndices.get(j).startIndex);
+ }
+ return false;
}
- map.put(indices.startIndex, indices);
}
+ return true;
}
synchronized void removeExisting(List<ConsecutiveIndices>
entriesTermIndices) {