This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 65fd44453 RATIS-1995. Prevent data loss when a storage is accidentally
re-formatted. (#1261)
65fd44453 is described below
commit 65fd4445335d0500fd372f37c8b7cb3c39259e87
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Fri May 16 04:43:24 2025 -0700
RATIS-1995. Prevent data loss when a storage is accidentally re-formatted.
(#1261)
---
ratis-proto/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/protocol/TermIndex.java | 15 ++
.../apache/ratis/server/impl/LeaderElection.java | 191 +++++++++++++++++---
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 4 +-
.../ratis/server/util/ServerStringUtils.java | 3 +-
.../impl/TestLeaderElectionServerInterface.java | 193 +++++++++++++++++++++
7 files changed, 382 insertions(+), 27 deletions(-)
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 7cf2fd87c..6dbfdb15a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -169,6 +169,7 @@ message RequestVoteReplyProto {
RaftRpcReplyProto serverReply = 1;
uint64 term = 2;
bool shouldShutdown = 3;
+ TermIndexProto lastEntry = 4; // to determine if the voter log is empty.
}
message CommitInfoProto {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index 6115bccad..369aefc85 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -21,9 +21,11 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.TermIndexProto;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.BiWeakValueCache;
+import org.apache.ratis.util.MemoizedSupplier;
import java.util.Comparator;
import java.util.Optional;
+import java.util.function.Supplier;
/** The term and the log index defined in the Raft consensus algorithm. */
public interface TermIndex extends Comparable<TermIndex> {
@@ -37,6 +39,7 @@ public interface TermIndex extends Comparable<TermIndex> {
* are respectively 1 and 0 (= {@link RaftLog#LEAST_VALID_LOG_INDEX}).
*/
TermIndex INITIAL_VALUE = valueOf(0, RaftLog.INVALID_LOG_INDEX);
+ TermIndex PROTO_DEFAULT = valueOf(TermIndexProto.getDefaultInstance());
/** An empty {@link TermIndex} array. */
TermIndex[] EMPTY_ARRAY = {};
@@ -93,6 +96,8 @@ public interface TermIndex extends Comparable<TermIndex> {
private static TermIndex newTermIndex(long term, long index) {
return new TermIndex() {
+ private final Supplier<TermIndexProto> protoSupplier =
MemoizedSupplier.valueOf(TermIndex.super::toProto);
+
@Override
public long getTerm() {
return term;
@@ -121,12 +126,22 @@ public interface TermIndex extends Comparable<TermIndex> {
return Long.hashCode(term) ^ Long.hashCode(index);
}
+ @Override
+ public TermIndexProto toProto() {
+ return protoSupplier.get();
+ }
+
private String longToString(long n) {
return n >= 0L ? String.valueOf(n) : "~";
}
@Override
public String toString() {
+ if (this.equals(INITIAL_VALUE)) {
+ return "<INITIAL_VALUE>";
+ } else if (this.equals(PROTO_DEFAULT)) {
+ return "<PROTO_DEFAULT>";
+ }
return String.format("(t:%s, i:%s)", longToString(term),
longToString(index));
}
};
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 439405871..9953e12af 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -20,12 +20,14 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.TermIndexProto;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.util.ServerStringUtils;
import
org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Daemon;
@@ -78,6 +80,121 @@ import static
org.apache.ratis.util.LifeCycle.State.STARTING;
class LeaderElection implements Runnable {
public static final Logger LOG =
LoggerFactory.getLogger(LeaderElection.class);
+ interface ServerInterface {
+ default RaftPeerId getId() {
+ return getMemberId().getPeerId();
+ }
+
+ RaftGroupMemberId getMemberId();
+ boolean isAlive();
+ boolean isCandidate();
+
+ long getCurrentTerm();
+ long getLastCommittedIndex();
+ TermIndex getLastEntry();
+
+ boolean isPreVoteEnabled();
+ ConfAndTerm initElection(Phase phase) throws IOException;
+ RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws
IOException;
+
+ void changeToLeader();
+ void rejected(long term, ResultAndTerm result) throws IOException;
+ void shutdown();
+
+ Timekeeper getLeaderElectionTimer();
+ void onNewLeaderElectionCompletion();
+
+ TimeDuration getRandomElectionTimeout();
+ ThreadGroup getThreadGroup();
+
+ static ServerInterface get(RaftServerImpl server) {
+ final boolean preVote =
RaftServerConfigKeys.LeaderElection.preVote(server.getRaftServer().getProperties());
+
+ return new ServerInterface() {
+ @Override
+ public RaftGroupMemberId getMemberId() {
+ return server.getMemberId();
+ }
+
+ @Override
+ public boolean isAlive() {
+ return server.getInfo().isAlive();
+ }
+
+ @Override
+ public boolean isCandidate() {
+ return server.getInfo().isCandidate();
+ }
+
+ @Override
+ public long getCurrentTerm() {
+ return server.getState().getCurrentTerm();
+ }
+
+ @Override
+ public long getLastCommittedIndex() {
+ return server.getRaftLog().getLastCommittedIndex();
+ }
+
+ @Override
+ public TermIndex getLastEntry() {
+ return server.getState().getLastEntry();
+ }
+
+ @Override
+ public boolean isPreVoteEnabled() {
+ return preVote;
+ }
+
+ @Override
+ public ConfAndTerm initElection(Phase phase) throws IOException {
+ return server.getState().initElection(phase);
+ }
+
+ @Override
+ public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
throws IOException {
+ return server.getServerRpc().requestVote(r);
+ }
+
+ @Override
+ public void changeToLeader() {
+ server.changeToLeader();
+ }
+
+ @Override
+ public void rejected(long term, ResultAndTerm result) throws
IOException {
+ server.changeToFollowerAndPersistMetadata(term, false, result);
+ }
+
+ @Override
+ public void shutdown() {
+ server.close();
+
server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(),
false);
+ }
+
+ @Override
+ public Timekeeper getLeaderElectionTimer() {
+ return server.getLeaderElectionMetrics().getLeaderElectionTimer();
+ }
+
+ @Override
+ public void onNewLeaderElectionCompletion() {
+ server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
+ }
+
+ @Override
+ public TimeDuration getRandomElectionTimeout() {
+ return server.getRandomElectionTimeout();
+ }
+
+ @Override
+ public ThreadGroup getThreadGroup() {
+ return server.getThreadGroup();
+ }
+ };
+ }
+ }
+
private ResultAndTerm logAndReturn(Phase phase, Result result,
Map<RaftPeerId, RequestVoteReplyProto> responses,
List<Exception> exceptions) {
return logAndReturn(phase, result, responses, exceptions, null);
@@ -106,7 +223,7 @@ class LeaderElection implements Runnable {
enum Result {PASSED, SINGLE_MODE_PASSED, REJECTED, TIMEOUT,
DISCOVERED_A_NEW_TERM, SHUTDOWN, NOT_IN_CONF}
- private static class ResultAndTerm {
+ static class ResultAndTerm {
private final Result result;
private final Long term;
@@ -185,22 +302,24 @@ class LeaderElection implements Runnable {
private final Daemon daemon;
private final CompletableFuture<Void> stopped = new CompletableFuture<>();
- private final RaftServerImpl server;
+ private final ServerInterface server;
private final boolean skipPreVote;
private final ConfAndTerm round0;
LeaderElection(RaftServerImpl server, boolean force) {
+ this(ServerInterface.get(server), force);
+ }
+
+ LeaderElection(ServerInterface server, boolean force) {
this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(),
getClass()) + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
this.daemon = Daemon.newBuilder().setName(name).setRunnable(this)
.setThreadGroup(server.getThreadGroup()).build();
this.server = server;
- this.skipPreVote = force ||
- !RaftServerConfigKeys.LeaderElection.preVote(
- server.getRaftServer().getProperties());
+ this.skipPreVote = force || !server.isPreVoteEnabled();
try {
// increase term of the candidate in advance if it's forced to election
- this.round0 = force ? server.getState().initElection(Phase.ELECTION) :
null;
+ this.round0 = force ? server.initElection(Phase.ELECTION) : null;
} catch (IOException e) {
throw new IllegalStateException(name + ": Failed to initialize
election", e);
}
@@ -250,7 +369,7 @@ class LeaderElection implements Runnable {
return;
}
- try (AutoCloseable ignored =
Timekeeper.start(server.getLeaderElectionMetrics().getLeaderElectionTimer())) {
+ try (AutoCloseable ignored =
Timekeeper.start(server.getLeaderElectionTimer())) {
for (int round = 0; shouldRun(); round++) {
if (skipPreVote || askForVotes(Phase.PRE_VOTE, round)) {
if (askForVotes(Phase.ELECTION, round)) {
@@ -264,10 +383,10 @@ class LeaderElection implements Runnable {
}
final LifeCycle.State state = lifeCycle.getCurrentState();
if (state.isClosingOrClosed()) {
- LOG.info(this + ": since this is already " + state + ", safely ignore
" + e);
+ LOG.info("{}: since this is already {}, safely ignore {}", this,
state, e.toString());
} else {
- if (!server.getInfo().isAlive()) {
- LOG.info(this + ": since the server is not alive, safely ignore " +
e);
+ if (!server.isAlive()) {
+ LOG.info("{}: since the server is not alive, safely ignore {}",
this, e.toString());
} else {
LOG.error("{}: Failed, state={}", this, state, e);
}
@@ -275,18 +394,17 @@ class LeaderElection implements Runnable {
}
} finally {
// Update leader election completion metric(s).
- server.getLeaderElectionMetrics().onNewLeaderElectionCompletion();
+ server.onNewLeaderElectionCompletion();
lifeCycle.checkStateAndClose(() -> {});
}
}
private boolean shouldRun() {
- final DivisionInfo info = server.getInfo();
- return lifeCycle.getCurrentState().isRunning() && info.isCandidate() &&
info.isAlive();
+ return lifeCycle.getCurrentState().isRunning() && server.isCandidate() &&
server.isAlive();
}
private boolean shouldRun(long electionTerm) {
- return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
+ return shouldRun() && server.getCurrentTerm() == electionTerm;
}
private ResultAndTerm submitRequestAndWaitResult(Phase phase,
RaftConfigurationImpl conf, long electionTerm)
@@ -299,7 +417,7 @@ class LeaderElection implements Runnable {
if (others.isEmpty()) {
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
- final TermIndex lastEntry = server.getState().getLastEntry();
+ final TermIndex lastEntry = server.getLastEntry();
final Executor voteExecutor = new Executor(this, others.size());
try {
final int submitted = submitRequests(phase, electionTerm, lastEntry,
others, voteExecutor);
@@ -322,8 +440,7 @@ class LeaderElection implements Runnable {
}
// If round0 is non-null, we have already called initElection in the
constructor,
// reuse round0 to avoid initElection again for the first round
- final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ?
- round0 : server.getState().initElection(phase);
+ final ConfAndTerm confAndTerm = (round == 0 && round0 != null) ? round0
: server.initElection(phase);
electionTerm = confAndTerm.getTerm();
conf = confAndTerm.getConf();
}
@@ -343,15 +460,14 @@ class LeaderElection implements Runnable {
return true;
case NOT_IN_CONF:
case SHUTDOWN:
- server.close();
-
server.getStateMachine().event().notifyServerShutdown(server.getRoleInfoProto(),
false);
+ server.shutdown();
return false;
case TIMEOUT:
return false; // should retry
case REJECTED:
case DISCOVERED_A_NEW_TERM:
- final long term = r.maxTerm(server.getState().getCurrentTerm());
- server.changeToFollowerAndPersistMetadata(term, false, r);
+ final long term = r.maxTerm(server.getCurrentTerm());
+ server.rejected(term, r);
return false;
default: throw new IllegalArgumentException("Unable to process result
" + r.result);
}
@@ -364,7 +480,7 @@ class LeaderElection implements Runnable {
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r =
ServerProtoUtils.toRequestVoteRequestProto(
server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase
== Phase.PRE_VOTE);
- voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
+ voteExecutor.submit(() -> server.requestVote(r));
submitted++;
}
return submitted;
@@ -390,6 +506,9 @@ class LeaderElection implements Runnable {
Set<RaftPeerId> higherPriorityPeers = getHigherPriorityPeers(conf);
final boolean singleMode = conf.isSingleMode(server.getId());
+ // true iff this server does not have any commits
+ final boolean emptyCommit = server.getLastCommittedIndex() <
RaftLog.LEAST_VALID_LOG_INDEX;
+
while (waitForNum > 0 && shouldRun(electionTerm)) {
final TimeDuration waitTime = timeout.elapsedTime().apply(n -> -n);
if (waitTime.isNonPositive()) {
@@ -439,7 +558,10 @@ class LeaderElection implements Runnable {
// all higher priority peers have replied
higherPriorityPeers.remove(replierId);
- if (r.getServerReply().getSuccess()) {
+ final boolean acceptVote = r.getServerReply().getSuccess()
+ // When the commits are non-empty, do not accept votes from empty
log voters.
+ && (emptyCommit || nonEmptyLog(r));
+ if (acceptVote) {
votedPeers.add(replierId);
// If majority and all peers with higher priority have voted,
candidate pass vote
if (higherPriorityPeers.isEmpty() && conf.hasMajority(votedPeers,
server.getId())) {
@@ -448,6 +570,7 @@ class LeaderElection implements Runnable {
} else {
rejectedPeers.add(replierId);
if (conf.majorityRejectVotes(rejectedPeers)) {
+ LOG.info("rejectedPeers: {}, emptyCommit? {}", rejectedPeers,
emptyCommit);
return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
}
@@ -467,6 +590,26 @@ class LeaderElection implements Runnable {
}
}
+ /**
+ * @return true if the given reply indicates that the voter has a non-empty
raft log.
+ * Note that a voter running with an old version may not include the
lastEntry in the reply.
+ * For compatibility, this method returns true for such case.
+ */
+ static boolean nonEmptyLog(RequestVoteReplyProto reply) {
+ final TermIndexProto lastEntry = reply.getLastEntry();
+ // valid term >= 1 and valid index >= 0; therefore, (0, 0) can only be the
proto default
+ if (lastEntry.equals(TermIndexProto.getDefaultInstance())) { // default:
(0,0)
+ LOG.info("Reply missing lastEntry: {} ",
ServerStringUtils.toRequestVoteReplyString(reply));
+ return true; // accept voters with an older version
+ }
+ if (lastEntry.getTerm() > 0) { // when log is empty, lastEntry is (0,-1).
+ return true; // accept voters with a non-empty log
+ }
+
+ LOG.info("Replier log is empty: {} ",
ServerStringUtils.toRequestVoteReplyString(reply));
+ return false; // reject voters with an empty log
+ }
+
@Override
public String toString() {
return name;
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index c7e29c539..043ba1ee7 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1504,7 +1504,7 @@ class RaftServerImpl implements RaftServer.Division,
shouldShutdown = true;
}
reply = toRequestVoteReplyProto(candidateId, getMemberId(),
- voteGranted, state.getCurrentTerm(), shouldShutdown);
+ voteGranted, state.getCurrentTerm(), shouldShutdown,
state.getLastEntry());
if (LOG.isInfoEnabled()) {
LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
getMemberId(), phase, toRequestVoteReplyString(reply), state);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 29a42f65a..e6a29189a 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -43,11 +43,13 @@ final class ServerProtoUtils {
}
static RequestVoteReplyProto toRequestVoteReplyProto(
- RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long
term, boolean shouldShutdown) {
+ RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long
term, boolean shouldShutdown,
+ TermIndex lastEntry) {
return RequestVoteReplyProto.newBuilder()
.setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId,
success))
.setTerm(term)
.setShouldShutdown(shouldShutdown)
+ .setLastEntry((lastEntry != null? lastEntry :
TermIndex.INITIAL_VALUE).toProto())
.build();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
index 6601eddce..3a5db6285 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java
@@ -118,7 +118,8 @@ public final class ServerStringUtils {
if (proto == null) {
return null;
}
- return ProtoUtils.toString(proto.getServerReply()) + "-t" +
proto.getTerm();
+ return ProtoUtils.toString(proto.getServerReply()) + "-t" + proto.getTerm()
+ + "-last:" + TermIndex.valueOf(proto.getLastEntry());
}
/**
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
new file mode 100644
index 000000000..876633db1
--- /dev/null
+++
b/ratis-test/src/test/java/org/apache/ratis/server/impl/TestLeaderElectionServerInterface.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.metrics.Timekeeper;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestLeaderElectionServerInterface extends BaseTest {
+ private final List<RaftPeer> peers = IntStream.range(0, 3).boxed()
+ .map(i -> RaftPeer.newBuilder().setId("s" + i).build())
+ .collect(Collectors.toList());
+ private final RaftGroup group = RaftGroup.valueOf(RaftGroupId.randomId(),
peers);
+ private final RaftConfigurationImpl conf =
RaftConfigurationImpl.newBuilder().setLogEntryIndex(0).setConf(peers).build();
+ private final ThreadGroup threadGroup = new ThreadGroup("ServerInterface");
+
+ private final RaftGroupMemberId candidate =
RaftGroupMemberId.valueOf(peers.get(0).getId(), group.getGroupId());
+
+ LeaderElection.ServerInterface newServerInterface(boolean expectToPass,
+ Map<RaftPeerId, TermIndex> lastEntries) {
+ return new LeaderElection.ServerInterface() {
+ private volatile boolean isAlive = true;
+
+ @Override
+ public RaftGroupMemberId getMemberId() {
+ return candidate;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return isAlive;
+ }
+
+ @Override
+ public boolean isCandidate() {
+ return true;
+ }
+
+ @Override
+ public long getCurrentTerm() {
+ final TermIndex lastEntry = getLastEntry();
+ return lastEntry != null? lastEntry.getTerm() :
TermIndex.INITIAL_VALUE.getTerm();
+ }
+
+ @Override
+ public long getLastCommittedIndex() {
+ final TermIndex lastEntry = getLastEntry();
+ return lastEntry != null? lastEntry.getIndex() :
TermIndex.INITIAL_VALUE.getIndex();
+ }
+
+ @Override
+ public TermIndex getLastEntry() {
+ return lastEntries.get(getId());
+ }
+
+ @Override
+ public boolean isPreVoteEnabled() {
+ return false;
+ }
+
+ @Override
+ public LeaderElection.ConfAndTerm initElection(LeaderElection.Phase
phase) {
+ return new LeaderElection.ConfAndTerm(conf, getCurrentTerm());
+ }
+
+ @Override
+ public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) {
+ final RaftPeerId voterPeerId =
RaftPeerId.valueOf(r.getServerRequest().getReplyId());
+ final RaftGroupMemberId voter = RaftGroupMemberId.valueOf(voterPeerId,
group.getGroupId());
+ final TermIndex lastEntry = lastEntries.get(voterPeerId);
+ final long term = (lastEntry != null? lastEntry :
TermIndex.INITIAL_VALUE).getTerm();
+
+ // voter replies to candidate
+ return ServerProtoUtils.toRequestVoteReplyProto(getId(), voter, true,
term, false, lastEntry);
+ }
+
+ @Override
+ public void changeToLeader() {
+ assertTrue(expectToPass);
+ isAlive = false;
+ }
+
+ @Override
+ public void rejected(long term, LeaderElection.ResultAndTerm result) {
+ assertFalse(expectToPass);
+ isAlive = false;
+ }
+
+ @Override
+ public void shutdown() {
+ fail();
+ }
+
+ @Override
+ public Timekeeper getLeaderElectionTimer() {
+ final long start = System.nanoTime();
+ final Timekeeper.Context context = () -> System.nanoTime() - start;
+ return () -> context;
+ }
+
+ @Override
+ public void onNewLeaderElectionCompletion() {
+ // no op
+ }
+
+ @Override
+ public TimeDuration getRandomElectionTimeout() {
+ final int millis = 100 + ThreadLocalRandom.current().nextInt(100);
+ return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public ThreadGroup getThreadGroup() {
+ return threadGroup;
+ }
+ };
+ }
+
+ @Test
+ public void testVoterWithEmptyLog() {
+ // all the candidate and the voters have an empty log
+ // expect to pass: empty-log-candidate will accept votes from
empty-log-voters
+ runTestVoterWithEmptyLog(true);
+
+ // candidate: non-empty commit
+ // voter 1 : empty log
+ // voter 2 : empty log
+ // expect to fail: non-empty-commit-candidate will NOT accept votes from
empty-log-voters
+ final TermIndex candidateLastEntry = TermIndex.valueOf(2, 9);
+ runTestVoterWithEmptyLog(false, candidateLastEntry);
+
+ // candidate: non-empty commit
+ // voter 1 : non-empty log
+ // voter 2 : empty log
+ // expect to pass: non-empty-commit-candidate will accept votes from
non-empty-log-voters
+ final TermIndex voterLastEntry = TermIndex.valueOf(2, 7);
+ runTestVoterWithEmptyLog(true, candidateLastEntry, voterLastEntry);
+
+ // candidate: non-empty log
+ // voter 1 : older version
+ // voter 2 : empty log
+ // expect to pass: non-empty-commit-candidate will accept votes from
older-version-voters
+ runTestVoterWithEmptyLog(true, candidateLastEntry,
TermIndex.PROTO_DEFAULT);
+ }
+
+ void runTestVoterWithEmptyLog(boolean expectToPass, TermIndex...
lastEntries) {
+ LOG.info("expectToPass? {}, lastEntries={}",
+ expectToPass, lastEntries);
+ final Map<RaftPeerId, TermIndex> map = new HashMap<>();
+ for(int i = 0; i < lastEntries.length; i++) {
+ map.put(peers.get(i).getId(), lastEntries[i]);
+ }
+ final LeaderElection election = new
LeaderElection(newServerInterface(expectToPass, map), false);
+ election.startInForeground();
+ }
+
+}
\ No newline at end of file