dajac commented on code in PR #15685: URL: https://github.com/apache/kafka/pull/15685#discussion_r1602958576
########## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { Review Comment: Is there a particular reason for grouping those two interfaces in this one? I am asking because at first I thought that `FeatureVersionUtils.java` was a utils library (e.g. useful functions). I did not get that it was actually the one that contains the interface that one must implement. ########## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This is enum for the various features implemented for Kafka clusters. + * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * <br> + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum FeatureVersion { + + /** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature. 
+ */ + TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); + + public static final FeatureVersion[] FEATURES; + public static final List<FeatureVersion> PRODUCTION_FEATURES; + private final String name; + private final FeatureVersionUtils.FeatureVersionImpl[] features; + private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod; + private final boolean usedInProduction; + + FeatureVersion(String name, + FeatureVersionUtils.FeatureVersionImpl[] features, + FeatureVersionUtils.CreateMethod createMethod, + boolean usedInProduction) { + this.name = name; + this.features = features; + this.createFeatureVersionMethod = createMethod; + this.usedInProduction = usedInProduction; + } + + static { + FeatureVersion[] enumValues = FeatureVersion.values(); + FEATURES = Arrays.copyOf(enumValues, enumValues.length); + + PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> + feature.usedInProduction).collect(Collectors.toList()); + } + + public String featureName() { + return name; + } + + /** + * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersionImpl for the feature the enum is based on. + * @throws IllegalArgumentException if the feature name is not valid (not implemented for this method) + */ + public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short level) { + return createFeatureVersionMethod.fromFeatureLevel(level); + } + + /** + * A method to validate the feature can be set. If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVersionUtils.FeatureVersionImpl#dependencies()} + * <p> + * For example, say feature X level x relies on feature Y level y: + * if feature X >= x then throw an error if feature Y < y. 
+ * + * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. + * + * @param feature the feature we are validating + * @param metadataVersion the metadata version we have (or want to set) + * @param features the feature versions (besides MetadataVersion) we have (or want to set) + * @throws IllegalArgumentException if the feature is not valid + */ + public static void validateVersion(FeatureVersionUtils.FeatureVersionImpl feature, MetadataVersion metadataVersion, Map<String, Short> features) { + if (feature.featureLevel() >= 1 && metadataVersion.isLessThan(MetadataVersion.IBP_3_3_IV0)) + throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + + " because it depends on metadata.version=14 (" + MetadataVersion.IBP_3_3_IV0 + ")"); + + for (Map.Entry<String, Short> dependency: feature.dependencies().entrySet()) { + Short featureLevel = features.get(dependency.getKey()); + + if (featureLevel == null || featureLevel < dependency.getValue()) { + throw new IllegalArgumentException(feature.featureName() + " could not be set to " + feature.featureLevel() + + " because it depends on " + dependency.getKey() + " level " + dependency.getValue()); + } + } + } + + /** + * A method to return the default version level of a feature. If metadataVersionOpt is not empty, the default is based on + * the metadataVersion. If not, use the latest production version for the given feature. 
+ * + * Every time a new feature is added, it should create a mapping from metadata version to feature version + * with {@link FeatureVersionUtils.FeatureVersionImpl#metadataVersionMapping()} + * + * @param metadataVersionOpt the metadata version we want to use to set the default, or None if the latest production version is desired + * @return the default version level for the feature and potential metadata version + */ + public short defaultValue(Optional<MetadataVersion> metadataVersionOpt) { + MetadataVersion mv = metadataVersionOpt.orElse(MetadataVersion.LATEST_PRODUCTION); Review Comment: I wonder if we should rather let the caller always specify the MV. Instead of passing `Optional.empty` they could just pass `MetadataVersion.LATEST_PRODUCTION`. Or is there a reason against it? ########## core/src/main/scala/kafka/server/BrokerFeatures.scala: ########## @@ -75,16 +76,19 @@ object BrokerFeatures extends Logging { } def defaultSupportedFeatures(unstableMetadataVersionsEnabled: Boolean): Features[SupportedVersionRange] = { Review Comment: Should we add a unit test for this change? ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -469,4 +522,28 @@ object StorageTool extends Logging { } 0 } + + private def parseNameAndLevel(input: String): Array[String] = { + val equalsIndex = input.indexOf("=") + if (equalsIndex < 0) + throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.") + val name = input.substring(0, equalsIndex).trim + val levelString = input.substring(equalsIndex + 1).trim + try { + levelString.toShort + } catch { + case _: Throwable => + throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.") + } + Array[String](name, levelString) Review Comment: nit: It may be better to use a tuple `(String, Short)` if possible. It will also simplify the calling side. 
########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -469,4 +522,28 @@ object StorageTool extends Logging { } 0 } + + private def parseNameAndLevel(input: String): Array[String] = { + val equalsIndex = input.indexOf("=") + if (equalsIndex < 0) + throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.") + val name = input.substring(0, equalsIndex).trim + val levelString = input.substring(equalsIndex + 1).trim + try { + levelString.toShort + } catch { + case _: Throwable => + throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.") + } + Array[String](name, levelString) + } + + def featureNamesAndLevels(features: List[String]): Map[String, java.lang.Short] = { + features.map((feature: String) => { + // Ensure the feature exists + Review Comment: nit: Should we remove this empty line? ########## metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java: ########## @@ -55,6 +56,12 @@ public void testDefaultFeatureMap() { expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.LATEST_PRODUCTION.featureLevel())); + for (FeatureVersion feature : FeatureVersion.PRODUCTION_FEATURES) { + expectedFeatures.put(feature.featureName(), VersionRange.of( + 0, + feature.defaultValue(Optional.empty()) Review Comment: nit: Indentation. ########## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + + interface FeatureVersionImpl { + short featureLevel(); Review Comment: nit: Javadoc? ########## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + + interface FeatureVersionImpl { + short featureLevel(); + + String featureName(); + + /** + * The next metadata version to be released when the feature became production ready. 
+ * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ + MetadataVersion metadataVersionMapping(); Review Comment: If I understand this one correctly, this is basically the minimum required metadata version for this feature version to be allowed. Is my understanding correct? If it is, it may be better to call it `minimumRequiredMetadataVersion` or something like this. My second question is about the production readiness for a feature. My understanding is that a given version won't be allowed unless the MV is set to the right version. So if the MV is "unreleased" and we allow it via the configuration, then I can set a feature version depending on it. Is my understanding correct? ########## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersion.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This is enum for the various features implemented for Kafka clusters. 
+ * KIP-584: Versioning Scheme for Features introduced the idea of various features, but only added one feature -- MetadataVersion. + * KIP-1022: Formatting and Updating Features allowed for more features to be added. In order to set and update features, + * they need to be specified via the StorageTool or FeatureCommand tools. + * <br> + * Having a unified enum for the features that will use a shared type in the API used to set and update them + * makes it easier to process these features. + */ +public enum FeatureVersion { + + /** + * Features defined. If a feature is included in this list, and marked to be used in production they will also be specified when + * formatting a cluster via the StorageTool. MetadataVersion is handled separately, so it is not included here. + * + * See {@link TestFeatureVersion} as an example. See {@link FeatureVersionUtils.FeatureVersionImpl} when implementing a new feature. + */ + TEST_VERSION("test.feature.version", TestFeatureVersion.values(), TestFeatureVersion::fromFeatureLevel, false); + + public static final FeatureVersion[] FEATURES; + public static final List<FeatureVersion> PRODUCTION_FEATURES; + private final String name; + private final FeatureVersionUtils.FeatureVersionImpl[] features; + private final FeatureVersionUtils.CreateMethod createFeatureVersionMethod; + private final boolean usedInProduction; + + FeatureVersion(String name, + FeatureVersionUtils.FeatureVersionImpl[] features, + FeatureVersionUtils.CreateMethod createMethod, + boolean usedInProduction) { + this.name = name; + this.features = features; + this.createFeatureVersionMethod = createMethod; + this.usedInProduction = usedInProduction; + } + + static { + FeatureVersion[] enumValues = FeatureVersion.values(); + FEATURES = Arrays.copyOf(enumValues, enumValues.length); + + PRODUCTION_FEATURES = Arrays.stream(FEATURES).filter(feature -> + feature.usedInProduction).collect(Collectors.toList()); + } + + public String featureName() { + return name; + } + + /** 
+ * Creates a FeatureVersion from a given name and level with the correct feature object underneath. + * + * @param level the level of the feature + * @returns the FeatureVersionUtils.FeatureVersionImpl for the feature the enum is based on. + * @throws IllegalArgumentException if the feature name is not valid (not implemented for this method) + */ + public FeatureVersionUtils.FeatureVersionImpl fromFeatureLevel(short level) { + return createFeatureVersionMethod.fromFeatureLevel(level); + } + + /** + * A method to validate the feature can be set. If a given feature relies on another feature, the dependencies should be + * captured in {@link FeatureVersionUtils.FeatureVersionImpl#dependencies()} + * <p> + * For example, say feature X level x relies on feature Y level y: + * if feature X >= x then throw an error if feature Y < y. + * + * All feature levels above 0 require metadata.version=4 (IBP_3_3_IV0) in order to write the feature records to the cluster. + * + * @param feature the feature we are validating + * @param metadataVersion the metadata version we have (or want to set) + * @param features the feature versions (besides MetadataVersion) we have (or want to set) + * @throws IllegalArgumentException if the feature is not valid + */ + public static void validateVersion(FeatureVersionUtils.FeatureVersionImpl feature, MetadataVersion metadataVersion, Map<String, Short> features) { Review Comment: Are you going to add tests for this one in this PR or in a follow-up? ########## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + + interface FeatureVersionImpl { Review Comment: I find the name confusing for an interface as we usually use `Impl` for the implementation. Would it be possible to call it `FeatureVersion` and to perhaps rename `FeatureVersion` to `Features` (following ApiKeys naming)? ########## server-common/src/main/java/org/apache/kafka/server/common/TestFeatureVersion.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import java.util.Collections; +import java.util.Map; + +public enum TestFeatureVersion implements FeatureVersionUtils.FeatureVersionImpl { Review Comment: For my understanding, I suppose that we are going to remove this one when we add real features. Am I correct? ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -60,24 +60,19 @@ object StorageTool extends Logging { case "format" => val directories = configToLogDirectories(config.get) val clusterId = namespace.getString("cluster_id") - val metadataVersion = getMetadataVersion(namespace, - Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) - if (!metadataVersion.isKRaftSupported) { - throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") - } - if (!metadataVersion.isProduction) { - if (config.get.unstableMetadataVersionsEnabled) { - System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") - } else { - throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") - } - } val metaProperties = new MetaProperties.Builder(). setVersion(MetaPropertiesVersion.V1). setClusterId(clusterId). setNodeId(config.get.nodeId). 
build() val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer() + val specifiedFeatures: util.List[String] = namespace.getList("feature") + val featureNamesAndLevelsMap = featureNamesAndLevels(Option(specifiedFeatures).getOrElse(Collections.emptyList).asScala.toList) + val metadataVersion = getMetadataVersion(namespace, featureNamesAndLevelsMap, + Option(config.get.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)).map(_.toString)) + validateMetadataVersion(metadataVersion, config) + // Get all other features, validate, and create records for them + metadataRecords.appendAll(generateFeatureRecords(metadataVersion, featureNamesAndLevelsMap, FeatureVersion.PRODUCTION_FEATURES.asScala.toList)) Review Comment: nit: It may be better to pass `metadataRecords` as an argument so we can directly add to it. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -469,4 +522,28 @@ object StorageTool extends Logging { } 0 } + + private def parseNameAndLevel(input: String): Array[String] = { + val equalsIndex = input.indexOf("=") + if (equalsIndex < 0) + throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.") + val name = input.substring(0, equalsIndex).trim + val levelString = input.substring(equalsIndex + 1).trim + try { + levelString.toShort + } catch { + case _: Throwable => + throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.") + } + Array[String](name, levelString) + } + + def featureNamesAndLevels(features: List[String]): Map[String, java.lang.Short] = { + features.map((feature: String) => { Review Comment: nit: We usually use `map { (feature: String) =>`. ########## server-common/src/main/java/org/apache/kafka/server/common/FeatureVersionUtils.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.common; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public interface FeatureVersionUtils { + + interface FeatureVersionImpl { + short featureLevel(); + + String featureName(); + + /** + * The next metadata version to be released when the feature became production ready. + * (Ie, if the current production MV is 17 when a feature is released, its mapping should be to MV 18) + */ + MetadataVersion metadataVersionMapping(); + + /** + * A mapping from feature to level for all features that this feature depends on. If this feature doesn't + * depend on any others, return an empty map. + * For example, say feature X level x relies on feature Y level y: + * feature (X level x).dependencies() will return (Y -> y) + */ + Map<String, Short> dependencies(); Review Comment: nit: I was wondering whether we could return a List<FeatureVersionUtils.FeatureVersionImpl> here. The FeatureVersionImpl provides the name and the required level. Would it be possible? 
########## metadata/src/test/java/org/apache/kafka/controller/QuorumFeaturesTest.java: ########## @@ -64,6 +71,12 @@ public void testDefaultFeatureMapWithUnstable() { expectedFeatures.put(MetadataVersion.FEATURE_NAME, VersionRange.of( MetadataVersion.MINIMUM_KRAFT_VERSION.featureLevel(), MetadataVersion.latestTesting().featureLevel())); + for (FeatureVersion feature : FeatureVersion.PRODUCTION_FEATURES) { + expectedFeatures.put(feature.featureName(), VersionRange.of( + 0, + feature.defaultValue(Optional.empty()) Review Comment: ditto. ########## core/src/main/scala/kafka/tools/StorageTool.scala: ########## @@ -109,6 +105,49 @@ object StorageTool extends Logging { } } + private def validateMetadataVersion(metadataVersion: MetadataVersion, config: Option[KafkaConfig]): Unit = { + if (!metadataVersion.isKRaftSupported) { + throw new TerseFailure(s"Must specify a valid KRaft metadata.version of at least ${MetadataVersion.IBP_3_0_IV0}.") + } + if (!metadataVersion.isProduction) { + if (config.get.unstableMetadataVersionsEnabled) { + System.out.println(s"WARNING: using pre-production metadata.version $metadataVersion.") + } else { + throw new TerseFailure(s"The metadata.version $metadataVersion is not ready for production use yet.") + } + } + } + + def generateFeatureRecords(metadataVersion: MetadataVersion, Review Comment: nit: Could we make it private? 
########## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ########## @@ -459,18 +459,20 @@ BrokerFeature processRegistrationFeature( FinalizedControllerFeatures finalizedFeatures, BrokerRegistrationRequestData.Feature feature ) { - Optional<Short> finalized = finalizedFeatures.get(feature.name()); - if (finalized.isPresent()) { - if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized.get())) { - throw new UnsupportedVersionException("Unable to register because the broker " + - "does not support version " + finalized.get() + " of " + feature.name() + - ". It wants a version between " + feature.minSupportedVersion() + " and " + - feature.maxSupportedVersion() + ", inclusive."); - } - } else { - log.warn("Broker {} registered with feature {} that is unknown to the controller", + int defaultVersion = feature.name().equals(MetadataVersion.FEATURE_NAME) ? 1 : 0; // The default value for MetadataVersion is 1 not 0. + short finalized = finalizedFeatures.getOrDefault(feature.name(), (short) defaultVersion); + if (!VersionRange.of(feature.minSupportedVersion(), feature.maxSupportedVersion()).contains(finalized)) { + throw new UnsupportedVersionException("Unable to register because the broker " + + "does not support version " + finalized + " of " + feature.name() + + ". It wants a version between " + feature.minSupportedVersion() + " and " + + feature.maxSupportedVersion() + ", inclusive."); + } + // A feature is not found in the finalizedFeature map if it is unknown to the controller or set to 0 (feature not enabled). + // As more features roll out, it may be common to leave a feature disabled, so this log is debug level in the case + // an intended feature is not being set. + if (finalized == 0) + log.debug("Broker {} registered with feature {} that is either unknown or version 0 on the controller", Review Comment: I wonder whether we should just keep it as a warning. 
Or could we differentiate between a disabled feature and an unknown one? ########## metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java: ########## @@ -61,6 +62,12 @@ public static Map<String, VersionRange> defaultFeatureMap(boolean enableUnstable enableUnstable ? MetadataVersion.latestTesting().featureLevel() : MetadataVersion.latestProduction().featureLevel())); + for (FeatureVersion feature : FeatureVersion.PRODUCTION_FEATURES) { + features.put(feature.featureName(), VersionRange.of( + 0, + feature.defaultValue(Optional.empty()) Review Comment: nit: Indentation seems to be off here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
