This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25d09060299 MINOR: Move testCallInFlightTimeouts to Java and remove internal factory hack (#22220)
25d09060299 is described below

commit 25d09060299819f671daa0a3220d3d711868f238
Author: Jiayao Sun <[email protected]>
AuthorDate: Fri May 8 17:03:38 2026 +1200

    MINOR: Move testCallInFlightTimeouts to Java and remove internal factory hack (#22220)
    
    Moves `testCallInFlightTimeouts` from the Scala core module to a new
    Java `AdminClientTimeoutIntegrationTest` in `clients-integration-tests`.
    By placing the test in the `org.apache.kafka.clients.admin` package, it
    can directly access `KafkaAdminClient.createInternal()`. This allows us
    to:
    1. Delete the `KafkaAdminClientInternalFactory` backdoor, restoring
    proper encapsulation.
    2. Inline `FailureInjectingTimeoutProcessorFactory` into the test class,
    reducing test fixture clutter.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Igor Soarez
     <[email protected]>
---
 .../admin/AdminClientTimeoutIntegrationTest.java   | 121 +++++++++++++++++++++
 .../FailureInjectingTimeoutProcessorFactory.java   |  64 -----------
 .../admin/KafkaAdminClientInternalFactory.java     |  23 ----
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  19 +---
 4 files changed, 122 insertions(+), 105 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientTimeoutIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientTimeoutIntegrationTest.java
new file mode 100644
index 00000000000..9503b887c9e
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientTimeoutIntegrationTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@ClusterTestDefaults(types = { Type.KRAFT })
+public class AdminClientTimeoutIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(AdminClientTimeoutIntegrationTest.class);
+
+    private final ClusterInstance clusterInstance;
+
+    public AdminClientTimeoutIntegrationTest(ClusterInstance clusterInstance) {
+        this.clusterInstance = clusterInstance;
+    }
+
+    /**
+     * Test injecting timeouts for calls that are in flight.
+     */
+    @ClusterTest
+    public void testCallInFlightTimeouts() throws Exception {
+        var config = Map.of(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
+                AdminClientConfig.RETRIES_CONFIG, "0",
+                // Set an extremely large overall API timeout to ensure the
+                // FailureInjectingTimeoutProcessor triggers before the API-level timeout does.
+                AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000"
+        );
+
+        var factory = new FailureInjectingTimeoutProcessorFactory();
+        try (var client = KafkaAdminClient.createInternal(new AdminClientConfig(config), factory)) {
+
+            var future = client.createTopics(Stream.of("mytopic1", "mytopic2")
+                    .map(t -> new NewTopic(t, 1, (short) 1)).toList(),
+                    new CreateTopicsOptions().validateOnly(true)).all();
+
+            var e = assertThrows(ExecutionException.class, future::get);
+            assertInstanceOf(TimeoutException.class, e.getCause());
+
+            var future2 = client.createTopics(Stream.of("mytopic3", "mytopic4")
+                    .map(t -> new NewTopic(t, 1, (short) 1)).toList(),
+                    new CreateTopicsOptions().validateOnly(true)).all();
+            future2.get();
+            assertEquals(1, factory.failuresInjected());
+        }
+    }
+
+
+    static class FailureInjectingTimeoutProcessorFactory extends KafkaAdminClient.TimeoutProcessorFactory {
+
+        private int numTries = 0;
+
+        private int failuresInjected = 0;
+
+        @Override
+        KafkaAdminClient.TimeoutProcessor create(long now) {
+            return new FailureInjectingTimeoutProcessor(now);
+        }
+
+        synchronized boolean shouldInjectFailure() {
+            numTries++;
+            if (numTries == 1) {
+                failuresInjected++;
+                return true;
+            }
+            return false;
+        }
+
+        synchronized int failuresInjected() {
+            return failuresInjected;
+        }
+
+        final class FailureInjectingTimeoutProcessor extends KafkaAdminClient.TimeoutProcessor {
+            FailureInjectingTimeoutProcessor(long now) {
+                super(now);
+            }
+
+            @Override
+            boolean callHasExpired(KafkaAdminClient.Call call) {
+                if ((!call.isInternal()) && shouldInjectFailure()) {
+                    log.debug("Injecting timeout for {}.", call);
+                    return true;
+                } else {
+                    boolean ret = super.callHasExpired(call);
+                    log.debug("callHasExpired({}) = {}", call, ret);
+                    return ret;
+                }
+            }
+        }
+    }
+}
diff --git a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/FailureInjectingTimeoutProcessorFactory.java b/clients/src/testFixtures/java/org/apache/kafka/clients/admin/FailureInjectingTimeoutProcessorFactory.java
deleted file mode 100644
index 61e9341944c..00000000000
--- a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/FailureInjectingTimeoutProcessorFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FailureInjectingTimeoutProcessorFactory extends KafkaAdminClient.TimeoutProcessorFactory {
-
-    private static final Logger log = LoggerFactory.getLogger(FailureInjectingTimeoutProcessorFactory.class);
-
-    private int numTries = 0;
-
-    private int failuresInjected = 0;
-
-    @Override
-    public KafkaAdminClient.TimeoutProcessor create(long now) {
-        return new FailureInjectingTimeoutProcessor(now);
-    }
-
-    synchronized boolean shouldInjectFailure() {
-        numTries++;
-        if (numTries == 1) {
-            failuresInjected++;
-            return true;
-        }
-        return false;
-    }
-
-    public synchronized int failuresInjected() {
-        return failuresInjected;
-    }
-
-    public final class FailureInjectingTimeoutProcessor extends KafkaAdminClient.TimeoutProcessor {
-        public FailureInjectingTimeoutProcessor(long now) {
-            super(now);
-        }
-
-        boolean callHasExpired(KafkaAdminClient.Call call) {
-            if ((!call.isInternal()) && shouldInjectFailure()) {
-                log.debug("Injecting timeout for {}.", call);
-                return true;
-            } else {
-                boolean ret = super.callHasExpired(call);
-                log.debug("callHasExpired({}) = {}", call, ret);
-                return ret;
-            }
-        }
-    }
-}
diff --git a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/KafkaAdminClientInternalFactory.java b/clients/src/testFixtures/java/org/apache/kafka/clients/admin/KafkaAdminClientInternalFactory.java
deleted file mode 100644
index 72d977e2adf..00000000000
--- a/clients/src/testFixtures/java/org/apache/kafka/clients/admin/KafkaAdminClientInternalFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.admin;
-
-public class KafkaAdminClientInternalFactory {
-    public static KafkaAdminClient createInternal(AdminClientConfig config, KafkaAdminClient.TimeoutProcessorFactory timeoutProcessorFactory) {
-        return KafkaAdminClient.createInternal(config, timeoutProcessorFactory);
-    }
-}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index bcc2f8d4817..2e4b7acf26f 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1854,24 +1854,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     assertTrue(endTimeMs > startTimeMs, "Expected the timeout to take at least one millisecond.")
   }
 
-  /**
-    * Test injecting timeouts for calls that are in flight.
-    */
-  @Test
-  def testCallInFlightTimeouts(): Unit = {
-    val config = createConfig
-    config.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "100000000")
-    config.put(AdminClientConfig.RETRIES_CONFIG, "0")
-    val factory = new FailureInjectingTimeoutProcessorFactory()
-    client = KafkaAdminClientInternalFactory.createInternal(new AdminClientConfig(config), factory)
-    val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1.toShort)).asJava,
-        new CreateTopicsOptions().validateOnly(true)).all()
-    assertFutureThrows(classOf[TimeoutException], future)
-    val future2 = client.createTopics(Seq("mytopic3", "mytopic4").map(new NewTopic(_, 1, 1.toShort)).asJava,
-      new CreateTopicsOptions().validateOnly(true)).all()
-    future2.get
-    assertEquals(1, factory.failuresInjected)
-  }
+
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))

Reply via email to