lianetm commented on code in PR #16694:
URL: https://github.com/apache/kafka/pull/16694#discussion_r1722269401
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, pollAgain.unsentRequests.size());
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+ public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {
Review Comment:
nit: would this name be simpler/clearer?
`testConsumerAcksReconciledAssignmentAfterAckLost`
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, pollAgain.unsentRequests.size());
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+ public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {
+ // 1. complete reconciliation
+ createHeartbeatStatAndRequestManager();
+ String topic = "topic1";
+ int exceededTimeMs = 5;
+ Set<String> set = Collections.singleton(topic);
+ when(subscriptions.subscription()).thenReturn(set);
+ subscriptions.subscribe(set, Optional.empty());
+ mockReconcilingMemberData();
+ // 2. send heartbeat1 to ack assignment tp0
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+ // 3. HB1 times out
+ result.unsentRequests.get(0)
+ .handler()
+ .onFailure(time.milliseconds(), new TimeoutException("timeout"));
+ // 4. heartbeat request manager resets the sentFields to null HeartbeatState.reset()
+ time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
+ assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
+ verify(heartbeatRequestState).reset();
+ // 5. following HB will include tp0 (and act as ack), tp0 != null
+ result = heartbeatRequestManager.poll(time.milliseconds());
+ NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
+ ConsumerGroupHeartbeatRequest heartbeatRequest =
+ (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version);
+
+ assertEquals(Collections.singletonList(topic), heartbeatRequest.data().subscribedTopicNames());
Review Comment:
In this situation (ack lost), we expect the member to resend the partitions,
not only the topic names, so should we assert that it does? We would probably
need to pass the partitions into `mockReconcilingMemberData`, to be returned in
the currentAssignment on line 919, and then assert that the same partitions are
indeed in `heartbeatRequest.data().topicPartitions()`.
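
To illustrate the behavior I'd like the test to pin down, here is a minimal, self-contained sketch. It is not the Kafka implementation: `HeartbeatStateSketch` and its methods are hypothetical stand-ins, and partitions are modeled as plain integers. The assumption it encodes is the one described in steps 4 and 5 of the test: a field is only included in a heartbeat when it differs from what was last sent, and `reset()` clears that memory so the next heartbeat carries the assignment again.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical model of "sent fields" tracking; names are illustrative only.
class HeartbeatStateSketch {
    // Partitions the member owns after reconciliation.
    private final List<Integer> currentAssignment;
    // Last assignment included in a request; null means "nothing sent yet".
    private List<Integer> sentAssignment = null;

    HeartbeatStateSketch(List<Integer> currentAssignment) {
        this.currentAssignment = new ArrayList<>(currentAssignment);
    }

    // Returns the partitions to include in the next heartbeat,
    // or null when they are unchanged since the last one sent.
    List<Integer> buildRequestPartitions() {
        if (Objects.equals(currentAssignment, sentAssignment)) {
            return null;
        }
        sentAssignment = new ArrayList<>(currentAssignment);
        return new ArrayList<>(currentAssignment);
    }

    // Called when a heartbeat fails (e.g. times out): forget what was sent.
    void reset() {
        sentAssignment = null;
    }

    public static void main(String[] args) {
        HeartbeatStateSketch state =
            new HeartbeatStateSketch(Collections.singletonList(0));
        System.out.println("HB1 partitions: " + state.buildRequestPartitions());
        state.reset(); // HB1 timed out, so the ack may have been lost
        System.out.println("HB2 partitions: " + state.buildRequestPartitions());
    }
}
```

In the real test, the equivalent check would be on `heartbeatRequest.data().topicPartitions()` after the timeout, rather than only on `subscribedTopicNames()`.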
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, pollAgain.unsentRequests.size());
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+ public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {
+ // 1. complete reconciliation
+ createHeartbeatStatAndRequestManager();
Review Comment:
Since we're here, let's please fix the typo in the helper name; it should be
createHeartbeat**State**AndRequestManager.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -746,6 +746,37 @@ public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short ve
NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, pollAgain.unsentRequests.size());
}
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
+ public void testConsumerAcksReconciledAssignmentWithFirstHeartBeatAckLost(final short version) {
+ // 1. complete reconciliation
+ createHeartbeatStatAndRequestManager();
+ String topic = "topic1";
+ int exceededTimeMs = 5;
+ Set<String> set = Collections.singleton(topic);
+ when(subscriptions.subscription()).thenReturn(set);
+ subscriptions.subscribe(set, Optional.empty());
+ mockReconcilingMemberData();
+ // 2. send heartbeat1 to ack assignment tp0
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+ // 3. HB1 times out
+ result.unsentRequests.get(0)
+ .handler()
+ .onFailure(time.milliseconds(), new TimeoutException("timeout"));
+ // 4. heartbeat request manager resets the sentFields to null HeartbeatState.reset()
+ time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
Review Comment:
I would expect that just sleeping for the interval would be enough, so maybe we
could simplify the test by removing `exceededTimeMs` from here, along with the
variable itself?