This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 075401e2e1d KAFKA-19964; ApiVersions response return unknown features 
if the cluster metadata HWM is unknown (#21122)
075401e2e1d is described below

commit 075401e2e1dd6ad5be85e75d81030542c3a840bb
Author: mannoopj <[email protected]>
AuthorDate: Mon Jun 1 06:02:44 2026 -0400

    KAFKA-19964; ApiVersions response return unknown features if the cluster 
metadata HWM is unknown (#21122)
    
    FeaturesPublisher's default finalizedFeatures assumes a metadata version
    of 7 when there is no finalized level for metadata.version. Instead of
    defaulting to 7, it should report no metadata version and report -1 for
    the epoch to indicate that the finalized features are unknown.
    
    The FinalizedFeatures type now supports three different kinds of values:
    
    1. FinalizedFeatures#unknown() returns an instance representing an unknown
    metadata version (no finalized features, epoch -1)
    2. FinalizedFeatures#fromMetadataVersion(MetadataVersion) creates an
    instance with only the metadata version known
    3. FinalizedFeatures#of(MetadataVersion, Map<String, Short>, long)
    creates an instance with full feature information
    
    With this implementation the ApiVersions response will return a finalized
    epoch of -1 and no finalized versions when the finalized features are
    unknown.
    
    Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
     <[email protected]>
---
 .../main/scala/kafka/server/ControllerApis.scala   |   5 +-
 .../main/scala/kafka/server/ControllerServer.scala |  12 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |   4 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |   2 +-
 .../TransactionCoordinatorConcurrencyTest.scala    |   2 +-
 .../transaction/TransactionStateManagerTest.scala  |   4 +-
 .../scala/unit/kafka/network/ProcessorTest.scala   |   2 +-
 .../unit/kafka/network/SocketServerTest.scala      |   2 +-
 .../unit/kafka/server/ControllerApisTest.scala     |   2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   4 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   2 +-
 .../apache/kafka/metadata/KRaftMetadataCache.java  |   6 +-
 .../metadata/publisher/FeaturesPublisher.java      |   8 +-
 .../kafka/server/common/FinalizedFeatures.java     | 148 +++++++++++++++++++--
 .../kafka/server/common/FinalizedFeaturesTest.java |  74 ++++++++++-
 .../kafka/server/SimpleApiVersionManagerTest.java  |  45 +++++++
 16 files changed, 284 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index a36672311fd..4ba6c4069a0 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -1060,7 +1060,10 @@ class ControllerApis(
   def handleDescribeCluster(request: Request): CompletableFuture[Unit] = {
     // Nearly all RPCs should check MetadataVersion inside the 
QuorumController. However, this
     // RPC is consulting a cache which lives outside the QC. So we check 
MetadataVersion here.
-    if 
(!apiVersionManager.features.metadataVersion().isControllerRegistrationSupported)
 {
+    if (apiVersionManager.features.isUnknown) {
+      throw new UnsupportedVersionException("There is no finalized 
MetadataVersion, so " +
+        "direct-to-controller communication is not supported.")
+    } else if 
(!apiVersionManager.features.metadataVersionOrThrow.isControllerRegistrationSupported)
 {
       throw new UnsupportedVersionException("Direct-to-controller 
communication is not " +
         "supported with the current MetadataVersion.")
     }
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 902252233ad..e0e991cfbb5 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -162,9 +162,15 @@ class ControllerServer(
       val apiVersionManager = new SimpleApiVersionManager(
         ListenerType.CONTROLLER,
         config.unstableApiVersionsEnabled,
-        () => featuresPublisher.features().setFinalizedLevel(
-          KRaftVersion.FEATURE_NAME,
-          raftManager.client.kraftVersion().featureLevel())
+        () => {
+          val features = featuresPublisher.features()
+          if (!features.isUnknown)
+            features.setFinalizedLevel(
+              KRaftVersion.FEATURE_NAME,
+              raftManager.client.kraftVersion().featureLevel())
+          else
+            features
+        }
       )
 
       //  metrics will be set to null when closing a controller, so we should 
recreate it for testing
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 770dacb868f..472ccdf0f37 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -104,7 +104,7 @@ class BrokerMetadataPublisher(
   /**
    * The share version being used in the broker metadata.
    */
-  private var finalizedShareVersion: Short = 
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)
+  private var finalizedShareVersion: Short = 
FinalizedFeatures.fromMetadataVersion(MINIMUM_VERSION).finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)
 
   override def name(): String = "BrokerMetadataPublisher"
 
@@ -225,7 +225,7 @@ class BrokerMetadataPublisher(
 
       if (delta.featuresDelta != null) {
         try {
-          val newFinalizedFeatures = new 
FinalizedFeatures(newImage.features.metadataVersionOrThrow, 
newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
+          val newFinalizedFeatures = 
FinalizedFeatures.of(newImage.features.metadataVersionOrThrow, 
newImage.features.finalizedVersions, newImage.provenance.lastContainedOffset)
           val newFinalizedShareVersion = 
newFinalizedFeatures.finalizedFeatures().getOrDefault(ShareVersion.FEATURE_NAME,
 0.toShort)
           // Share version feature has been toggled.
           if (newFinalizedShareVersion != finalizedShareVersion) {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 4690fabf7e3..419cbe397d6 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -83,7 +83,7 @@ class TestRaftServer(
     val apiVersionManager = new SimpleApiVersionManager(
       ListenerType.CONTROLLER,
       true,
-      () => 
FinalizedFeatures.fromKRaftVersion(MetadataVersion.MINIMUM_VERSION))
+      () => 
FinalizedFeatures.fromMetadataVersion(MetadataVersion.MINIMUM_VERSION))
     socketServer = new SocketServer(config, metrics, time, credentialProvider, 
apiVersionManager)
 
     val endpoints = Endpoints.fromInetSocketAddresses(
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 26317d8535d..0be4317134a 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -86,7 +86,7 @@ class TransactionCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurren
       any[ListenerName])
     ).thenReturn(Optional.of(brokerNode))
     when(metadataCache.features()).thenReturn {
-      new FinalizedFeatures(
+      FinalizedFeatures.of(
         MetadataVersion.latestTesting(),
         util.Map.of(TransactionVersion.FEATURE_NAME, 
TransactionVersion.TV_2.featureLevel()),
         0)
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 84c58f40c87..ea7e44b4149 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -71,7 +71,7 @@ class TransactionStateManagerTest {
   val metadataCache: MetadataCache = mock(classOf[MetadataCache])
 
   when(metadataCache.features()).thenReturn {
-    new FinalizedFeatures(
+    FinalizedFeatures.of(
       MetadataVersion.latestTesting(),
       util.Map.of(TransactionVersion.FEATURE_NAME, 
TransactionVersion.TV_2.featureLevel()),
       0)
@@ -1356,7 +1356,7 @@ class TransactionStateManagerTest {
   def testTransactionVersionInTransactionManager(transactionVersion: 
TransactionVersion): Unit = {
     val metadataCache = mock(classOf[MetadataCache])
     when(metadataCache.features()).thenReturn {
-      new FinalizedFeatures(
+      FinalizedFeatures.of(
         MetadataVersion.latestTesting(),
         util.Map.of(TransactionVersion.FEATURE_NAME, 
transactionVersion.featureLevel()),
         0)
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala 
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 980ca2cadb4..51d3de41250 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -42,7 +42,7 @@ class ProcessorTest {
     val requestHeader = RequestTestUtils.serializeRequestHeader(
       new RequestHeader(ApiKeys.INIT_PRODUCER_ID, 0, "clientid", 0))
     val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.CONTROLLER, true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
+      () => FinalizedFeatures.of(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
     val e = assertThrows(classOf[InvalidRequestException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "INIT_PRODUCER_ID with listener type CONTROLLER should throw 
InvalidRequestException exception")
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 978fc39715f..5dc29bf5576 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -86,7 +86,7 @@ class SocketServerTest {
   ServerTestUtils.clearYammerMetrics()
 
   private val apiVersionManager = new 
SimpleApiVersionManager(ListenerType.BROKER, true,
-    () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
+    () => FinalizedFeatures.of(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
   var server: SocketServer = _
   val sockets = new ArrayBuffer[Socket]
 
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 7f30d8f2971..bc79fd11735 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -176,7 +176,7 @@ class ControllerApisTest {
       new SimpleApiVersionManager(
         ListenerType.CONTROLLER,
         true,
-        () => 
FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting())),
+        () => 
FinalizedFeatures.fromMetadataVersion(MetadataVersion.latestTesting())),
       metadataCache
     )
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 6ab30fce2fd..5471c410293 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -190,7 +190,7 @@ class KafkaApisTest extends Logging {
     val apiVersionManager = new SimpleApiVersionManager(
       ListenerType.BROKER,
       true,
-      () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
+      () => FinalizedFeatures.of(MetadataVersion.latestTesting(), 
util.Map.of[String, java.lang.Short], 0))
 
     setupFeatures(featureVersions)
 
@@ -224,7 +224,7 @@ class KafkaApisTest extends Logging {
     if (featureVersions.isEmpty) return
 
     when(metadataCache.features()).thenReturn {
-      new FinalizedFeatures(
+      FinalizedFeatures.of(
         MetadataVersion.latestTesting,
         featureVersions.map { featureVersion =>
           featureVersion.featureName -> 
featureVersion.featureLevel.asInstanceOf[java.lang.Short]
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index aba475e86cc..5e7de246783 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -209,7 +209,7 @@ public class KRaftMetadataRequestBenchmark {
                 setApiVersionManager(new SimpleApiVersionManager(
                         ApiMessageType.ListenerType.BROKER,
                         false,
-                        () -> 
FinalizedFeatures.fromKRaftVersion(MetadataVersion.latestTesting()))).
+                        () -> 
FinalizedFeatures.fromMetadataVersion(MetadataVersion.latestTesting()))).
                 setGroupConfigManager(groupConfigManager).
                 build();
     }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java 
b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
index 283c062a9ff..f05e34e408d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java
@@ -518,6 +518,10 @@ public class KRaftMetadataCache implements MetadataCache {
         if (kraftVersionLevel > 0) {
             finalizedFeatures.put(KRaftVersion.FEATURE_NAME, 
kraftVersionLevel);
         }
-        return new 
FinalizedFeatures(image.features().metadataVersionOrThrow(), finalizedFeatures, 
image.highestOffsetAndEpoch().offset());
+        return FinalizedFeatures.of(
+            image.features().metadataVersionOrThrow(),
+            finalizedFeatures,
+            image.highestOffsetAndEpoch().offset()
+        );
     }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
index 678a855ceaa..f4cbb4faddc 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/FeaturesPublisher.java
@@ -27,13 +27,10 @@ import org.apache.kafka.server.fault.FaultHandler;
 
 import org.slf4j.Logger;
 
-import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
-
-
 public class FeaturesPublisher implements MetadataPublisher {
     private final Logger log;
     private final FaultHandler faultHandler;
-    private volatile FinalizedFeatures finalizedFeatures = 
FinalizedFeatures.fromKRaftVersion(MINIMUM_VERSION);
+    private volatile FinalizedFeatures finalizedFeatures = 
FinalizedFeatures.unknown();
 
     public FeaturesPublisher(
         LogContext logContext,
@@ -60,7 +57,8 @@ public class FeaturesPublisher implements MetadataPublisher {
     ) {
         try {
             if (delta.featuresDelta() != null) {
-                FinalizedFeatures newFinalizedFeatures = new 
FinalizedFeatures(newImage.features().metadataVersionOrThrow(),
+                FinalizedFeatures newFinalizedFeatures = FinalizedFeatures.of(
+                    newImage.features().metadataVersionOrThrow(),
                     newImage.features().finalizedVersions(),
                     newImage.provenance().lastContainedOffset()
                 );
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
index 9133f0db798..dd39984fbcd 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/FinalizedFeatures.java
@@ -19,28 +19,128 @@ package org.apache.kafka.server.common;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
-public record FinalizedFeatures(
-    MetadataVersion metadataVersion,
-    Map<String, Short> finalizedFeatures,
-    long finalizedFeaturesEpoch
-) {
-    public static FinalizedFeatures fromKRaftVersion(MetadataVersion version) {
-        return new FinalizedFeatures(version, Map.of(), -1);
-    }
+/**
+ * Represents the finalized feature levels for a Kafka cluster.
+ * <p>
+ * This class can be in one of three states:
+ * <ul>
+ *   <li>Unknown - the metadata version has not been committed yet, e.g. 
before a quorum is
+ *       formed. Used as the initial state in {@code FeaturesPublisher}. (use 
{@link #unknown()})</li>
+ *   <li>Metadata version only - the metadata version is known but no 
additional features or
+ *       epoch have been set. Used when only the metadata version needs to be 
represented
+ *       without a full set of finalized features. (use {@link 
#fromMetadataVersion(MetadataVersion)})</li>
+ *   <li>Full features - metadata version, features map, and epoch are all 
known. Used after
+ *       the controller has committed feature records. (use {@link 
#of(MetadataVersion, Map, long)})</li>
+ * </ul>
+ */
+public final class FinalizedFeatures {
+    private static final FinalizedFeatures UNKNOWN = new 
FinalizedFeatures(Optional.empty(), Map.of(), -1);
+
+    private final Optional<MetadataVersion> metadataVersion;
+    private final Map<String, Short> finalizedFeatures;
+    private final long finalizedFeaturesEpoch;
 
-    public FinalizedFeatures(
-        MetadataVersion metadataVersion,
+    private FinalizedFeatures(
+        Optional<MetadataVersion> metadataVersion,
         Map<String, Short> finalizedFeatures,
         long finalizedFeaturesEpoch
     ) {
-        this.metadataVersion = Objects.requireNonNull(metadataVersion);
+        this.metadataVersion = metadataVersion;
         this.finalizedFeatures = new HashMap<>(finalizedFeatures);
         this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
-        this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, 
metadataVersion.featureLevel());
+        metadataVersion.ifPresent(mv ->
+            this.finalizedFeatures.put(MetadataVersion.FEATURE_NAME, 
mv.featureLevel()));
+    }
+
+    /**
+     * Returns a sentinel value representing unknown finalized features.
+     *
+     * @return the unknown finalized features instance
+     */
+    public static FinalizedFeatures unknown() {
+        return UNKNOWN;
+    }
+
+    /**
+     * Creates a new instance from the given KRaft metadata version.
+     *
+     * @param version the metadata version
+     * @return a new FinalizedFeatures instance
+     * @throws NullPointerException if version is null
+     */
+    public static FinalizedFeatures fromMetadataVersion(MetadataVersion 
version) {
+        Objects.requireNonNull(version, "version cannot be null");
+        return new FinalizedFeatures(Optional.of(version), Map.of(), -1);
+    }
+
+    /**
+     * Creates a new instance with the given metadata version, features map, 
and epoch.
+     *
+     * @param metadataVersion the metadata version
+     * @param finalizedFeatures the map of feature names to their finalized 
levels
+     * @param epoch the epoch of the finalized features
+     * @return a new FinalizedFeatures instance
+     * @throws NullPointerException if metadataVersion or finalizedFeatures is 
null
+     */
+    public static FinalizedFeatures of(MetadataVersion metadataVersion, 
Map<String, Short> finalizedFeatures, long epoch) {
+        Objects.requireNonNull(metadataVersion, "metadataVersion cannot be 
null");
+        Objects.requireNonNull(finalizedFeatures, "finalizedFeatures cannot be 
null");
+        return new FinalizedFeatures(Optional.of(metadataVersion), 
finalizedFeatures, epoch);
+    }
+
+    /**
+     * Returns whether the finalized features are unknown.
+     *
+     * @return true if the finalized features are unknown, false otherwise
+     */
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+    /**
+     * Returns the metadata version, throwing an exception if unknown.
+     *
+     * @return the metadata version
+     * @throws IllegalStateException if the metadata version is unknown
+     */
+    public MetadataVersion metadataVersionOrThrow() {
+        return metadataVersion.orElseThrow(() ->
+            new IllegalStateException("Metadata version is unknown"));
+    }
+
+    /**
+     * Returns the map of feature names to their finalized levels.
+     *
+     * @return the finalized features map
+     */
+    public Map<String, Short> finalizedFeatures() {
+        return finalizedFeatures;
     }
 
+    /**
+     * Returns the epoch of the finalized features.
+     *
+     * @return the finalized features epoch
+     */
+    public long finalizedFeaturesEpoch() {
+        return finalizedFeaturesEpoch;
+    }
+
+    /**
+     * Creates a new instance with the specified feature level set or removed.
+     * If level is 0, the feature is removed. Otherwise, the feature is set to 
the given level.
+     *
+     * @param key the feature name
+     * @param level the feature level (0 to remove)
+     * @return a new FinalizedFeatures instance with the updated feature level
+     * @throws IllegalStateException if this is the unknown instance
+     */
     public FinalizedFeatures setFinalizedLevel(String key, short level) {
+        if (isUnknown()) {
+            throw new IllegalStateException("Cannot set finalized level on 
unknown FinalizedFeatures");
+        }
         if (level == (short) 0) {
             if (finalizedFeatures.containsKey(key)) {
                 Map<String, Short> newFinalizedFeatures = new 
HashMap<>(finalizedFeatures);
@@ -61,4 +161,28 @@ public record FinalizedFeatures(
                 finalizedFeaturesEpoch);
         }
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        FinalizedFeatures that = (FinalizedFeatures) o;
+        return finalizedFeaturesEpoch == that.finalizedFeaturesEpoch &&
+                Objects.equals(metadataVersion, that.metadataVersion) &&
+                Objects.equals(finalizedFeatures, that.finalizedFeatures);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(metadataVersion, finalizedFeatures, 
finalizedFeaturesEpoch);
+    }
+
+    @Override
+    public String toString() {
+        return "FinalizedFeatures(" +
+               "metadataVersion=" + metadataVersion +
+               ", finalizedFeatures=" + finalizedFeatures +
+               ", finalizedFeaturesEpoch=" + finalizedFeaturesEpoch +
+               ')';
+    }
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
index 1c4c9547cf0..c0dd9c58a83 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/FinalizedFeaturesTest.java
@@ -24,12 +24,54 @@ import java.util.Map;
 import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME;
 import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class FinalizedFeaturesTest {
+
+    @Test
+    public void testUnknownFeatures() {
+        FinalizedFeatures features = FinalizedFeatures.unknown();
+
+        assertTrue(features.isUnknown());
+        assertTrue(features.finalizedFeatures().isEmpty());
+        assertEquals(-1, features.finalizedFeaturesEpoch());
+    }
+
+    @Test
+    public void testUnknownFeaturesMetadataVersionThrows() {
+        FinalizedFeatures features = FinalizedFeatures.unknown();
+
+        assertThrows(IllegalStateException.class, 
features::metadataVersionOrThrow);
+    }
+
+    @Test
+    public void testUnknownFeaturesIsSingleton() {
+        assertSame(FinalizedFeatures.unknown(), FinalizedFeatures.unknown());
+    }
+
+    @Test
+    public void testFromKRaftVersion() {
+        FinalizedFeatures features = 
FinalizedFeatures.fromMetadataVersion(MINIMUM_VERSION);
+
+        assertFalse(features.isUnknown());
+        assertEquals(MINIMUM_VERSION, features.metadataVersionOrThrow());
+        assertEquals(MINIMUM_VERSION.featureLevel(), 
features.finalizedFeatures().get(FEATURE_NAME));
+        assertEquals(1, features.finalizedFeatures().size());
+        assertEquals(-1, features.finalizedFeaturesEpoch());
+    }
+
+    @Test
+    public void testFromKRaftVersionNullThrows() {
+        assertThrows(NullPointerException.class, () -> 
FinalizedFeatures.fromMetadataVersion(null));
+    }
+
     @Test
     public void testKRaftModeFeatures() {
-        FinalizedFeatures finalizedFeatures = new 
FinalizedFeatures(MINIMUM_VERSION,
+        FinalizedFeatures finalizedFeatures = 
FinalizedFeatures.of(MINIMUM_VERSION,
                 Map.of("foo", (short) 2), 123);
         assertEquals(MINIMUM_VERSION.featureLevel(),
                 finalizedFeatures.finalizedFeatures().get(FEATURE_NAME));
@@ -38,20 +80,44 @@ class FinalizedFeaturesTest {
         assertEquals(2, finalizedFeatures.finalizedFeatures().size());
     }
 
+    @Test
+    public void testOfNullMetadataVersionThrows() {
+        assertThrows(NullPointerException.class,
+            () -> FinalizedFeatures.of(null, Map.of(), 0));
+    }
+
+    @Test
+    public void testOfNullFeaturesMapThrows() {
+        assertThrows(NullPointerException.class,
+            () -> FinalizedFeatures.of(MINIMUM_VERSION, null, 0));
+    }
+
     @Test
     public void testSetFinalizedLevel() {
-        FinalizedFeatures finalizedFeatures = new FinalizedFeatures(
+        FinalizedFeatures finalizedFeatures = FinalizedFeatures.of(
             MINIMUM_VERSION,
             Map.of("foo", (short) 2),
             123
         );
 
-        // Override an existing finalized feature version to 0
         FinalizedFeatures removedFeatures = 
finalizedFeatures.setFinalizedLevel("foo", (short) 0);
         assertNull(removedFeatures.finalizedFeatures().get("foo"));
 
-        // Override a missing finalized feature version to 0
         FinalizedFeatures sameFeatures = 
removedFeatures.setFinalizedLevel("foo", (short) 0);
         assertEquals(sameFeatures.finalizedFeatures(), 
removedFeatures.finalizedFeatures());
     }
+
+    @Test
+    public void testSetFinalizedLevelAdd() {
+        FinalizedFeatures features = FinalizedFeatures.of(MINIMUM_VERSION, 
Map.of(), 123);
+
+        FinalizedFeatures updatedFeatures = features.setFinalizedLevel("bar", 
(short) 5);
+        assertEquals((short) 5, 
updatedFeatures.finalizedFeatures().get("bar"));
+    }
+
+    @Test
+    public void testSetFinalizedLevelOnUnknownThrows() {
+        assertThrows(IllegalStateException.class,
+            () -> FinalizedFeatures.unknown().setFinalizedLevel("foo", (short) 
1));
+    }
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/SimpleApiVersionManagerTest.java 
b/server/src/test/java/org/apache/kafka/server/SimpleApiVersionManagerTest.java
new file mode 100644
index 00000000000..7da6bd612fc
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/SimpleApiVersionManagerTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SimpleApiVersionManagerTest {
+
+    @Test
+    public void testUnknownFeaturesHasNoMetadataVersion() {
+        SimpleApiVersionManager apiVersionManager = new 
SimpleApiVersionManager(
+            ApiMessageType.ListenerType.CONTROLLER,
+            true,
+            FinalizedFeatures::unknown
+        );
+        ApiVersionsResponse response = apiVersionManager.apiVersionResponse(0, 
false);
+
+        assertTrue(response.data().finalizedFeatures().isEmpty(),
+            "Finalized features should be empty when no quorum exists");
+
+        assertEquals(-1, response.data().finalizedFeaturesEpoch(),
+            "Finalized features epoch should be -1 when no quorum exists");
+    }
+}

Reply via email to