jsancio commented on code in PR #19589:
URL: https://github.com/apache/kafka/pull/19589#discussion_r2070577759
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -99,6 +100,13 @@ public class QuorumConfig {
public static final String QUORUM_RETRY_BACKOFF_MS_DOC =
CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
public static final int DEFAULT_QUORUM_RETRY_BACKOFF_MS = 20;
+ public static final String QUORUM_AUTO_JOIN_ENABLE = QUORUM_PREFIX + "auto.join.enable";
+ public static final String QUORUM_AUTO_JOIN_ENABLE_DOC = "If set to true, controllers will remove the entry " +
+ "in the voters set that matches its replica id but does not match its directory id if it exists by sending " +
+ "the RemoveVoter RPC. When no old entry for the controller exists in the voter set, it will then add itself " +
+ "by sending a AddVoter RPC to the leader.";
Review Comment:
This documentation will be read by end users, who don't need to understand the algorithm. We could just use the paragraph from the KIP:
> Controls whether a KRaft controller should automatically join the cluster metadata partition for its cluster id. If the configuration is set to true the controller must be stopped before removing the controller with kafka-metadata-quorum remove-controller.
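A minimal, dependency-free sketch of the constants with the KIP paragraph as the user-facing doc and the shorter `autoJoin` accessor name. `AutoJoinConfigSketch` and its constructor are hypothetical; the `controller.quorum.` prefix is assumed to match `QUORUM_PREFIX`, and the `false` default is an assumption.

```java
import java.util.Map;

// Hypothetical sketch; the real QuorumConfig registers its keys through ConfigDef.
final class AutoJoinConfigSketch {
    // Assumed to match QuorumConfig.QUORUM_PREFIX.
    static final String QUORUM_PREFIX = "controller.quorum.";
    static final String QUORUM_AUTO_JOIN_ENABLE_CONFIG = QUORUM_PREFIX + "auto.join.enable";
    // User-facing doc taken verbatim from the KIP paragraph quoted in the review.
    static final String QUORUM_AUTO_JOIN_ENABLE_DOC =
        "Controls whether a KRaft controller should automatically join the cluster metadata partition " +
        "for its cluster id. If the configuration is set to true the controller must be stopped before " +
        "removing the controller with kafka-metadata-quorum remove-controller.";
    // Assumption: auto join is opt-in.
    static final boolean DEFAULT_QUORUM_AUTO_JOIN_ENABLE = false;

    private final boolean autoJoin;

    AutoJoinConfigSketch(Map<String, String> props) {
        String raw = props.get(QUORUM_AUTO_JOIN_ENABLE_CONFIG);
        // No validation here: any value other than "true" (case-insensitive) parses as false.
        this.autoJoin = raw == null ? DEFAULT_QUORUM_AUTO_JOIN_ENABLE : Boolean.parseBoolean(raw.trim());
    }

    boolean autoJoin() {
        return autoJoin;
    }
}
```

In the real class, `ConfigDef` would also reject malformed boolean values, which this sketch does not attempt.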
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3274,6 +3320,37 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
if (state.hasFetchTimeoutExpired(currentTimeMs)) {
return maybeSendFetchToAnyBootstrap(currentTimeMs);
+ } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
+ quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {
+ var voters = partitionState.lastVoterSet();
+ var localReplicaKey = quorum.localReplicaKeyOrThrow();
+ final boolean resetAddRemoveVoterTimer;
+ final long backoffMs;
+
+ Optional<ReplicaKey> oldVoter = voters.getOldVoterForReplicaKey(localReplicaKey);
Review Comment:
How about:
```java
var voterIds = voters.voterIds();
if (voterIds.contains(localReplicaKey.id())) {
    /* The replica id is in the voter set but the local replica is not a voter. Remove the old voter.
     * The local replica is not in the voter set because it is running as an observer.
     */
    // ...
} else {
    // The replica id is not in the voter set. Add the local replica.
    // ...
}
```
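A self-contained sketch of the decision the suggestion encodes, assuming the voter set can be simplified to a map from replica id to directory id. `AutoJoinDecision`, `Action`, `Decision`, and `next` are hypothetical; the real code works on `VoterSet` and `ReplicaKey`.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the id-based branch suggested above.
final class AutoJoinDecision {
    enum Action { REMOVE_OLD_VOTER, ADD_LOCAL_REPLICA }

    // The replica key that the RemoveVoter or AddVoter request would target.
    record Decision(Action action, int replicaId, UUID directoryId) {}

    // voters maps each voter's replica id to its directory id (a simplification of VoterSet).
    static Decision next(Map<Integer, UUID> voters, int localId, UUID localDirectoryId) {
        if (voters.containsKey(localId)) {
            // The id is a voter but the local replica is polling as an observer, so the
            // entry must carry an old directory id: target that old voter for removal.
            return new Decision(Action.REMOVE_OLD_VOTER, localId, voters.get(localId));
        } else {
            // The id is not in the voter set: add the local replica with its own directory id.
            return new Decision(Action.ADD_LOCAL_REPLICA, localId, localDirectoryId);
        }
    }
}
```

Because the branch keys on the id alone, the observer path needs no lookup that also compares directory ids.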
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3328,6 +3405,23 @@ private UpdateRaftVoterRequestData buildUpdateVoterRequest() {
);
}
+ private AddRaftVoterRequestData buildAddVoterRequest() {
+ return RaftUtil.addVoterRequest(
+ clusterId,
+ // TODO: What to set the AddVoterRequest timeout to?
+ 10000,
Review Comment:
Use the request timeout here instead of the hard-coded `10000`.
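A hedged sketch of the suggestion, assuming the builder can read the timeout from the quorum config. `QuorumSettings`, `AddVoterRequest`, and this `buildAddVoterRequest` are hypothetical stand-ins, not the real `QuorumConfig`, `AddRaftVoterRequestData`, or `RaftUtil.addVoterRequest` signatures.

```java
// Hypothetical sketch: derive the AddVoter timeout from configuration instead of a literal.
final class AddVoterRequestSketch {
    // Stand-in for the part of QuorumConfig that exposes the request timeout.
    record QuorumSettings(int requestTimeoutMs) {}

    // Stand-in for the generated AddRaftVoterRequestData.
    record AddVoterRequest(String clusterId, int timeoutMs, int voterId) {}

    static AddVoterRequest buildAddVoterRequest(QuorumSettings config, String clusterId, int localId) {
        // Reuse the configured request timeout rather than hard-coding 10000 ms.
        return new AddVoterRequest(clusterId, config.requestTimeoutMs(), localId);
    }
}
```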
##########
raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java:
##########
@@ -161,6 +172,10 @@ public int appendLingerMs() {
return appendLingerMs;
}
+ public boolean autoJoinEnable() {
+ return autoJoinEnable;
+ }
Review Comment:
This could just be `autoJoin`, for both the method name and the field name.
##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -42,6 +42,8 @@ public class FollowerState implements EpochState {
private final Timer fetchTimer;
// Used to track when to send another update voter request
private final Timer updateVoterPeriodTimer;
+ // Used to track when to send another add or remove voter request
+ private final Timer addRemoveVoterPeriodTimer;
Review Comment:
Is there a reason why you added a new timer? It has the same period as the update voter timer, and the two sets of RPCs are mutually exclusive: followers that are voters periodically send UpdateVoter, while followers that are observers periodically send AddVoter or RemoveVoter.
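A sketch of the reviewer's alternative: one periodic timer whose meaning depends on whether the follower is currently a voter or an observer. `Timer` here is a minimal hypothetical stand-in for Kafka's `org.apache.kafka.common.utils.Timer`, and `FollowerPeriodicTimerSketch`, `PeriodicRpc`, and `maybeSendPeriodicRpc` are illustrative names.

```java
import java.util.Optional;

// Hypothetical sketch of FollowerState pacing both RPC families with a single timer.
final class FollowerPeriodicTimerSketch {
    enum PeriodicRpc { UPDATE_VOTER, ADD_OR_REMOVE_VOTER }

    // Minimal stand-in for a Kafka Timer: tracks a deadline in milliseconds.
    static final class Timer {
        private long deadlineMs;

        Timer(long nowMs, long periodMs) {
            reset(nowMs, periodMs);
        }

        void reset(long nowMs, long periodMs) {
            deadlineMs = nowMs + periodMs;
        }

        boolean isExpired(long nowMs) {
            return nowMs >= deadlineMs;
        }
    }

    private final long periodMs;
    // A follower is either a voter or an observer, so one timer can pace both RPC families.
    private final Timer periodTimer;

    FollowerPeriodicTimerSketch(long nowMs, long periodMs) {
        this.periodMs = periodMs;
        this.periodTimer = new Timer(nowMs, periodMs);
    }

    // Returns the RPC to send if the shared period has expired, and restarts the period.
    Optional<PeriodicRpc> maybeSendPeriodicRpc(boolean isVoter, long nowMs) {
        if (!periodTimer.isExpired(nowMs)) {
            return Optional.empty();
        }
        periodTimer.reset(nowMs, periodMs);
        // Voters send UpdateVoter; observers send AddVoter or RemoveVoter.
        return Optional.of(isVoter ? PeriodicRpc.UPDATE_VOTER : PeriodicRpc.ADD_OR_REMOVE_VOTER);
    }
}
```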
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3274,6 +3320,37 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
if (state.hasFetchTimeoutExpired(currentTimeMs)) {
return maybeSendFetchToAnyBootstrap(currentTimeMs);
+ } else if (partitionState.lastKraftVersion().isReconfigSupported() && followersAlwaysFlush &&
+ quorumConfig.autoJoinEnable() && state.hasAddRemoveVoterPeriodExpired(currentTimeMs)) {
Review Comment:
Okay. I think we should document why we require both `followersAlwaysFlush`
and `autoJoinEnable` to be true.
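One way to document the condition is to name each precondition in a helper's Javadoc. This is a hypothetical sketch: `AutoJoinGuard` and `shouldAttemptAutoJoin` are illustrative, and the stated meaning of `followersAlwaysFlush` (true only on processes that can become voters) is an assumption, not confirmed by the diff.

```java
// Hypothetical helper that documents each precondition of the auto-join branch.
final class AutoJoinGuard {
    /**
     * Whether an observer should try to add itself to, or remove its stale entry from, the voter set.
     *
     * @param reconfigSupported    the latest kraft.version supports voter reconfiguration
     * @param followersAlwaysFlush ASSUMPTION: true only on processes that can become voters; a voter's
     *                             fetch offset counts toward the high watermark, so a replica that may
     *                             acknowledge unflushed entries must not join the voter set
     * @param autoJoin             the operator enabled the auto-join configuration
     * @param periodExpired        the periodic timer for these RPCs has expired
     */
    static boolean shouldAttemptAutoJoin(
        boolean reconfigSupported,
        boolean followersAlwaysFlush,
        boolean autoJoin,
        boolean periodExpired
    ) {
        return reconfigSupported && followersAlwaysFlush && autoJoin && periodExpired;
    }
}
```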
##########
raft/src/main/java/org/apache/kafka/raft/VoterSet.java:
##########
@@ -337,6 +337,33 @@ public boolean supportsVersion(KRaftVersion version) {
.allMatch(voter -> voter.supportsVersion(version));
}
+ /**
+ * Gets the old voter's replica key for a given replica key if it exists.
+ * An old voter is a voter that has the same replica id as the given replica key, but a different directory id.
+ *
+ * @param replicaKey the replica key to check against
+ * @return the replica key with the same id but a different directory id, if present, Optional.empty() otherwise
+ */
+ public Optional<ReplicaKey> getOldVoterForReplicaKey(ReplicaKey replicaKey) {
+ return voters
+ .values()
+ .stream()
+ .map(VoterNode::voterKey)
+ .filter(voter -> voter.id() == replicaKey.id() && !voter.directoryId().equals(replicaKey.directoryId()))
+ .findFirst();
+ }
+
+ /**
+ * @param replicaKey the replica key to check against
+ * @return true if the voter set does not contain the given replica id, false otherwise
+ */
+ public boolean doesNotContainReplicaId(ReplicaKey replicaKey) {
+ return voters
+ .values()
+ .stream()
+ .noneMatch(voter -> voter.voterKey().id() == replicaKey.id());
+ }
Review Comment:
I think we can remove these methods if you agree with my other comment.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2618,6 +2656,14 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
break;
+ case ADD_RAFT_VOTER:
+ handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
+ break;
+
+ case REMOVE_RAFT_VOTER:
+ handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
+ break;
Review Comment:
Since KRaft can now send these RPCs, we need to add cases for them in `RaftUtil#errorResponse`.
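The exact shape of `RaftUtil#errorResponse` isn't shown in this thread. As a hypothetical, dependency-free sketch, assume it switches on the API key, builds a response carrying the error code, and throws for keys it does not handle. Without the two new cases, building an error response for an AddRaftVoter or RemoveRaftVoter request that KRaft itself sent would hit the `default` branch. `ErrorResponseSketch`, its `ApiKey` enum (including `OTHER`), and `ErrorResponse` are stand-ins, not Kafka's `ApiKeys` or generated response classes.

```java
// Hypothetical sketch of an error-response factory keyed on the API.
final class ErrorResponseSketch {
    // Stand-in for the KRaft-related subset of ApiKeys; OTHER represents any unhandled key.
    enum ApiKey {
        VOTE, BEGIN_QUORUM_EPOCH, END_QUORUM_EPOCH, FETCH, FETCH_SNAPSHOT,
        UPDATE_RAFT_VOTER, ADD_RAFT_VOTER, REMOVE_RAFT_VOTER, OTHER
    }

    // Stand-in for the generated *ResponseData classes.
    record ErrorResponse(ApiKey apiKey, short errorCode) {}

    static ErrorResponse errorResponse(ApiKey apiKey, short errorCode) {
        return switch (apiKey) {
            case VOTE, BEGIN_QUORUM_EPOCH, END_QUORUM_EPOCH, FETCH, FETCH_SNAPSHOT, UPDATE_RAFT_VOTER ->
                new ErrorResponse(apiKey, errorCode);
            // New cases: KRaft now sends these requests itself.
            case ADD_RAFT_VOTER, REMOVE_RAFT_VOTER -> new ErrorResponse(apiKey, errorCode);
            default -> throw new IllegalArgumentException("Unexpected request type: " + apiKey);
        };
    }
}
```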
##########
raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java:
##########
@@ -375,6 +377,11 @@ Builder withLocalListeners(Endpoints localListeners) {
return this;
}
+ Builder withAutoJoinEnabled(boolean autoJoinEnabled) {
+ this.autoJoinEnabled = autoJoinEnabled;
+ return this;
+ }
Review Comment:
This could be `withAutoJoin` for the method name and `autoJoin` for the
field name.
--
This is an automated message from the Apache Git Service.