jsancio commented on code in PR #19982:
URL: https://github.com/apache/kafka/pull/19982#discussion_r2201032910
##########
raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java:
##########
@@ -184,7 +185,8 @@ public CompletableFuture<AddRaftVoterResponseData>
handleAddVoterRequest(
AddVoterHandlerState state = new AddVoterHandlerState(
voterKey,
voterEndpoints,
- time.timer(timeout.getAsLong())
+ time.timer(timeout.getAsLong()),
+ ackWhenCommitted
Review Comment:
Similar comment here. Let's swap the order of the timer and the "ack when
committed" boolean.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java:
##########
@@ -374,26 +428,105 @@ public void testAddVoter() throws Exception {
apiVersionRequest.destination(),
apiVersionsResponse(Errors.NONE)
);
+ }
- // Handle the API_VERSIONS response
- context.client.poll();
- // Append new VotersRecord to log
- context.client.poll();
+ private void commitNewVoterSetForAddVoter(
+ RaftClientTestContext context,
+ ReplicaKey leader,
+ ReplicaKey follower,
+ ReplicaKey newVoter,
+ int epoch
+ ) throws Exception {
// The new voter is now a voter after writing the VotersRecord to the
log
assertTrue(context.client.quorum().isVoter(newVoter));
checkLeaderMetricValues(3, 0, 1, context);
// Send a FETCH to increase the HWM and commit the new voter set
context.deliverRequest(
- context.fetchRequest(epoch, follower,
context.log.endOffset().offset(), epoch, 0)
+ context.fetchRequest(
+ epoch,
+ follower,
+ context.log.endOffset().offset(),
+ epoch,
+ 0
+ )
);
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(local.id()));
+ context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(leader.id()));
checkLeaderMetricValues(3, 0, 0, context);
+ }
- // Expect reply for AddVoter request
+ @ParameterizedTest
+ @EnumSource(value = RaftProtocol.class, names = {
+ "KIP_853_PROTOCOL",
+ "KIP_996_PROTOCOL",
+ "KIP_1166_PROTOCOL"
+ })
+ void testAddVoterAckWhenCommittedUnsupported(RaftProtocol protocol) throws
Exception {
+ ReplicaKey local = replicaKey(randomReplicaId(), true);
+ ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withRaftProtocol(protocol)
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withUnknownLeader(3)
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+
+ ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+ InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+ "localhost",
+ 9990 + newVoter.id()
+ );
+ Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+ Map.of(context.channel.listenerName(), newAddress)
+ );
+
+ prepareLeaderToReceiveAddVoter(context, epoch, local, follower,
newVoter);
+
+ // Attempt to add new voter to the quorum
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> context.deliverRequest(
+ context.addVoterRequest(
+ Integer.MAX_VALUE,
+ newVoter,
+ newListeners
+ ).setAckWhenCommitted(false)
+ )
+ );
+ }
+
+ // This method sets up the context so a test can send an AddVoter request
after
+ // exiting this method
+ private void prepareLeaderToReceiveAddVoter(
+ RaftClientTestContext context,
+ int epoch,
+ ReplicaKey leader,
+ ReplicaKey follower,
Review Comment:
Interesting in all of the reconfig tests we always go from 2 voters to 3
voters? Maybe we can improve this in a future PR and make this parametrized.
##########
raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java:
##########
@@ -321,7 +323,16 @@ public boolean handleApiVersionsResponse(
)
);
current.setLastOffset(leaderState.appendVotersRecord(newVoters,
currentTimeMs));
-
+ if (!current.ackWhenCommitted()) {
+ // complete the future to send response, but do not reset the
state,
+ // since the new voter set is not yet committed
+ current.future().complete(
+ RaftUtil.addVoterResponse(
+ Errors.NONE,
+ null
+ )
+ );
Review Comment:
Feel free to join some of these lines if they are less than about 100
characters. E.g.:
```java
current.future().complete(
RaftUtil.addVoterResponse(Errors.NONE, null)
);
```
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java:
##########
@@ -374,26 +428,105 @@ public void testAddVoter() throws Exception {
apiVersionRequest.destination(),
apiVersionsResponse(Errors.NONE)
);
+ }
- // Handle the API_VERSIONS response
- context.client.poll();
- // Append new VotersRecord to log
- context.client.poll();
+ private void commitNewVoterSetForAddVoter(
+ RaftClientTestContext context,
+ ReplicaKey leader,
+ ReplicaKey follower,
+ ReplicaKey newVoter,
+ int epoch
+ ) throws Exception {
// The new voter is now a voter after writing the VotersRecord to the
log
assertTrue(context.client.quorum().isVoter(newVoter));
checkLeaderMetricValues(3, 0, 1, context);
// Send a FETCH to increase the HWM and commit the new voter set
context.deliverRequest(
- context.fetchRequest(epoch, follower,
context.log.endOffset().offset(), epoch, 0)
+ context.fetchRequest(
+ epoch,
+ follower,
+ context.log.endOffset().offset(),
+ epoch,
+ 0
+ )
);
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(local.id()));
+ context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(leader.id()));
checkLeaderMetricValues(3, 0, 0, context);
+ }
- // Expect reply for AddVoter request
+ @ParameterizedTest
+ @EnumSource(value = RaftProtocol.class, names = {
+ "KIP_853_PROTOCOL",
+ "KIP_996_PROTOCOL",
+ "KIP_1166_PROTOCOL"
+ })
+ void testAddVoterAckWhenCommittedUnsupported(RaftProtocol protocol) throws
Exception {
+ ReplicaKey local = replicaKey(randomReplicaId(), true);
+ ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withRaftProtocol(protocol)
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withUnknownLeader(3)
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+
+ ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+ InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+ "localhost",
+ 9990 + newVoter.id()
+ );
+ Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+ Map.of(context.channel.listenerName(), newAddress)
+ );
+
+ prepareLeaderToReceiveAddVoter(context, epoch, local, follower,
newVoter);
+
+ // Attempt to add new voter to the quorum
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> context.deliverRequest(
+ context.addVoterRequest(
+ Integer.MAX_VALUE,
+ newVoter,
+ newListeners
+ ).setAckWhenCommitted(false)
+ )
+ );
Review Comment:
This is not testing `KafkaRaftClient`. This is testing the serialization
code. It is testing that serializing a false "ack when committed" throws an
exception when the target version is 0.
If you want to test this case for KafkaRaftClient, you need to make the
local replica a non-voter that can auto-join and show that AddVoterRequest
always sets "ack when committed" to false.
##########
raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java:
##########
@@ -29,18 +29,21 @@ public final class AddVoterHandlerState {
private final ReplicaKey voterKey;
private final Endpoints voterEndpoints;
private final Timer timeout;
+ private final boolean ackWhenCommitted;
Review Comment:
Minor but let's swap this order here and in the constructor.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2263,7 +2263,8 @@ private CompletableFuture<AddRaftVoterResponseData>
handleAddVoterRequest(
quorum.leaderStateOrThrow(),
newVoter.get(),
newVoterEndpoints,
- currentTimeMs
+ currentTimeMs,
+ data.ackWhenCommitted()
Review Comment:
Let's swap the order of `currentTimeMs` and `ackWhenCommitted`. "Ack when
committed" is more important and raft tends to pass the current time as the
last parameter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]