This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new de38d89e1d3 MINOR: Rewrite DefaultApiVersionManagerTest from Scala to
Java and move it to server module (#22109)
de38d89e1d3 is described below
commit de38d89e1d31ca92a4fbcc49ebe6ff36ac9d77a9
Author: Jiayao Sun <[email protected]>
AuthorDate: Mon May 4 12:04:06 2026 +1200
MINOR: Rewrite DefaultApiVersionManagerTest from Scala to Java and move it
to server module (#22109)
Rewrite by Java
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../server/DefaultApiVersionManagerTest.scala | 149 -------------------
.../kafka/server/DefaultApiVersionManagerTest.java | 159 +++++++++++++++++++++
2 files changed, 159 insertions(+), 149 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
deleted file mode 100644
index e26f035a2d2..00000000000
--- a/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.clients.NodeApiVersions
-import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.metadata.FeatureLevelRecord
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
-import org.apache.kafka.metadata.KRaftMetadataCache
-import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager}
-import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
-import org.mockito.Mockito
-
-import java.util.Optional
-import java.util.function.Supplier
-import scala.jdk.CollectionConverters._
-
-class DefaultApiVersionManagerTest {
- private val brokerFeatures = BrokerFeatures.createDefault(true)
- private val metadataCache = {
- val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
- val delta = new MetadataDelta.Builder()
- .setImage(MetadataImage.EMPTY)
- .build()
- delta.replay(new FeatureLevelRecord()
- .setName(MetadataVersion.FEATURE_NAME)
- .setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
- )
- cache.setImage(delta.apply(MetadataProvenance.EMPTY))
- cache
- }
-
- @ParameterizedTest
- @EnumSource(classOf[ListenerType])
- def testApiScope(apiScope: ListenerType): Unit = {
- val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
- val versionManager = new DefaultApiVersionManager(
- apiScope,
- nodeApiVersionsSupplier,
- brokerFeatures,
- metadataCache,
- true,
- Optional.empty
- )
- assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall { apiKey =>
- apiKey.allVersions.asScala.forall { version =>
- versionManager.isApiEnabled(apiKey, version)
- }
- })
- }
-
- @ParameterizedTest
- @EnumSource(classOf[ListenerType])
- def testDisabledApis(apiScope: ListenerType): Unit = {
- val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
- val versionManager = new DefaultApiVersionManager(
- apiScope,
- nodeApiVersionsSupplier,
- brokerFeatures,
- metadataCache,
- false,
- Optional.empty
- )
-
- ApiKeys.apisForListener(apiScope).forEach { apiKey =>
- if (apiKey.id == ApiKeys.API_VERSIONS.id) {
- // ApiVersions API is a particular case. The client always send the
highest version
- // that it supports and the server fails back to version 0 if it does
not know it.
- // See ApiKeys.isVersionEnabled for more information (KIP-511).
- // Because API_VERSIONS has an unstable version while KIP-1242 is
under development,
- // we need a special case in this test. This assertion will start
failing when the
- // API is no longer unstable and the special case can be removed.
- assertTrue(apiKey.messageType.latestVersionUnstable());
- } else if (apiKey.messageType.latestVersionUnstable()) {
- assertFalse(versionManager.isApiEnabled(apiKey, apiKey.latestVersion),
- s"$apiKey version ${apiKey.latestVersion} should be disabled.")
- }
- }
- }
-
- @Test
- def testControllerApiIntersection(): Unit = {
- val controllerMinVersion: Short = 3
- val controllerMaxVersion: Short = 5
-
- val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
-
-
Mockito.when(nodeApiVersionsSupplier.get).thenReturn(Optional.of(NodeApiVersions.create(
- ApiKeys.CREATE_TOPICS.id,
- controllerMinVersion,
- controllerMaxVersion
- )))
-
- val versionManager = new DefaultApiVersionManager(
- ListenerType.BROKER,
- nodeApiVersionsSupplier,
- brokerFeatures,
- metadataCache,
- true,
- Optional.empty
- )
-
- val apiVersionsResponse = versionManager.apiVersionResponse(0, false)
- val alterConfigVersion =
apiVersionsResponse.data.apiKeys.find(ApiKeys.CREATE_TOPICS.id)
- assertNotNull(alterConfigVersion)
- assertEquals(controllerMinVersion, alterConfigVersion.minVersion)
- assertEquals(controllerMaxVersion, alterConfigVersion.maxVersion)
- }
-
- @Test
- def testEnvelopeDisabledForKRaftBroker(): Unit = {
- val nodeApiVersionsSupplier =
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
- Mockito.when(nodeApiVersionsSupplier.get).thenReturn(Optional.empty())
-
- val versionManager = new DefaultApiVersionManager(
- ListenerType.BROKER,
- nodeApiVersionsSupplier,
- brokerFeatures,
- metadataCache,
- true,
- Optional.empty
- )
- assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE,
ApiKeys.ENVELOPE.latestVersion))
-
assertFalse(ApiKeys.apisForListener(versionManager.listenerType()).contains(ApiKeys.ENVELOPE))
-
- val apiVersionsResponse = versionManager.apiVersionResponse(0, false)
- val envelopeVersion =
apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
- assertNull(envelopeVersion)
- }
-}
diff --git
a/server/src/test/java/org/apache/kafka/server/DefaultApiVersionManagerTest.java
b/server/src/test/java/org/apache/kafka/server/DefaultApiVersionManagerTest.java
new file mode 100644
index 00000000000..99b826d9d55
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/DefaultApiVersionManagerTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.message.ApiMessageType.ListenerType;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.metadata.KRaftMetadataCache;
+import org.apache.kafka.server.common.KRaftVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DefaultApiVersionManagerTest {
+
+ private final BrokerFeatures brokerFeatures =
BrokerFeatures.createDefault(true);
+
+ private final KRaftMetadataCache metadataCache = createMetadataCache();
+
+ private static KRaftMetadataCache createMetadataCache() {
+ var cache = new KRaftMetadataCache(1, () ->
KRaftVersion.LATEST_PRODUCTION);
+ var delta = new MetadataDelta.Builder()
+ .setImage(MetadataImage.EMPTY)
+ .build();
+ delta.replay(new FeatureLevelRecord()
+ .setName(MetadataVersion.FEATURE_NAME)
+ .setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
+ );
+ cache.setImage(delta.apply(MetadataProvenance.EMPTY));
+ return cache;
+ }
+
+ @ParameterizedTest
+ @EnumSource(ListenerType.class)
+ public void testApiScope(ListenerType apiScope) {
+ Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier =
Optional::empty;
+ var versionManager = new DefaultApiVersionManager(
+ apiScope,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ true,
+ Optional.empty()
+ );
+ for (ApiKeys apiKey : ApiKeys.apisForListener(apiScope)) {
+ for (short version : apiKey.allVersions()) {
+ assertTrue(versionManager.isApiEnabled(apiKey, version));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(ListenerType.class)
+ public void testDisabledApis(ListenerType apiScope) {
+ Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier =
Optional::empty;
+ var versionManager = new DefaultApiVersionManager(
+ apiScope,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ false,
+ Optional.empty()
+ );
+
+ for (ApiKeys apiKey : ApiKeys.apisForListener(apiScope)) {
+ if (apiKey.id == ApiKeys.API_VERSIONS.id) {
+ // ApiVersions API is a particular case. The client always
sends the highest version
+ // that it supports and the server falls back to version 0 if
it does not know it.
+ // See ApiKeys.isVersionEnabled for more information (KIP-511).
+ // Because API_VERSIONS has an unstable version while KIP-1242
is under development,
+ // we need a special case in this test. This assertion will
start failing when the
+ // API is no longer unstable and the special case can be
removed.
+ assertTrue(apiKey.messageType.latestVersionUnstable());
+ } else if (apiKey.messageType.latestVersionUnstable()) {
+ assertFalse(
+ versionManager.isApiEnabled(apiKey,
apiKey.latestVersion()),
+ apiKey + " version " + apiKey.latestVersion() + " should
be disabled."
+ );
+ }
+ }
+ }
+
+ @Test
+ public void testControllerApiIntersection() {
+ short controllerMinVersion = 3;
+ short controllerMaxVersion = 5;
+
+ Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier = () ->
Optional.of(
+ NodeApiVersions.create(
+ ApiKeys.CREATE_TOPICS.id,
+ controllerMinVersion,
+ controllerMaxVersion
+ ));
+
+ var versionManager = new DefaultApiVersionManager(
+ ListenerType.BROKER,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ true,
+ Optional.empty()
+ );
+
+ var apiVersionsResponse = versionManager.apiVersionResponse(0, false);
+ var alterConfigVersion =
apiVersionsResponse.data().apiKeys().find(ApiKeys.CREATE_TOPICS.id);
+ assertNotNull(alterConfigVersion);
+ assertEquals(controllerMinVersion, alterConfigVersion.minVersion());
+ assertEquals(controllerMaxVersion, alterConfigVersion.maxVersion());
+ }
+
+ @Test
+ public void testEnvelopeDisabledForKRaftBroker() {
+ Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier =
Optional::empty;
+
+ DefaultApiVersionManager versionManager = new DefaultApiVersionManager(
+ ListenerType.BROKER,
+ nodeApiVersionsSupplier,
+ brokerFeatures,
+ metadataCache,
+ true,
+ Optional.empty()
+ );
+ assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE,
ApiKeys.ENVELOPE.latestVersion()));
+
assertFalse(ApiKeys.apisForListener(versionManager.listenerType()).contains(ApiKeys.ENVELOPE));
+
+ var apiVersionsResponse = versionManager.apiVersionResponse(0, false);
+ var envelopeVersion =
apiVersionsResponse.data().apiKeys().find(ApiKeys.ENVELOPE.id);
+ assertNull(envelopeVersion);
+ }
+}