dajac commented on code in PR #15685:
URL: https://github.com/apache/kafka/pull/15685#discussion_r1602958576


##########
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {

Review Comment:
   Is there a particular reason for grouping those two interfaces in this one? 
I am asking because at first that `FeatureVersionUtils.java` was an utils 
library (e.g. useful functions). I did not get that it was actually the one 
that contains the interface that one must implement.



##########
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum FeatureVersion {
+
+    /**
+     * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+     * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+     *
+     * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature.
+     */
+    TEST_VERSION("test.feature.version", TestFeatureVersion.values(), 
TestFeatureVersion::fromFeatureLevel, false);
+
+    public static final FeatureVersion[] FEATURES;
+    public static final List<FeatureVersion> PRODUCTION_FEATURES;
+    private final String name;
+    private final FeatureVersionUtils.FeatureVersionImpl[] features;
+    private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod;
+    private final boolean usedInProduction;
+
+    FeatureVersion(String name,
+                   FeatureVersionUtils.FeatureVersionImpl[] features,
+                   FeatureVersionUtils.CreateMethod createMethod,
+                   boolean usedInProduction) {
+        this.name = name;
+        this.features = features;
+        this.createFeatureVersionMethod = createMethod;
+        this.usedInProduction = usedInProduction;
+    }
+
+    static {
+        FeatureVersion[] enumValues = FeatureVersion.values();
+        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+                feature.usedInProduction).collect(Collectors.toList());
+    }
+
+    public String featureName() {
+        return name;
+    }
+
+    /**
+     * Creates a FeatureVersion from a given name and level with the correct 
feature object underneath.
+     *
+     * @param level   the level of the feature
+     * @returns       the FeatureVersionUtils.FeatureVersionImpl for the 
feature the enum is based on.
+     * @throws        IllegalArgumentException if the feature name is not 
valid (not implemented for this method)
+     */
+    public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short 
level) {
+        return createFeatureVersionMethod.fromFeatureLevel(level);
+    }
+
+    /**
+     * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+     * captured in {@link 
FeatureVersionUtils.FeatureVersionImpl#dependencies()}
+     * <p>
+     * For example, say feature X level x relies on feature Y level y:
+     * if feature X >= x then throw an error if feature Y < y.
+     *
+     * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in 
order to write the feature records to the cluster.
+     *
+     * @param feature                   the feature we are validating
+     * @param metadataVersion           the metadata version we have (or want 
to set)
+     * @param features                  the feature versions (besides 
MetadataVersion) we have (or want to set)
+     * @throws IllegalArgumentException if the feature is not valid
+     */
+    public static void validateVersion(FeatureVersionUtils.FeatureVersionImpl 
feature, MetadataVersion metadataVersion, Map<String, Short> features) {
+        if (feature.featureLevel() >= 1 && 
metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0))
+            throw new IllegalArgumentException(feature.featureName() + " could 
not be set to " + feature.featureLevel() +
+                    " because it depends on metadata.version=14 (" + 
MetadataVersion.IBP_3_3_IV0 + ")");
+
+        for (Map.Entry<String, Short> dependency: 
feature.dependencies().entrySet()) {
+            Short featureLevel = features.get(dependency.getKey());
+
+            if (featureLevel == null || featureLevel < dependency.getValue()) {
+                throw new IllegalArgumentException(feature.featureName() + " 
could not be set to " + feature.featureLevel() +
+                        " because it depends on " + dependency.getKey() + " 
level " + dependency.getValue());
+            }
+        }
+    }
+
+    /**
+     * A method to return the default version level of a feature. If 
metadataVersionOpt is not empty, the default is based on
+     * the metadataVersion. If not, use the latest production version for the 
given feature.
+     *
+     * Every time a new feature is added, it should create a mapping from 
metadata version to feature version
+     * with {@link 
FeatureVersionUtils.FeatureVersionImpl#metadataVersionMapping()}
+     *
+     * @param metadataVersionOpt the metadata version we want to use to set 
the default, or None if the latest production version is desired
+     * @return the default version level for the feature and potential 
metadata version
+     */
+    public short defaultValue(Optional<MetadataVersion> metadataVersionOpt) {
+        MetadataVersion mv = 
metadataVersionOpt.orElse(MetadataVersion.LATEST_PRODUCTION);

Review Comment:
   I wonder if we should rather let the caller to always specify the MV. 
Instead of passing `Optional.empty` they could just pass 
`MetadataVersion.LATEST_PRODUCTION`. Or is there a reason against it?



##########
core/src/main/scala/kafka/server/BrokerFeatures.scala:
##########
@@ -75,16 +76,19 @@ object BrokerFeatures extends Logging {
   }
 
   def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): 
Features[SupportedVersionRange] = {

Review Comment:
   Should we add a unit test for this change?



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -469,4 +522,28 @@ object StorageTool extends Logging {
     }
     0
   }
+
+  private def parseNameAndLevel(input: String): Array[String] = {
+    val equalsIndex = input.indexOf("=")
+    if (equalsIndex < 0)
+      throw new RuntimeException("Can't parse feature=level string " + input + 
": equals sign not found.")
+    val name = input.substring(0, equalsIndex).trim
+    val levelString = input.substring(equalsIndex + 1).trim
+    try {
+      levelString.toShort
+    } catch {
+      case _: Throwable =>
+        throw new RuntimeException("Can't parse feature=level string " + input 
+ ": " + "unable to parse " + levelString + " as a short.")
+    }
+    Array[String](name, levelString)

Review Comment:
   nit: It may be better to use a tuple `(String, Short)` if possible. It will 
also simplify the calling side.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -469,4 +522,28 @@ object StorageTool extends Logging {
     }
     0
   }
+
+  private def parseNameAndLevel(input: String): Array[String] = {
+    val equalsIndex = input.indexOf("=")
+    if (equalsIndex < 0)
+      throw new RuntimeException("Can't parse feature=level string " + input + 
": equals sign not found.")
+    val name = input.substring(0, equalsIndex).trim
+    val levelString = input.substring(equalsIndex + 1).trim
+    try {
+      levelString.toShort
+    } catch {
+      case _: Throwable =>
+        throw new RuntimeException("Can't parse feature=level string " + input 
+ ": " + "unable to parse " + levelString + " as a short.")
+    }
+    Array[String](name, levelString)
+  }
+
+  def featureNamesAndLevels(features: List[String]): Map[String, 
java.lang.Short] = {
+    features.map((feature: String) => {
+      // Ensure the feature exists
+

Review Comment:
   nit: Should we remove this empty line?



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java:
##########
@@ -55,6 +56,12 @@ public void testDefaultFeatureMap() {
         expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
             MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
             MetadataVersion.LATEST_PRODUCTION.featureLevel()));
+        for (FeatureVersion feature : FeatureVersion.PRODUCTION_FEATURES) {
+            expectedFeatures.put(feature.featureName(), VersionRange.of(
+                    0,
+                    feature.defaultValue(Optional.empty())

Review Comment:
   nit: Indentation.



##########
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+    interface FeatureVersionImpl {
+        short featureLevel();

Review Comment:
   nit: Javadoc?



##########
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+    interface FeatureVersionImpl {
+        short featureLevel();
+
+        String featureName();
+
+        /**
+         * The next metadata version to be released when the feature became 
production ready.
+         * (Ie, if the current production MV is 17 when a feature is released, 
its mapping should be to MV 18)
+         */
+        MetadataVersion metadataVersionMapping();

Review Comment:
   If I understand this one correctly, this is basically the minimum required 
metadata version for this feature version to be allowed. Is my understanding 
correct? If it is, it may be better to call it `minimumRequiredMetadataVersion` 
or something like this.
   
   My second question is about the production readiness for a feature. My 
understanding is that a given version won't be allowed unless the MV is set to 
the right version. So if the MV is "unreleased" and we allow it via the 
configuration, then I can set a feature version depending on it. Is my 
understanding correct?



##########
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This is enum for the various features implemented for Kafka clusters.
+ * KIP-584: Versioning Scheme for Features introduced the idea of various 
features, but only added one feature -- MetadataVersion.
+ * KIP-1022: Formatting and Updating Features allowed for more features to be 
added. In order to set and update features,
+ * they need to be specified via the StorageTool or FeatureCommand tools.
+ * <br>
+ * Having a unified enum for the features that will use a shared type in the 
API used to set and update them
+ * makes it easier to process these features.
+ */
+public enum FeatureVersion {
+
+    /**
+     * Features defined. If a feature is included in this list, and marked to 
be used in production they will also be specified when
+     * formatting a cluster via the StorageTool. MetadataVersion is handled 
separately, so it is not included here.
+     *
+     * See {@link TestFeatureVersion} as an example. See {@link 
FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature.
+     */
+    TEST_VERSION("test.feature.version", TestFeatureVersion.values(), 
TestFeatureVersion::fromFeatureLevel, false);
+
+    public static final FeatureVersion[] FEATURES;
+    public static final List<FeatureVersion> PRODUCTION_FEATURES;
+    private final String name;
+    private final FeatureVersionUtils.FeatureVersionImpl[] features;
+    private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod;
+    private final boolean usedInProduction;
+
+    FeatureVersion(String name,
+                   FeatureVersionUtils.FeatureVersionImpl[] features,
+                   FeatureVersionUtils.CreateMethod createMethod,
+                   boolean usedInProduction) {
+        this.name = name;
+        this.features = features;
+        this.createFeatureVersionMethod = createMethod;
+        this.usedInProduction = usedInProduction;
+    }
+
+    static {
+        FeatureVersion[] enumValues = FeatureVersion.values();
+        FEATURES = Arrays.copyOf(enumValues, enumValues.length);
+
+        PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature ->
+                feature.usedInProduction).collect(Collectors.toList());
+    }
+
+    public String featureName() {
+        return name;
+    }
+
+    /**
+     * Creates a FeatureVersion from a given name and level with the correct 
feature object underneath.
+     *
+     * @param level   the level of the feature
+     * @returns       the FeatureVersionUtils.FeatureVersionImpl for the 
feature the enum is based on.
+     * @throws        IllegalArgumentException if the feature name is not 
valid (not implemented for this method)
+     */
+    public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short 
level) {
+        return createFeatureVersionMethod.fromFeatureLevel(level);
+    }
+
+    /**
+     * A method to validate the feature can be set. If a given feature relies 
on another feature, the dependencies should be
+     * captured in {@link 
FeatureVersionUtils.FeatureVersionImpl#dependencies()}
+     * <p>
+     * For example, say feature X level x relies on feature Y level y:
+     * if feature X >= x then throw an error if feature Y < y.
+     *
+     * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in 
order to write the feature records to the cluster.
+     *
+     * @param feature                   the feature we are validating
+     * @param metadataVersion           the metadata version we have (or want 
to set)
+     * @param features                  the feature versions (besides 
MetadataVersion) we have (or want to set)
+     * @throws IllegalArgumentException if the feature is not valid
+     */
+    public static void validateVersion(FeatureVersionUtils.FeatureVersionImpl 
feature, MetadataVersion metadataVersion, Map<String, Short> features) {

Review Comment:
   Are you going to add tests for this one in this PR or in a follow-up?



##########
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+    interface FeatureVersionImpl {

Review Comment:
   I find the name confusing for an interface as we usually use `Impl` for the 
implementation. Would it be possible to call it `FeatureVersion` and to perhaps 
rename `FeatureVersion` to `Features` (following ApiKeys naming)?



##########
server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Collections;
+import java.util.Map;
+
+public enum TestFeatureVersion implements 
FeatureVersionUtils.FeatureVersionImpl {

Review Comment:
   For my understanding, I suppose that we are going to remove this one when we 
add real features. Am I correct?



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -60,24 +60,19 @@ object StorageTool extends Logging {
         case "format" =>
           val directories = configToLogDirectories(config.get)
           val clusterId = namespace.getString("cluster_id")
-          val metadataVersion = getMetadataVersion(namespace,
-            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
-          if (!metadataVersion.isKRaftSupported) {
-            throw new TerseFailure(s"Must specify a valid KRaft 
metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.")
-          }
-          if (!metadataVersion.isProduction) {
-            if (config.get.unstableMetadataVersionsEnabled) {
-              System.out.println(s"WARNING: using pre-production 
metadata.version $metadataVersion.")
-            } else {
-              throw new TerseFailure(s"The metadata.version $metadataVersion 
is not ready for production use yet.")
-            }
-          }
           val metaProperties = new MetaProperties.Builder().
             setVersion(MetaPropertiesVersion.V1).
             setClusterId(clusterId).
             setNodeId(config.get.nodeId).
             build()
           val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = 
ArrayBuffer()
+          val specifiedFeatures: util.List[String] = 
namespace.getList("feature")
+          val featureNamesAndLevelsMap = 
featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList)
+          val metadataVersion = getMetadataVersion(namespace, 
featureNamesAndLevelsMap,
+            
Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString))
+          validateMetadataVersion(metadataVersion, config)
+          // Get all other features, validate, and create records for them
+          metadataRecords.appendAll(generateFeatureRecords(metadataVersion, 
featureNamesAndLevelsMap, FeatureVersion.PRODUCTION_FEATURES.asScala.toList))

Review Comment:
   nit: It may be better to pass `metadataRecords` as an argument so we can 
directly add to it.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -469,4 +522,28 @@ object StorageTool extends Logging {
     }
     0
   }
+
+  private def parseNameAndLevel(input: String): Array[String] = {
+    val equalsIndex = input.indexOf("=")
+    if (equalsIndex < 0)
+      throw new RuntimeException("Can't parse feature=level string " + input + 
": equals sign not found.")
+    val name = input.substring(0, equalsIndex).trim
+    val levelString = input.substring(equalsIndex + 1).trim
+    try {
+      levelString.toShort
+    } catch {
+      case _: Throwable =>
+        throw new RuntimeException("Can't parse feature=level string " + input 
+ ": " + "unable to parse " + levelString + " as a short.")
+    }
+    Array[String](name, levelString)
+  }
+
+  def featureNamesAndLevels(features: List[String]): Map[String, 
java.lang.Short] = {
+    features.map((feature: String) => {

Review Comment:
   nit: We usually use `map { (feature: String) =>`.



##########
server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.common;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public interface FeatureVersionUtils {
+
+    interface FeatureVersionImpl {
+        short featureLevel();
+
+        String featureName();
+
+        /**
+         * The next metadata version to be released when the feature became 
production ready.
+         * (Ie, if the current production MV is 17 when a feature is released, 
its mapping should be to MV 18)
+         */
+        MetadataVersion metadataVersionMapping();
+
+        /**
+         * A mapping from feature to level for all features that this feature 
depends on. If this feature doesn't
+         * depend on any others, return an empty map.
+         * For example, say feature X level x relies on feature Y level y:
+         * feature (X level x).dependencies() will return (Y -> y)
+         */
+        Map<String, Short> dependencies();

Review Comment:
   nit: I was wondering whether we could return a 
List<FeatureVersionUtils.FeatureVersionImpl> here. The FeatureVersionImpl 
provides the name and the required level. Would it be possible?



##########
metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java:
##########
@@ -64,6 +71,12 @@ public void testDefaultFeatureMapWithUnstable() {
         expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of(
             MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(),
             MetadataVersion.latestTesting().featureLevel()));
+        for (FeatureVersion feature : FeatureVersion.PRODUCTION_FEATURES) {
+            expectedFeatures.put(feature.featureName(), VersionRange.of(
+                    0,
+                    feature.defaultValue(Optional.empty())

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/tools/StorageTool.scala:
##########
@@ -109,6 +105,49 @@ object StorageTool extends Logging {
     }
   }
 
+  private def validateMetadataVersion(metadataVersion: MetadataVersion, 
config: Option[KafkaConfig]): Unit = {
+    if (!metadataVersion.isKRaftSupported) {
+      throw new TerseFailure(s"Must specify a valid KRaft metadata.version of 
at least ${MetadataVersion.IBP_3_0_IV0}.")
+    }
+    if (!metadataVersion.isProduction) {
+      if (config.get.unstableMetadataVersionsEnabled) {
+        System.out.println(s"WARNING: using pre-production metadata.version 
$metadataVersion.")
+      } else {
+        throw new TerseFailure(s"The metadata.version $metadataVersion is not 
ready for production use yet.")
+      }
+    }
+  }
+
+  def generateFeatureRecords(metadataVersion: MetadataVersion,

Review Comment:
   nit: Could we make it private?



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -459,18 +459,20 @@ BrokerFeature processRegistrationFeature(
         FinalizedControllerFeatures finalizedFeatures,
         BrokerRegistrationRequestData.Feature feature
     ) {
-        Optional<Short> finalized = finalizedFeatures.get(feature.name());
-        if (finalized.isPresent()) {
-            if (!VersionRange.of(feature.minSupportedVersion(), 
feature.maxSupportedVersion()).contains(finalized.get())) {
-                throw new UnsupportedVersionException("Unable to register 
because the broker " +
-                    "does not support version " + finalized.get() + " of " + 
feature.name() +
-                        ". It wants a version between " + 
feature.minSupportedVersion() + " and " +
-                        feature.maxSupportedVersion() + ", inclusive.");
-            }
-        } else {
-            log.warn("Broker {} registered with feature {} that is unknown to 
the controller",
+        int defaultVersion = 
feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default 
value for MetadataVersion is 1 not 0.
+        short finalized = finalizedFeatures.getOrDefault(feature.name(), 
(short) defaultVersion);
+        if (!VersionRange.of(feature.minSupportedVersion(), 
feature.maxSupportedVersion()).contains(finalized)) {
+            throw new UnsupportedVersionException("Unable to register because 
the broker " +
+                "does not support version " + finalized + " of " + 
feature.name() +
+                    ". It wants a version between " + 
feature.minSupportedVersion() + " and " +
+                    feature.maxSupportedVersion() + ", inclusive.");
+        }
+        // A feature is not found in the finalizedFeature map if it is unknown 
to the controller or set to 0 (feature not enabled).
+        // As more features roll out, it may be common to leave a feature 
disabled, so this log is debug level in the case
+        // an intended feature is not being set.
+        if (finalized == 0)
+            log.debug("Broker {} registered with feature {} that is either 
unknown or version 0 on the controller",

Review Comment:
   I wonder whether we should just keep it as a warning. Or could we 
differentiate between a disabled feature or a unknown one?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java:
##########
@@ -61,6 +62,12 @@ public static Map<String, VersionRange> 
defaultFeatureMap(boolean enableUnstable
                 enableUnstable ?
                     MetadataVersion.latestTesting().featureLevel() :
                     MetadataVersion.latestProduction().featureLevel()));
+        for (FeatureVersion feature : FeatureVersion.PRODUCTION_FEATURES) {
+            features.put(feature.featureName(), VersionRange.of(
+                    0,
+                    feature.defaultValue(Optional.empty())

Review Comment:
   nit: Indentation seems to be off here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to