jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1591319255
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1700,16 +1710,16 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
}
/**
- * Validate a request which is only valid between voters. If an error is
- * present in the returned value, it should be returned in the response.
+ * Validate common state for requests to establish leadership.
+ *
+ * These include the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs. If an error is present in
+ * the returned value, it should be returned in the response.
*/
private Optional<Errors> validateVoterOnlyRequest(int remoteNodeId, int requestEpoch) {
if (requestEpoch < quorum.epoch()) {
return Optional.of(Errors.FENCED_LEADER_EPOCH);
} else if (remoteNodeId < 0) {
return Optional.of(Errors.INVALID_REQUEST);
- } else if (quorum.isObserver() || !quorum.isVoter(remoteNodeId)) {
- return Optional.of(Errors.INCONSISTENT_VOTER_SET);
Review Comment:
In KIP-853, `INCONSISTENT_VOTER_SET` is deprecated and replicas will no longer return this error.
In this case, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has not yet been replicated to the new voter.
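A minimal sketch of the checks left in the hunk above once the `INCONSISTENT_VOTER_SET` branch is removed. It assumes a hypothetical `SketchErrors` enum standing in for Kafka's `Errors`, and passes the current epoch as a parameter instead of reading `quorum.epoch()`; both are simplifications for illustration. The point is that neither the local observer status nor the sender's membership in the local voter set can produce an error.

```java
import java.util.Optional;

// Hypothetical stand-in for Kafka's Errors enum, reduced to the two codes
// this check can still return.
enum SketchErrors {
    FENCED_LEADER_EPOCH,
    INVALID_REQUEST
}

final class LeadershipRequestValidationSketch {
    private LeadershipRequestValidationSketch() {}

    /**
     * Validates state shared by Vote, BeginQuorumEpoch and EndQuorumEpoch
     * requests. The local voter set is deliberately not consulted.
     */
    static Optional<SketchErrors> validate(int currentEpoch, int remoteNodeId, int requestEpoch) {
        if (requestEpoch < currentEpoch) {
            // The sender's epoch has already been superseded locally.
            return Optional.of(SketchErrors.FENCED_LEADER_EPOCH);
        } else if (remoteNodeId < 0) {
            // A negative id cannot identify a replica.
            return Optional.of(SketchErrors.INVALID_REQUEST);
        }
        // No voter-membership check: the sender may have seen a newer
        // VotersRecord than the local replica has.
        return Optional.empty();
    }
}
```

For example, a request from node 3 at the current epoch passes even if the local replica still believes it is an observer.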
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
// when we send Vote or BeginEpoch requests.
ElectionState election;
- try {
- election = store.readElectionState();
- if (election == null) {
- election = ElectionState.withUnknownLeader(0, voters);
- }
- } catch (final UncheckedIOException e) {
- // For exceptions during state file loading (missing or not readable),
- // we could assume the file is corrupted already and should be cleaned up.
- log.warn("Clearing local quorum state store after error loading state {}",
- store, e);
- store.clear();
- election = ElectionState.withUnknownLeader(0, voters);
- }
+ election = store
+ .readElectionState()
+ .orElseGet(() -> ElectionState.withUnknownLeader(0, latestVoterSet.get().voterIds()));
final EpochState initialState;
- if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
- throw new IllegalStateException("Configured voter set: " + voters
- + " is different from the voter set read from the state file: " + election.voters()
- + ". Check if the quorum configuration is up to date, "
- + "or wipe out the local state file if necessary");
- } else if (election.hasVoted() && !isVoter()) {
- String localIdDescription = localId.isPresent() ?
- localId.getAsInt() + " is not a voter" :
- "is undefined";
- throw new IllegalStateException("Initialized quorum state " + election
- + " with a voted candidate, which indicates this node was previously "
- + " a voter, but the local id " + localIdDescription);
Review Comment:
In KIP-853, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has not yet been replicated to the new voter.
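A minimal sketch of the shape of the new initialization, assuming a hypothetical, simplified `SketchElectionState` in place of Kafka's `ElectionState`. An empty `Optional` from the store falls back to epoch 0 with the latest known voter ids, mirroring the `orElseGet` call in the hunk. A stored state with a vote is accepted as-is, because (per the comment above) a replica may have voted while it still considered itself an observer.

```java
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical, simplified election state with only the fields this sketch needs.
final class SketchElectionState {
    final int epoch;
    final OptionalInt votedId;
    final Set<Integer> voters;

    SketchElectionState(int epoch, OptionalInt votedId, Set<Integer> voters) {
        this.epoch = epoch;
        this.votedId = votedId;
        this.voters = voters;
    }

    static SketchElectionState withUnknownLeader(int epoch, Set<Integer> voters) {
        return new SketchElectionState(epoch, OptionalInt.empty(), voters);
    }

    boolean hasVoted() {
        return votedId.isPresent();
    }
}

final class QuorumInitSketch {
    private QuorumInitSketch() {}

    // No "voted but not a voter" check: the stored state is returned unchanged,
    // and the fallback is only computed when nothing was stored.
    static SketchElectionState initialElection(
        Optional<SketchElectionState> stored,
        Supplier<Set<Integer>> latestVoterIds
    ) {
        return stored.orElseGet(() -> SketchElectionState.withUnknownLeader(0, latestVoterIds.get()));
    }
}
```

Using `orElseGet` rather than `orElse` means the latest voter set is only looked up when no stored state exists.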
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -336,40 +346,54 @@ public void transitionToUnattached(int epoch) {
*/
public void transitionToVoted(
int epoch,
- int candidateId
+ ReplicaKey candidateKey
) {
- if (localId.isPresent() && candidateId == localId.getAsInt()) {
- throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
- " and epoch=" + epoch + " since it matches the local broker.id");
- } else if (isObserver()) {
- throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
- " and epoch=" + epoch + " since the local broker.id=" + localId + " is not a voter");
- } else if (!isVoter(candidateId)) {
- throw new IllegalStateException("Cannot transition to Voted with voterId=" + candidateId +
- " and epoch=" + epoch + " since it is not one of the voters " + voters);
- }
Review Comment:
In KIP-853, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. Similarly, replicas need to be allowed to vote for a candidate even if they think the candidate is not a voter.
In short, only the local set of voters is considered when transitioning to the candidate state.
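A minimal sketch contrasting the voter-set preconditions, using hypothetical helper names. `oldVoterSetAllowsVote` reproduces the membership checks removed in the hunk above; after the change, granting a vote has no voter-set precondition at all, while becoming a candidate still requires the local replica to be in its local voter set.

```java
import java.util.Set;

final class VoterSetPreconditionsSketch {
    private VoterSetPreconditionsSketch() {}

    // Removed rule: both the local replica and the candidate had to appear
    // in the local voter set before a vote could be granted.
    static boolean oldVoterSetAllowsVote(int localId, int candidateId, Set<Integer> localVoters) {
        return localVoters.contains(localId) && localVoters.contains(candidateId);
    }

    // Remaining rule: only the local voter set decides whether this replica
    // may start an election as a candidate.
    static boolean canBecomeCandidate(int localId, Set<Integer> localVoters) {
        return localVoters.contains(localId);
    }
}
```

Suppose replica 3 was just added, candidate 1 already has the VotersRecord with `{1, 2, 3}`, and replica 3 still sees `{1, 2}`. The old rule would stop replica 3 from voting for candidate 1; under the new behavior it can vote, but it will not start an election itself until its own voter set includes it.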
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -379,16 +403,11 @@ public void transitionToFollower(
int epoch,
int leaderId
) {
+ int currentEpoch = state.epoch();
if (localId.isPresent() && leaderId == localId.getAsInt()) {
throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
" and epoch=" + epoch + " since it matches the local broker.id=" + localId);
- } else if (!isVoter(leaderId)) {
- throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
- " and epoch=" + epoch + " since it is not one of the voters " + voters);
- }
Review Comment:
In KIP-853, the leader may not be in the local set of voters. This can
happen if there is an election right after a replica has been added to the set
of voters and that state has not been replicated to all of the replicas.
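A minimal sketch of the leader-identity check that survives in the hunk above, as a hypothetical standalone helper. A replica still refuses to follow itself, but it no longer requires `leaderId` to be in its local voter set, since that set may lag behind the leader's.

```java
import java.util.OptionalInt;

final class FollowerTransitionSketch {
    private FollowerTransitionSketch() {}

    // Only rejects following yourself; membership of leaderId in the local
    // voter set is intentionally not checked.
    static void checkCanFollow(OptionalInt localId, int epoch, int leaderId) {
        if (localId.isPresent() && leaderId == localId.getAsInt()) {
            throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
                " and epoch=" + epoch + " since it matches the local broker.id=" + localId.getAsInt());
        }
    }
}
```

A replica with id 3 can therefore follow leader 4 even if its local voter set does not contain 4 yet.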
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
// when we send Vote or BeginEpoch requests.
ElectionState election;
- try {
- election = store.readElectionState();
- if (election == null) {
- election = ElectionState.withUnknownLeader(0, voters);
- }
- } catch (final UncheckedIOException e) {
- // For exceptions during state file loading (missing or not readable),
- // we could assume the file is corrupted already and should be cleaned up.
- log.warn("Clearing local quorum state store after error loading state {}",
- store, e);
- store.clear();
- election = ElectionState.withUnknownLeader(0, voters);
- }
+ election = store
+ .readElectionState()
+ .orElseGet(() -> ElectionState.withUnknownLeader(0, latestVoterSet.get().voterIds()));
final EpochState initialState;
- if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
- throw new IllegalStateException("Configured voter set: " + voters
- + " is different from the voter set read from the state file: " + election.voters()
- + ". Check if the quorum configuration is up to date, "
- + "or wipe out the local state file if necessary");
Review Comment:
In KIP-853, this check is not useful since the set of voters can change and
it is stored in the partition log segments and snapshot.
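A minimal sketch of why a single stored voter set has nothing stable to be compared against: once voter sets live in the log, the answer depends on the log position. It assumes a hypothetical `VoterSetTimelineSketch` that keys each voter set by the offset of the record that introduced it (for example a VotersRecord read from a segment or snapshot); it is not Kafka's internal class.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical: voter sets keyed by the offset of the record that introduced them.
final class VoterSetTimelineSketch {
    private final NavigableMap<Long, Set<Integer>> byOffset = new TreeMap<>();

    void addAt(long offset, Set<Integer> voters) {
        byOffset.put(offset, Set.copyOf(voters));
    }

    // Voter set in effect at an offset: the most recent one at or before it.
    Optional<Set<Integer>> valueAt(long offset) {
        Map.Entry<Long, Set<Integer>> entry = byOffset.floorEntry(offset);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    // Most recent voter set this sketch has seen.
    Optional<Set<Integer>> latest() {
        Map.Entry<Long, Set<Integer>> entry = byOffset.lastEntry();
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

With `{1, 2}` at offset 0 and `{1, 2, 3}` at offset 100, offset 50 resolves to `{1, 2}` while the latest set is `{1, 2, 3}`, so two correct replicas at different log positions can legitimately disagree.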
--
This is an automated message from the Apache Git Service.