This is an automated email from the ASF dual-hosted git repository.
kirktrue pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9a8fd60fa24 KAFKA-19932: adding handling of OOM and avoiding wrapped
as timeout (#21117)
9a8fd60fa24 is described below
commit 9a8fd60fa2470f9a611eb7d38e4eba5582ebd23b
Author: Arpit Goyal <[email protected]>
AuthorDate: Wed Apr 1 21:44:15 2026 +0530
KAFKA-19932: adding handling of OOM and avoiding wrapped as timeout (#21117)
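Handle OutOfMemoryError (and other VirtualMachineErrors) in the admin client call failure path so that they are propagated directly instead of being wrapped as a TimeoutException.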
---
.../kafka/clients/admin/KafkaAdminClient.java | 6 +++++
.../kafka/clients/admin/KafkaAdminClientTest.java | 26 ++++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e1ab4c6929d..8ca79899421 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -900,6 +900,7 @@ public class KafkaAdminClient extends AdminClient {
* @param now The current time in milliseconds.
* @param throwable The failure exception.
*/
+ @SuppressWarnings("NPathComplexity")
final void fail(long now, Throwable throwable) {
if (curNode != null) {
runnable.nodeReadyDeadlines.remove(curNode);
@@ -921,6 +922,11 @@ public class KafkaAdminClient extends AdminClient {
}
nextAllowedTryMs = now + retryBackoff.backoff(tries++);
+ // Don't mask VirtualMachineError as TimeoutException - propagate it directly
+ if (throwable instanceof VirtualMachineError) {
+ handleFailure(throwable);
+ return;
+ }
// If the call has timed out, fail.
if (calcTimeoutMsRemainingAsInt(now, deadlineMs) <= 0) {
handleTimeoutFailure(now, throwable);
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 1bb7e2e171e..202959a3eb2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -11787,4 +11787,30 @@ public class KafkaAdminClientTest {
assertTrue(duration >= 150L && duration < 30000);
}
}
+
+ /**
+ * Test that OutOfMemoryError is properly propagated and not masked as TimeoutException.
+ * This test simulates an OOM error during response processing and verifies it propagates
+ * without being wrapped. This is a regression test for KAFKA-19932.
+ */
+ @Test
+ public void testOutOfMemoryErrorPropagation() throws Exception {
+ MockTime time = new MockTime();
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
mockCluster(1, 0),
+ AdminClientConfig.RETRIES_CONFIG, "2",
+ AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "100")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ OutOfMemoryError oomError = new OutOfMemoryError("Simulated OOM during response handling");
+ MetadataResponse mockResponse = mock(MetadataResponse.class);
+ doThrow(oomError).when(mockResponse).topicMetadata();
+
+ env.kafkaClient().prepareResponse(mockResponse);
+
+ // Make the listTopics call - this will internally trigger a metadata request
+ ListTopicsResult result = env.adminClient().listTopics(new
ListTopicsOptions().timeoutMs(10000));
+
+ TestUtils.assertFutureThrows(OutOfMemoryError.class,
result.names());
+ }
+ }
}