kevin-wu24 commented on code in PR #19982:
URL: https://github.com/apache/kafka/pull/19982#discussion_r2201295733
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java:
##########
@@ -374,26 +428,105 @@ public void testAddVoter() throws Exception {
apiVersionRequest.destination(),
apiVersionsResponse(Errors.NONE)
);
+ }
- // Handle the API_VERSIONS response
- context.client.poll();
- // Append new VotersRecord to log
- context.client.poll();
+ private void commitNewVoterSetForAddVoter(
+ RaftClientTestContext context,
+ ReplicaKey leader,
+ ReplicaKey follower,
+ ReplicaKey newVoter,
+ int epoch
+ ) throws Exception {
// The new voter is now a voter after writing the VotersRecord to the
log
assertTrue(context.client.quorum().isVoter(newVoter));
checkLeaderMetricValues(3, 0, 1, context);
// Send a FETCH to increase the HWM and commit the new voter set
context.deliverRequest(
- context.fetchRequest(epoch, follower,
context.log.endOffset().offset(), epoch, 0)
+ context.fetchRequest(
+ epoch,
+ follower,
+ context.log.endOffset().offset(),
+ epoch,
+ 0
+ )
);
context.pollUntilResponse();
- context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(local.id()));
+ context.assertSentFetchPartitionResponse(Errors.NONE, epoch,
OptionalInt.of(leader.id()));
checkLeaderMetricValues(3, 0, 0, context);
+ }
- // Expect reply for AddVoter request
+ @ParameterizedTest
+ @EnumSource(value = RaftProtocol.class, names = {
+ "KIP_853_PROTOCOL",
+ "KIP_996_PROTOCOL",
+ "KIP_1166_PROTOCOL"
+ })
+ void testAddVoterAckWhenCommittedUnsupported(RaftProtocol protocol) throws
Exception {
+ ReplicaKey local = replicaKey(randomReplicaId(), true);
+ ReplicaKey follower = replicaKey(local.id() + 1, true);
+
+ VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower));
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(local.id(), local.directoryId().get())
+ .withRaftProtocol(protocol)
+ .withBootstrapSnapshot(Optional.of(voters))
+ .withUnknownLeader(3)
+ .build();
+
+ context.unattachedToLeader();
+ int epoch = context.currentEpoch();
+
+ ReplicaKey newVoter = replicaKey(local.id() + 2, true);
+ InetSocketAddress newAddress = InetSocketAddress.createUnresolved(
+ "localhost",
+ 9990 + newVoter.id()
+ );
+ Endpoints newListeners = Endpoints.fromInetSocketAddresses(
+ Map.of(context.channel.listenerName(), newAddress)
+ );
+
+ prepareLeaderToReceiveAddVoter(context, epoch, local, follower,
newVoter);
+
+ // Attempt to add new voter to the quorum
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> context.deliverRequest(
+ context.addVoterRequest(
+ Integer.MAX_VALUE,
+ newVoter,
+ newListeners
+ ).setAckWhenCommitted(false)
+ )
+ );
Review Comment:
> This is not testing KafkaRaftClient. This is testing the serialization
code. It is testing that serializing a false "ack when committed" throws an
exception when the target version is 0.
We want to test that the serialization code behaves as expected, right? How
else would we know that we are throwing an exception that prevents us from
sending AddVoter v1 to a node that only supports v0?
> If you want to test this case for KafkaRaftClient, you need to make the
local replica a non-voter that can auto-join and show that AddVoterRequest
always sets "ack when committed" to false.
I was planning to add this as a check in all the
`KafkaRaftClientAutoJoinTest` tests in the other PR after this PR is merged,
since all of those tests have the local replica as a non-voter. Specifically,
we would add another check in `assertSentAddVoterRequest` that
`AckWhenCommitted == false` for any auto-joining replica.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]