vamossagar12 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1604767686
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
);
}
+ @Test
+ public void testPollTimeoutExpiry() throws Exception {
+ // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on
+ // task#stop method which is blocked. The timeouts have been set accordingly
+ workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(20)));
+ workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(40)));
+ connect = connectBuilder
+ .numBrokers(1)
+ .numWorkers(1)
+ .build();
+
+ connect.start();
+
+ connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time");
+
+ Map<String, String> connectorWithBlockingTaskStopConfig = new HashMap<>();
+ connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName());
+ connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+ connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP));
+
+ connect.configureConnector(CONNECTOR_NAME, connectorWithBlockingTaskStopConfig);
+
+ connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+ CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+ );
+
+ try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+ connect.restartTask(CONNECTOR_NAME, 0);
+ TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&
Review Comment:
Yeah, I wanted to add the test in `BlockingConnectorTest` itself, but that
would have meant a lot of changes in that class, because that test currently
doesn't support setting worker-level properties or changing the number of
workers. Being able to change the worker-level properties was the way I could
trigger the poll timeout expiry.
Moreover, the test I have added doesn't really block for the entire `stop`
method. It ends almost right after the task graceful shutdown timeout
(`TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG`) elapses, because of the reset at
the end of the test. Let me know if that makes sense.
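
To make the timing argument concrete, here is a rough sketch of why the worker-level properties matter. It is a simplified model, not Connect code: the class `PollTimeoutSketch` and both helper methods are hypothetical, and it assumes the tick thread stays blocked for at most the graceful shutdown timeout while the task's stop call hangs, and that a poll timeout expiry happens once that blocked time exceeds the rebalance timeout.

```java
import java.util.concurrent.TimeUnit;

public class PollTimeoutSketch {

    // Hypothetical helper (not a Kafka Connect API): how long the tick thread
    // stays blocked while waiting for a task to stop. Assumes the wait is
    // capped by the graceful shutdown timeout.
    public static long tickBlockedMs(long taskStopBlocksForMs, long gracefulShutdownTimeoutMs) {
        return Math.min(taskStopBlocksForMs, gracefulShutdownTimeoutMs);
    }

    // Hypothetical helper: under the simplified model above, the poll timeout
    // expires when the tick thread is blocked for longer than the rebalance timeout.
    public static boolean pollTimeoutExpires(long rebalanceTimeoutMs,
                                             long gracefulShutdownTimeoutMs,
                                             long taskStopBlocksForMs) {
        return tickBlockedMs(taskStopBlocksForMs, gracefulShutdownTimeoutMs) > rebalanceTimeoutMs;
    }

    public static void main(String[] args) {
        long rebalanceTimeoutMs = TimeUnit.SECONDS.toMillis(20);

        // Values from the test: 40s graceful shutdown vs. 20s rebalance timeout,
        // with a task stop that blocks indefinitely.
        System.out.println(pollTimeoutExpires(
                rebalanceTimeoutMs, TimeUnit.SECONDS.toMillis(40), Long.MAX_VALUE));

        // With a graceful shutdown timeout shorter than the rebalance timeout,
        // the tick thread is released before the poll timeout can expire.
        System.out.println(pollTimeoutExpires(
                rebalanceTimeoutMs, TimeUnit.SECONDS.toMillis(5), Long.MAX_VALUE));
    }
}
```

Under this model the first call reports an expiry and the second does not, which is why the test needs to override both timeouts rather than only block the task.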