lianetm commented on code in PR #16200:
URL: https://github.com/apache/kafka/pull/16200#discussion_r1649427661
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -659,78 +753,38 @@ public void
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
@Test
public void testisExpiredByUsedForLogging() {
- Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
- heartbeatRequestManager = new HeartbeatRequestManager(new
LogContext(), pollTimer, config(),
- coordinatorRequestManager, membershipManager, heartbeatState,
heartbeatRequestState,
- backgroundEventHandler, metrics);
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
int exceededTimeMs = 5;
time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
+ when(membershipManager.isLeavingGroup()).thenReturn(false);
+ when(pollTimer.isExpired()).thenReturn(true);
NetworkClientDelegate.PollResult pollResult =
heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());
verify(membershipManager).transitionToSendingLeaveGroup(true);
verify(pollTimer, never()).isExpiredBy();
- assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
clearInvocations(pollTimer);
heartbeatRequestManager.resetPollTimer(time.milliseconds());
verify(pollTimer).isExpiredBy();
}
@Test
- public void testHeartbeatMetrics() {
- // setup
- coordinatorRequestManager = mock(CoordinatorRequestManager.class);
- membershipManager = mock(MembershipManager.class);
- heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
- time = new MockTime();
- metrics = new Metrics(time);
- heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
- new LogContext(),
- time,
- 0, // This initial interval should be 0 to ensure heartbeat on the
clock
- DEFAULT_RETRY_BACKOFF_MS,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
- 0);
- backgroundEventHandler = mock(BackgroundEventHandler.class);
+ public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
heartbeatRequestManager = createHeartbeatRequestManager(
- coordinatorRequestManager,
- membershipManager,
- heartbeatState,
- heartbeatRequestState,
- backgroundEventHandler);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
- when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
- assertNotNull(getMetric("heartbeat-response-time-max"));
- assertNotNull(getMetric("heartbeat-rate"));
- assertNotNull(getMetric("heartbeat-total"));
- assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
- // test poll
- assertHeartbeat(heartbeatRequestManager, 0);
- time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
- assertEquals(1.0, getMetric("heartbeat-total").metricValue());
- assertEquals((double)
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS),
getMetric("last-heartbeat-seconds-ago").metricValue());
-
- assertHeartbeat(heartbeatRequestManager,
DEFAULT_HEARTBEAT_INTERVAL_MS);
- assertEquals(0.06d, (double)
getMetric("heartbeat-rate").metricValue(), 0.005d);
- assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
- // Randomly sleep for some time
- Random rand = new Random();
- int randomSleepS = rand.nextInt(11);
- time.sleep(randomSleepS * 1000);
- assertEquals((double) randomSleepS,
getMetric("last-heartbeat-seconds-ago").metricValue());
- }
+ coordinatorRequestManager,
+ membershipManager,
+ heartbeatState,
+ heartbeatRequestState,
+ backgroundEventHandler);
- @Test
- public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+ when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
mockStableMember();
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ when(membershipManager.isLeavingGroup()).thenReturn(true);
Review Comment:
uhm do we need this here? I wouldn't expect so (the membershipMgr is a mock
now, and the HB mgr does not check `isLeavingGroup()` to generate a HB)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -659,78 +753,38 @@ public void
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
@Test
public void testisExpiredByUsedForLogging() {
- Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
- heartbeatRequestManager = new HeartbeatRequestManager(new
LogContext(), pollTimer, config(),
- coordinatorRequestManager, membershipManager, heartbeatState,
heartbeatRequestState,
- backgroundEventHandler, metrics);
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
int exceededTimeMs = 5;
time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
+ when(membershipManager.isLeavingGroup()).thenReturn(false);
+ when(pollTimer.isExpired()).thenReturn(true);
NetworkClientDelegate.PollResult pollResult =
heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());
verify(membershipManager).transitionToSendingLeaveGroup(true);
verify(pollTimer, never()).isExpiredBy();
- assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
clearInvocations(pollTimer);
heartbeatRequestManager.resetPollTimer(time.milliseconds());
verify(pollTimer).isExpiredBy();
}
@Test
- public void testHeartbeatMetrics() {
- // setup
- coordinatorRequestManager = mock(CoordinatorRequestManager.class);
- membershipManager = mock(MembershipManager.class);
- heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
- time = new MockTime();
- metrics = new Metrics(time);
- heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
- new LogContext(),
- time,
- 0, // This initial interval should be 0 to ensure heartbeat on the
clock
- DEFAULT_RETRY_BACKOFF_MS,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
- 0);
- backgroundEventHandler = mock(BackgroundEventHandler.class);
+ public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
heartbeatRequestManager = createHeartbeatRequestManager(
- coordinatorRequestManager,
- membershipManager,
- heartbeatState,
- heartbeatRequestState,
- backgroundEventHandler);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
- when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
- assertNotNull(getMetric("heartbeat-response-time-max"));
- assertNotNull(getMetric("heartbeat-rate"));
- assertNotNull(getMetric("heartbeat-total"));
- assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
- // test poll
- assertHeartbeat(heartbeatRequestManager, 0);
- time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
- assertEquals(1.0, getMetric("heartbeat-total").metricValue());
- assertEquals((double)
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS),
getMetric("last-heartbeat-seconds-ago").metricValue());
-
- assertHeartbeat(heartbeatRequestManager,
DEFAULT_HEARTBEAT_INTERVAL_MS);
- assertEquals(0.06d, (double)
getMetric("heartbeat-rate").metricValue(), 0.005d);
- assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
- // Randomly sleep for some time
- Random rand = new Random();
- int randomSleepS = rand.nextInt(11);
- time.sleep(randomSleepS * 1000);
- assertEquals((double) randomSleepS,
getMetric("last-heartbeat-seconds-ago").metricValue());
- }
+ coordinatorRequestManager,
+ membershipManager,
+ heartbeatState,
+ heartbeatRequestState,
+ backgroundEventHandler);
- @Test
- public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+ when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
mockStableMember();
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ when(membershipManager.isLeavingGroup()).thenReturn(true);
Review Comment:
also on line 793 there's a `doNothing().when(membershipManager)` that I would
say we don't need on the mock
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -744,34 +798,17 @@ public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
verify(heartbeatRequestState).onFailedAttempt(anyLong());
verify(heartbeatRequestState).reset();
+
when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false);
when(membershipManager.state()).thenReturn(MemberState.FENCED);
Review Comment:
what the HB manager checks on the membershipMgr in this case is
`shouldSkipHeartbeat()`, so I guess what we need to do here to mock the fencing is:
`when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);` (wonder if
this will allow you to lose the `when` on canSendRequest too, which I would expect
should work because the heartbeatRequestState is not a mock)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -744,34 +798,17 @@ public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
verify(heartbeatRequestState).onFailedAttempt(anyLong());
verify(heartbeatRequestState).reset();
+
when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false);
when(membershipManager.state()).thenReturn(MemberState.FENCED);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "Member should not send
heartbeats while FENCED");
+ when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
Review Comment:
ditto, we probably don't need to add an expectation on `canSendRequest`, and we
just need to set the right one on the membershipMgr to show it's not fenced
anymore (`when(membershipManager.shouldSkipHeartbeat()).thenReturn(false)`)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -659,78 +753,38 @@ public void
testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
@Test
public void testisExpiredByUsedForLogging() {
- Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));
- heartbeatRequestManager = new HeartbeatRequestManager(new
LogContext(), pollTimer, config(),
- coordinatorRequestManager, membershipManager, heartbeatState,
heartbeatRequestState,
- backgroundEventHandler, metrics);
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
int exceededTimeMs = 5;
time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs);
+ when(membershipManager.isLeavingGroup()).thenReturn(false);
+ when(pollTimer.isExpired()).thenReturn(true);
NetworkClientDelegate.PollResult pollResult =
heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());
verify(membershipManager).transitionToSendingLeaveGroup(true);
verify(pollTimer, never()).isExpiredBy();
- assertEquals(exceededTimeMs, pollTimer.isExpiredBy());
clearInvocations(pollTimer);
heartbeatRequestManager.resetPollTimer(time.milliseconds());
verify(pollTimer).isExpiredBy();
}
@Test
- public void testHeartbeatMetrics() {
- // setup
- coordinatorRequestManager = mock(CoordinatorRequestManager.class);
- membershipManager = mock(MembershipManager.class);
- heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
- time = new MockTime();
- metrics = new Metrics(time);
- heartbeatRequestState = new
HeartbeatRequestManager.HeartbeatRequestState(
- new LogContext(),
- time,
- 0, // This initial interval should be 0 to ensure heartbeat on the
clock
- DEFAULT_RETRY_BACKOFF_MS,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
- 0);
- backgroundEventHandler = mock(BackgroundEventHandler.class);
+ public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
heartbeatRequestManager = createHeartbeatRequestManager(
- coordinatorRequestManager,
- membershipManager,
- heartbeatState,
- heartbeatRequestState,
- backgroundEventHandler);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new
Node(1, "localhost", 9999)));
- when(membershipManager.state()).thenReturn(MemberState.STABLE);
-
- assertNotNull(getMetric("heartbeat-response-time-max"));
- assertNotNull(getMetric("heartbeat-rate"));
- assertNotNull(getMetric("heartbeat-total"));
- assertNotNull(getMetric("last-heartbeat-seconds-ago"));
-
- // test poll
- assertHeartbeat(heartbeatRequestManager, 0);
- time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
- assertEquals(1.0, getMetric("heartbeat-total").metricValue());
- assertEquals((double)
TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS),
getMetric("last-heartbeat-seconds-ago").metricValue());
-
- assertHeartbeat(heartbeatRequestManager,
DEFAULT_HEARTBEAT_INTERVAL_MS);
- assertEquals(0.06d, (double)
getMetric("heartbeat-rate").metricValue(), 0.005d);
- assertEquals(2.0, getMetric("heartbeat-total").metricValue());
-
- // Randomly sleep for some time
- Random rand = new Random();
- int randomSleepS = rand.nextInt(11);
- time.sleep(randomSleepS * 1000);
- assertEquals((double) randomSleepS,
getMetric("last-heartbeat-seconds-ago").metricValue());
- }
+ coordinatorRequestManager,
+ membershipManager,
+ heartbeatState,
+ heartbeatRequestState,
+ backgroundEventHandler);
- @Test
- public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
+ when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
Review Comment:
setting this expectation is great, and it actually makes me notice that the
`mockStableMember()` could probably be removed? it was just taking all the
actions needed to put the membershipMgr into STABLE (when it was an instance,
not a mock)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -744,34 +798,17 @@ public void
testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
verify(heartbeatRequestState).onFailedAttempt(anyLong());
verify(heartbeatRequestState).reset();
+
when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(false);
when(membershipManager.state()).thenReturn(MemberState.FENCED);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(0, result.unsentRequests.size(), "Member should not send
heartbeats while FENCED");
+ when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true);
when(membershipManager.state()).thenReturn(MemberState.JOINING);
result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size(), "Fenced member should
resume heartbeat after transitioning to JOINING");
}
- @ParameterizedTest
- @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
- public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final
short version) {
Review Comment:
uhm we shouldn't remove this test (it was actually an issue we discovered,
fixed, and added the test for coverage). I see it involves checking the HB
content (that might be off just because the heartbeatState is a mock, but
give it another try after addressing the comments above to turn that state into
an actual instance able to build the HB content)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]