jsancio commented on code in PR #19854:
URL: https://github.com/apache/kafka/pull/19854#discussion_r2121590408
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2145,7 +2186,20 @@ public void
testObserverSendDiscoveryFetchAfterFetchTimeout(boolean withKip853Rp
fetchRequest = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
+
assertTrue(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds()));
context.assertFetchRequestData(fetchRequest, epoch, 0L, 0);
+
+ context.deliverResponse(
+ fetchRequest.correlationId(),
+ fetchRequest.destination(),
+ context.fetchResponse(epoch, leaderId, MemoryRecords.EMPTY, 0L,
Errors.NOT_LEADER_OR_FOLLOWER)
+ );
+
+ context.client.poll();
+ context.pollUntilRequest();
+ fetchRequest = context.assertSentFetchRequest();
+ assertEquals(leaderId, fetchRequest.destination().id());
+
assertFalse(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds()));
Review Comment:
Let's remove this line. This suite should test the protocol and not the
internal implementation state.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2145,7 +2186,20 @@ public void
testObserverSendDiscoveryFetchAfterFetchTimeout(boolean withKip853Rp
fetchRequest = context.assertSentFetchRequest();
assertNotEquals(leaderId, fetchRequest.destination().id());
assertTrue(context.bootstrapIds.contains(fetchRequest.destination().id()));
+
assertTrue(context.client.quorum().followerStateOrThrow().hasFetchTimeoutExpired(context.time.milliseconds()));
Review Comment:
Let's not test internal KRaft state. This suite should check the protocol.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2033,6 +2033,47 @@ public void
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit
assertTrue(context.client.quorum().isFollower());
}
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void
testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean
withKip853Rpc) throws Exception {
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ int leaderNodeId = localId + 2;
+ int epoch = 5;
+ Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId);
+ List<InetSocketAddress> bootstrapServers = voters
+ .stream()
+ .map(RaftClientTestContext::mockAddress)
+ .toList();
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withBootstrapServers(Optional.of(bootstrapServers))
+ .withElectedLeader(epoch, leaderNodeId)
+ .withKip853Rpc(withKip853Rpc)
+ .build();
+
+ // after fetch timeout, node will become prospective with leader
+ context.time.sleep(context.fetchTimeoutMs);
+ context.client.poll();
+ assertTrue(context.client.quorum().isProspective());
+ assertEquals(leaderNodeId,
context.client.quorum().leaderId().getAsInt());
Review Comment:
You can use `RaftClientTestContext#assertElectedLeader(int, int)`.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2033,6 +2033,47 @@ public void
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit
assertTrue(context.client.quorum().isFollower());
}
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void
testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean
withKip853Rpc) throws Exception {
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ int leaderNodeId = localId + 2;
+ int epoch = 5;
+ Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId);
+ List<InetSocketAddress> bootstrapServers = voters
+ .stream()
+ .map(RaftClientTestContext::mockAddress)
+ .toList();
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withBootstrapServers(Optional.of(bootstrapServers))
+ .withElectedLeader(epoch, leaderNodeId)
+ .withKip853Rpc(withKip853Rpc)
+ .build();
+
+ // after fetch timeout, node will become prospective with leader
+ context.time.sleep(context.fetchTimeoutMs);
+ context.client.poll();
+ assertTrue(context.client.quorum().isProspective());
+ assertEquals(leaderNodeId,
context.client.quorum().leaderId().getAsInt());
+ assertEquals(epoch, context.currentEpoch());
+
+ // after election loss node will become follower because it had a last
known leader
+ context.time.sleep(context.electionTimeoutMs() * 2L);
+ context.client.poll();
+ assertTrue(context.client.quorum().isFollower());
+ assertEquals(leaderNodeId,
context.client.quorum().leaderId().getAsInt());
+ assertEquals(epoch, context.currentEpoch());
+
+ // node will send fetch request to leader
+ context.pollUntilRequest();
Review Comment:
Why not call `pollUntilRequest` and check that the destination is the leader
and the epoch is `epoch`?
We should try to avoid testing internal KRaft state in protocol suites like
`KafkaRaftClientTest`.
##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2033,6 +2033,47 @@ public void
testUnattachedAsVoterCanBecomeFollowerAfterFindingLeader(boolean wit
assertTrue(context.client.quorum().isFollower());
}
+ @ParameterizedTest
+ @ValueSource(booleans = { true, false })
+ public void
testUnattachedWithLeaderCanBecomeFollowerAfterFindingLeader(boolean
withKip853Rpc) throws Exception {
+ int localId = randomReplicaId();
+ int otherNodeId = localId + 1;
+ int leaderNodeId = localId + 2;
+ int epoch = 5;
+ Set<Integer> voters = Set.of(localId, otherNodeId, leaderNodeId);
+ List<InetSocketAddress> bootstrapServers = voters
+ .stream()
+ .map(RaftClientTestContext::mockAddress)
+ .toList();
+
+ RaftClientTestContext context = new
RaftClientTestContext.Builder(localId, voters)
+ .withBootstrapServers(Optional.of(bootstrapServers))
+ .withElectedLeader(epoch, leaderNodeId)
+ .withKip853Rpc(withKip853Rpc)
+ .build();
+
+ // after fetch timeout, node will become prospective with leader
+ context.time.sleep(context.fetchTimeoutMs);
+ context.client.poll();
+ assertTrue(context.client.quorum().isProspective());
Review Comment:
Same comment here: let's not assert internal KRaft state in this suite. The
test should instead expect VOTE requests with pre-vote set to true.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]