showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##########
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
verify(configStorage).snapshot();
}
+ @Test
+ public void testPollTimeoutExpiry() throws InterruptedException {
+
+ when(configStorage.snapshot()).thenReturn(configState1);
+
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ client.prepareResponse(joinGroupFollowerResponse(1, "member",
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR,
"leader", configState1.offset(), Collections.emptyList(),
+ Collections.singletonList(taskId1x0), Errors.NONE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node));
+
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node));
+
+ client.prepareResponse(joinGroupFollowerResponse(1, "member",
"leader", Errors.NONE));
+
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR,
"leader", configState1.offset(), Collections.emptyList(),
+ Collections.singletonList(taskId1x0), Errors.NONE));
+
+ try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+ coordinator.ensureActiveGroup();
+ coordinator.poll(0, () -> {
+ return null;
+ });
+
+ long now = time.milliseconds();
+ // We keep the heartbeat thread running behind the scenes and poll
frequently so that eventually
+ // the time goes past now + rebalanceTimeoutMs which triggers poll
timeout expiry.
+ TestUtils.waitForCondition(() -> {
+ time.sleep(heartbeatIntervalMs - 1);
+ return time.milliseconds() > now + rebalanceTimeoutMs;
+ }, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for
rebalance.timeout.ms");
+ coordinator.poll(0, () -> {
Review Comment:
1. You didn't provide a HeartbeatResponse, so the session will time out.
2. `heartbeatIntervalMs` is the minimum interval at which heartbeats are
sent, but the actual timeout for heartbeats is the session timeout, so we
can sleep for `sessionTimeoutMs - 1` to make the time reach
`rebalanceTimeoutMs` faster.
3. The last poll doesn't make sense because the poll timeout should already
have been triggered by then. Why do we need it?
Here is roughly what I would write, for your reference:
```
public void testPollTimeoutExpiry() throws InterruptedException {
when(configStorage.snapshot()).thenReturn(configState1);
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
client.prepareResponse(joinGroupFollowerResponse(1, "member",
"leader", Errors.NONE));
client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR,
"leader", configState1.offset(), Collections.emptyList(),
Collections.singletonList(taskId1x0), Errors.NONE));
// prepare 3 heartBeatResponses because we will trigger 3 heartBeat
requests until rebalanceTimeout,
// that is (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs
client.prepareResponse(new HeartbeatResponse(new
HeartbeatResponseData()));
client.prepareResponse(new HeartbeatResponse(new
HeartbeatResponseData()));
client.prepareResponse(new HeartbeatResponse(new
HeartbeatResponseData()));
try (LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
coordinator.ensureActiveGroup();
System.out.println("!!! poll");
coordinator.poll(0, () -> {
return null;
});
// We keep the heartbeat thread running behind the scenes and
poll frequently so that eventually
// the time goes past now + rebalanceTimeoutMs which triggers
poll timeout expiry.
TestUtils.waitForCondition(() -> {
// sleep until sessionTimeoutMs to trigger a heartBeat
request to avoid session timeout.
// Not sure if this will be flaky in CI because the
heartbeat thread might not send out the heartBeat request in time.
time.sleep(sessionTimeoutMs - 1);
return logCaptureAppender.getEvents().stream().anyMatch(e ->
e.getLevel().equals("WARN")) &&
logCaptureAppender.getEvents().stream().anyMatch(e
-> e.getMessage().startsWith("worker poll timeout has expired"));
}, "Coordinator did not poll for rebalance.timeout.ms");
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]