vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1571150850
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##########
@@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() {
verify(configStorage).snapshot();
}
+ @Test
+ public void testPollTimeoutExpiry() throws InterruptedException {
+ // We will create a new WorkerCoordinator object with a rebalance timeout smaller
+ // than session timeout. This might not happen in the real world but it makes testing
+ // easier and the test not flaky.
+ int smallRebalanceTimeout = 20;
+ this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+ smallRebalanceTimeout,
+ heartbeatIntervalMs,
+ groupId,
+ Optional.empty(),
+ retryBackoffMs,
+ retryBackoffMaxMs,
+ true);
+ this.coordinator = new WorkerCoordinator(rebalanceConfig,
+ logContext,
+ consumerClient,
+ new Metrics(time),
+ "consumer" + groupId,
+ time,
+ LEADER_URL,
+ configStorage,
+ rebalanceListener,
+ compatibility,
+ 0);
+
+ when(configStorage.snapshot()).thenReturn(configState1);
+
+ client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+ client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+ Collections.singletonList(taskId1x0), Errors.NONE));
+
+ try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+ coordinator.ensureActiveGroup();
+ coordinator.poll(0, () -> null);
+
+ // The heartbeat thread is running and keeps sending heartbeat requests.
+ TestUtils.waitForCondition(() -> {
+ // Rebalance timeout elapses while poll is never invoked causing a poll timeout expiry
+ coordinator.sendHeartbeatRequest();
+ client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
+ time.sleep(1);
Review Comment:
This sleep value, though small, is the one that eventually worked after
running the test 30 times in a row. Sleeping for 10s kept failing because the
coordinator's session timeout would sometimes expire first, even though it
doesn't look like it should.
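
To illustrate why the step size can matter at all, here is a minimal,
self-contained sketch. It is a toy model, not the Kafka implementation:
`TimeoutRaceSketch`, `firstExpiry`, the check order, and the 30 ms session
timeout are all assumptions made up for the example (only the 20 ms rebalance
timeout comes from the test above), and it does not establish the actual cause
of the failures. It only shows that when a mock clock advances in steps as
large as the session timeout, the session check can fire before the poll
timeout is ever observed.

```java
// Toy model (hypothetical, not Kafka code): two deadlines race on a mock clock.
final class TimeoutRaceSketch {
    enum Expiry { SESSION, POLL, NONE }

    // Advances a fake clock by stepMs per iteration and reports which timeout
    // is observed first. Assumption: the session timeout is checked before the
    // poll timeout, and a heartbeat response resets the session deadline only
    // after both checks have run.
    static Expiry firstExpiry(long stepMs, long sessionTimeoutMs,
                              long pollTimeoutMs, long maxMs) {
        long now = 0;
        long lastHeartbeatMs = 0; // reset on every simulated heartbeat response
        long lastPollMs = 0;      // never reset: poll() is not called again
        while (now < maxMs) {
            now += stepMs;        // stands in for time.sleep(stepMs)
            if (now - lastHeartbeatMs >= sessionTimeoutMs) {
                return Expiry.SESSION;
            }
            if (now - lastPollMs >= pollTimeoutMs) {
                return Expiry.POLL;
            }
            lastHeartbeatMs = now; // heartbeat response handled
        }
        return Expiry.NONE;
    }

    public static void main(String[] args) {
        // Small steps: the poll timeout (20 ms) is reached first.
        System.out.println(firstExpiry(1, 30, 20, 1_000));  // prints POLL
        // A step as large as the session timeout: the session check fires first.
        System.out.println(firstExpiry(30, 30, 20, 1_000)); // prints SESSION
    }
}
```

In the real test a live heartbeat thread races with the mock clock, so the
actual interleaving is less predictable than in this model.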