bbejeck commented on code in PR #19181:
URL: https://github.com/apache/kafka/pull/19181#discussion_r1994378730
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -601,18 +560,383 @@ public void
testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
StreamsGroupHeartbeatRequest streamsRequest =
(StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
assertEquals(GROUP_ID, streamsRequest.data().groupId());
assertEquals(MEMBER_ID, streamsRequest.data().memberId());
- assertEquals(LEAVE_GROUP_MEMBER_EPOCH,
streamsRequest.data().memberEpoch());
+ assertEquals(MEMBER_EPOCH, streamsRequest.data().memberEpoch());
assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
verify(heartbeatRequestState).onSendAttempt(time.milliseconds());
verify(membershipManager).onHeartbeatRequestGenerated();
+ time.sleep(2000);
+ assertEquals(
+ 2.0,
+
metrics.metric(metrics.metricName("last-heartbeat-seconds-ago",
"consumer-coordinator-metrics")).metricValue()
+ );
final ClientResponse response = buildClientResponse();
networkRequest.future().complete(response);
- verify(heartbeatRequestState,
never()).updateHeartbeatIntervalMs(anyLong());
- verify(heartbeatRequestState,
never()).onSuccessfulAttempt(anyLong());
- verify(membershipManager, never()).onHeartbeatSuccess(any());
+
verify(membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse)
response.responseBody());
+
verify(heartbeatRequestState).updateHeartbeatIntervalMs(RECEIVED_HEARTBEAT_INTERVAL_MS);
+
verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
+ verify(heartbeatRequestState).resetTimer();
+ final List<TopicPartition> topicPartitions =
streamsRebalanceData.partitionsByHost()
+ .get(new StreamsRebalanceData.HostInfo(
+ ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().host(),
+ ENDPOINT_TO_PARTITIONS.get(0).userEndpoint().port())
+ );
+
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).topic(),
topicPartitions.get(0).topic());
+
assertEquals(ENDPOINT_TO_PARTITIONS.get(0).partitions().get(0).partitions().get(0),
topicPartitions.get(0).partition());
+ assertEquals(
+ 1.0,
+ metrics.metric(metrics.metricName("heartbeat-total",
"consumer-coordinator-metrics")).metricValue()
+ );
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(final
boolean instanceIdPresent) {
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+ when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+ when(membershipManager.groupInstanceId()).thenReturn(instanceIdPresent
? Optional.of(INSTANCE_ID) : Optional.empty());
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, 1000);
+
+ StreamsGroupHeartbeatRequestData requestData1 =
heartbeatState.buildRequestData();
+
+ assertEquals(GROUP_ID, requestData1.groupId());
+ assertEquals(MEMBER_ID, requestData1.memberId());
+ assertEquals(MEMBER_EPOCH, requestData1.memberEpoch());
+ if (instanceIdPresent) {
+ assertEquals(INSTANCE_ID, requestData1.instanceId());
+ } else {
+ assertNull(requestData1.instanceId());
+ }
+
+ StreamsGroupHeartbeatRequestData requestData2 =
heartbeatState.buildRequestData();
+
+ assertEquals(GROUP_ID, requestData2.groupId());
+ assertEquals(MEMBER_ID, requestData2.memberId());
+ assertEquals(MEMBER_EPOCH, requestData2.memberEpoch());
+ if (instanceIdPresent) {
+ assertEquals(INSTANCE_ID, requestData2.instanceId());
+ } else {
+ assertNull(requestData2.instanceId());
}
}
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatRequestTopologySentWhenJoining(final
MemberState memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, 1000);
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData requestData1 =
heartbeatState.buildRequestData();
+
+ assertEquals(streamsRebalanceData.topologyEpoch(),
requestData1.topology().epoch());
+ final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies
= requestData1.topology().subtopologies();
+ assertEquals(2, subtopologies.size());
+ final StreamsGroupHeartbeatRequestData.Subtopology subtopology1 =
subtopologies.get(0);
+ assertEquals(SUBTOPOLOGY_NAME_1, subtopology1.subtopologyId());
+ assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2),
subtopology1.sourceTopics());
+ assertEquals(List.of(REPARTITION_SINK_TOPIC_1,
REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3),
subtopology1.repartitionSinkTopics());
+ assertEquals(REPARTITION_SOURCE_TOPICS.size(),
subtopology1.repartitionSourceTopics().size());
+ subtopology1.repartitionSourceTopics().forEach(topicInfo -> {
+ final StreamsRebalanceData.TopicInfo repartitionTopic =
REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
+ assertEquals(repartitionTopic.numPartitions().get(),
topicInfo.partitions());
+ assertEquals(repartitionTopic.replicationFactor().get(),
topicInfo.replicationFactor());
+ assertEquals(repartitionTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
+ });
+ assertEquals(CHANGELOG_TOPICS.size(),
subtopology1.stateChangelogTopics().size());
+ subtopology1.stateChangelogTopics().forEach(topicInfo -> {
+ assertTrue(CHANGELOG_TOPICS.containsKey(topicInfo.name()));
+ assertEquals(0, topicInfo.partitions());
+ final StreamsRebalanceData.TopicInfo changelogTopic =
CHANGELOG_TOPICS.get(topicInfo.name());
+ assertEquals(changelogTopic.replicationFactor().get(),
topicInfo.replicationFactor());
+ assertEquals(changelogTopic.topicConfigs().size(),
topicInfo.topicConfigs().size());
+ });
+ assertEquals(2, subtopology1.copartitionGroups().size());
+ final StreamsGroupHeartbeatRequestData.CopartitionGroup
expectedCopartitionGroupData1 =
+ new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+ .setRepartitionSourceTopics(Collections.singletonList((short)
0))
+ .setSourceTopics(Collections.singletonList((short) 1));
+ final StreamsGroupHeartbeatRequestData.CopartitionGroup
expectedCopartitionGroupData2 =
+ new StreamsGroupHeartbeatRequestData.CopartitionGroup()
+ .setRepartitionSourceTopics(Collections.singletonList((short)
1))
+ .setSourceTopics(Collections.singletonList((short) 0));
+
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1));
+
assertTrue(subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2));
+ final StreamsGroupHeartbeatRequestData.Subtopology subtopology2 =
subtopologies.get(1);
+ assertEquals(SUBTOPOLOGY_NAME_2, subtopology2.subtopologyId());
+ assertEquals(List.of(SOURCE_TOPIC_3), subtopology2.sourceTopics());
+ assertEquals(Collections.emptyList(),
subtopology2.repartitionSinkTopics());
+ assertEquals(Collections.emptyList(),
subtopology2.repartitionSourceTopics());
+ assertEquals(1, subtopology2.stateChangelogTopics().size());
+ assertEquals(CHANGELOG_TOPIC_4,
subtopology2.stateChangelogTopics().get(0).name());
+ assertEquals(0,
subtopology2.stateChangelogTopics().get(0).partitions());
+ assertEquals(1,
subtopology2.stateChangelogTopics().get(0).replicationFactor());
+ assertEquals(0,
subtopology2.stateChangelogTopics().get(0).topicConfigs().size());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+ assertNull(nonJoiningRequestData.topology());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void
testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState
memberState) {
+ final int rebalanceTimeoutMs = 1234;
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, rebalanceTimeoutMs);
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData requestData1 =
heartbeatState.buildRequestData();
+
+ assertEquals(rebalanceTimeoutMs, requestData1.rebalanceTimeoutMs());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertEquals(-1, nonJoiningRequestData.rebalanceTimeoutMs());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatProcessIdSentWhenJoining(final
MemberState memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, 1234);
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData requestData1 =
heartbeatState.buildRequestData();
+
+ assertEquals(PROCESS_ID.toString(), requestData1.processId());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertNull(nonJoiningRequestData.processId());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatEndpointSentWhenJoining(final MemberState
memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, 1234);
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData joiningRequestData =
heartbeatState.buildRequestData();
+
+ assertEquals(ENDPOINT.host(),
joiningRequestData.userEndpoint().host());
+ assertEquals(ENDPOINT.port(),
joiningRequestData.userEndpoint().port());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertNull(nonJoiningRequestData.userEndpoint());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatClientTagsSentWhenJoining(final
MemberState memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, 1234);
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData joiningRequestData =
heartbeatState.buildRequestData();
+
+ assertEquals(CLIENT_TAG_1,
joiningRequestData.clientTags().get(0).key());
+ assertEquals(VALUE_1, joiningRequestData.clientTags().get(0).value());
+
+ when(membershipManager.state()).thenReturn(memberState);
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertNull(nonJoiningRequestData.clientTags());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testBuildingHeartbeatAssignmentSentWhenChanged(final
MemberState memberState) {
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, 1234);
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+
+ StreamsGroupHeartbeatRequestData joiningRequestData =
heartbeatState.buildRequestData();
+
+ assertEquals(List.of(), joiningRequestData.activeTasks());
+ assertEquals(List.of(), joiningRequestData.standbyTasks());
+ assertEquals(List.of(), joiningRequestData.warmupTasks());
+
+ when(membershipManager.state()).thenReturn(memberState);
+ streamsRebalanceData.setReconciledAssignment(
+ new StreamsRebalanceData.Assignment(
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+ ),
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+ ),
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+ )
+ )
+ );
+
+ StreamsGroupHeartbeatRequestData firstNonJoiningRequestData =
heartbeatState.buildRequestData();
+
+ assertEquals(
+ List.of(
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+ .setPartitions(List.of(0, 1)),
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_NAME_2)
+ .setPartitions(List.of(2))
+ ),
+ firstNonJoiningRequestData.activeTasks()
+ );
+ assertEquals(
+ List.of(
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+ .setPartitions(List.of(2))
+ ),
+ firstNonJoiningRequestData.standbyTasks()
+ );
+ assertEquals(
+ List.of(
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+ .setPartitions(List.of(3, 4, 5))
+ ),
+ firstNonJoiningRequestData.warmupTasks()
+ );
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges =
heartbeatState.buildRequestData();
+
+ assertNull(nonJoiningRequestDataWithoutChanges.activeTasks());
+ assertNull(nonJoiningRequestDataWithoutChanges.standbyTasks());
+ assertNull(nonJoiningRequestDataWithoutChanges.warmupTasks());
+
+ streamsRebalanceData.setReconciledAssignment(
+ new StreamsRebalanceData.Assignment(
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0)
+ ),
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+ ),
+ Set.of(
+ )
+ )
+ );
+
+ StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges =
heartbeatState.buildRequestData();
+
+ assertEquals(
+ List.of(
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+ .setPartitions(List.of(0))
+ ),
+ nonJoiningRequestDataWithChanges.activeTasks()
+ );
+ assertEquals(
+ List.of(
+ new StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(SUBTOPOLOGY_NAME_1)
+ .setPartitions(List.of(2))
+ ),
+ nonJoiningRequestDataWithChanges.standbyTasks()
+ );
+ assertEquals(List.of(),
nonJoiningRequestDataWithChanges.warmupTasks());
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideNonJoiningStates")
+ public void testResettingHeartbeatState(final MemberState memberState) {
+ when(membershipManager.groupId()).thenReturn(GROUP_ID);
+ when(membershipManager.memberId()).thenReturn(MEMBER_ID);
+ when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH);
+
when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
+ final StreamsGroupHeartbeatRequestManager.HeartbeatState
heartbeatState =
+ new
StreamsGroupHeartbeatRequestManager.HeartbeatState(streamsRebalanceData,
membershipManager, 1234);
+ when(membershipManager.state()).thenReturn(memberState);
+ streamsRebalanceData.setReconciledAssignment(
+ new StreamsRebalanceData.Assignment(
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)
+ ),
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)
+ ),
+ Set.of(
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
+ new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5)
+ )
+ )
+ );
+ StreamsGroupHeartbeatRequestData requestDataBeforeReset =
heartbeatState.buildRequestData();
+ assertEquals(GROUP_ID, requestDataBeforeReset.groupId());
+ assertEquals(MEMBER_ID, requestDataBeforeReset.memberId());
+ assertEquals(MEMBER_EPOCH, requestDataBeforeReset.memberEpoch());
+ assertEquals(INSTANCE_ID, requestDataBeforeReset.instanceId());
+ assertFalse(requestDataBeforeReset.activeTasks().isEmpty());
+ assertFalse(requestDataBeforeReset.standbyTasks().isEmpty());
+ assertFalse(requestDataBeforeReset.warmupTasks().isEmpty());
+
+ heartbeatState.reset();
+
+ StreamsGroupHeartbeatRequestData requestDataAfterReset =
heartbeatState.buildRequestData();
+ assertEquals(GROUP_ID, requestDataAfterReset.groupId());
+ assertEquals(MEMBER_ID, requestDataAfterReset.memberId());
+ assertEquals(MEMBER_EPOCH, requestDataAfterReset.memberEpoch());
+ assertEquals(INSTANCE_ID, requestDataAfterReset.instanceId());
+ assertEquals(requestDataBeforeReset.activeTasks(),
requestDataAfterReset.activeTasks());
+ assertEquals(requestDataBeforeReset.standbyTasks(),
requestDataAfterReset.standbyTasks());
+ assertEquals(requestDataBeforeReset.warmupTasks(),
requestDataAfterReset.warmupTasks());
+ }
+
+ private static Stream<Arguments> provideNonJoiningStates() {
Review Comment:
My apologies, I thought this didn't line up correctly and needed to have
an extra space removed. Looking at it again, it seems fine, so please
ignore my comment here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]