jsancio commented on code in PR #21053:
URL: https://github.com/apache/kafka/pull/21053#discussion_r2770513959


##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -87,6 +89,7 @@ static class Builder {
         private Map<String, Object> staticConfig = Map.of();
         private int nodeId = 0;
         private FeatureControlManager featureControl = null;
+        private SupportedConfigChecker supportedConfigChecker = (resourceType, configName) -> true;

Review Comment:
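   Instead of defining this default value (at least) three times, why not define 
it once in SupportedConfigChecker? E.g.
   
   ```java
       // Interface fields are implicitly public static final.
       SupportedConfigChecker TRUE = ...;
   
       // "true" is a reserved word in Java, so the accessor needs another name.
       static SupportedConfigChecker alwaysTrue() {
           return TRUE;
       }
   ```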
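   
   A minimal, self-contained sketch of the same idea, in case it helps. It assumes 
`SupportedConfigChecker` is a functional interface with a single 
`isSupported(type, name)` method, as the lambda in the diff suggests. The 
`ResourceType` enum is a stand-in for `ConfigResource.Type`, and the names 
`ALWAYS_SUPPORTED`, `alwaysSupported()` and `BuilderSketch` are hypothetical.
   
   ```java
   import java.util.Objects;

   // Stand-in for ConfigResource.Type (assumption made to keep the sketch self-contained).
   enum ResourceType { TOPIC, BROKER, CLIENT_METRICS, GROUP }

   @FunctionalInterface
   interface SupportedConfigChecker {
       // Defined once here instead of being repeated as a lambda in every builder.
       SupportedConfigChecker ALWAYS_SUPPORTED = (resourceType, configName) -> true;

       boolean isSupported(ResourceType resourceType, String configName);

       // Hypothetical accessor name for the shared default.
       static SupportedConfigChecker alwaysSupported() {
           return ALWAYS_SUPPORTED;
       }
   }

   // Hypothetical builder showing how a caller picks up the shared default.
   final class BuilderSketch {
       private SupportedConfigChecker supportedConfigChecker =
           SupportedConfigChecker.alwaysSupported();

       BuilderSketch setSupportedConfigChecker(SupportedConfigChecker checker) {
           this.supportedConfigChecker = Objects.requireNonNull(checker);
           return this;
       }

       SupportedConfigChecker supportedConfigChecker() {
           return supportedConfigChecker;
       }
   }
   ```
   
   Every builder that currently repeats the lambda could then start from 
`SupportedConfigChecker.alwaysSupported()` and only override it when a real 
checker is available.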
   



##########
coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/KRaftCoordinatorMetadataImage.java:
##########
@@ -76,7 +76,9 @@ public Optional<TopicMetadata> topicMetadata(String topicName) {
 
     @Override
     public CoordinatorMetadataDelta emptyDelta() {
-        return new KRaftCoordinatorMetadataDelta(new MetadataDelta(metadataImage));
+        return new KRaftCoordinatorMetadataDelta(new MetadataDelta.Builder().
+            setImage(metadataImage).
+            build());

Review Comment:
   We tend to use this formatting in this module:
   ```java
           return new KRaftCoordinatorMetadataDelta(
               new MetadataDelta.Builder()
                   .setImage(metadataImage)
                   .build()
           );
   ```



##########
metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java:
##########
@@ -45,6 +48,9 @@ public void finishSnapshot() {
     }
 
     public void replay(ConfigRecord record) {
+        if (!supportedConfigChecker.isSupported(image.resource().type(), record.name())) {
+            return;
+        }

Review Comment:
   Same here. I suggest writing a comment before the `return` explaining why 
Kafka skips these records.



##########
core/src/main/scala/kafka/server/SharedServer.scala:
##########
@@ -112,6 +111,7 @@ class SharedServer(
   private var usedByController: Boolean = false
   val brokerConfig = new KafkaConfig(sharedServerConfig.props, false)
   val controllerConfig = new KafkaConfig(sharedServerConfig.props, false)
+  val supportedConfigChecker: SupportedConfigChecker = new ControllerConfigurationValidator(brokerConfig)

Review Comment:
   Btw, it is a code smell that this type is called 
`ControllerConfigurationValidator` yet takes a broker config and is used in the 
shared server. For example, before your change the controller configuration 
validator was only created in the controller server and used by the quorum 
controller.



##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -44,7 +47,33 @@ import scala.collection.mutable
  * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
  * as the others. It is not persisted to the metadata log.
  */
-class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
+class ControllerConfigurationValidator(private val kafkaConfig: KafkaConfig) 
+    extends ConfigurationValidator with SupportedConfigChecker {
+  private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {
+    val topicConfigs = LogConfig.nonInternalConfigNames.asScala.toSet
+    val brokerConfigs = DynamicConfig.Broker.names.asScala.toSet
+    val clientMetricsConfigs = ClientMetricsConfigs.configDef().names.asScala.toSet
+    val groupConfigs = GroupConfig.configDef().names.asScala.toSet
+    // Quota configs can be used with different resource types, so we include them for all types
+    val allQuotaConfigs = QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs().names.asScala ++
+                          QuotaConfig.userAndClientQuotaConfigs().names.asScala ++
+                          QuotaConfig.ipConfigs.names.asScala
+
+    Map(
+      ConfigResource.Type.TOPIC -> (topicConfigs ++ allQuotaConfigs).asJava,
+      ConfigResource.Type.BROKER -> (brokerConfigs ++ allQuotaConfigs).asJava,
+      ConfigResource.Type.CLIENT_METRICS -> (clientMetricsConfigs ++ allQuotaConfigs).asJava,
+      ConfigResource.Type.GROUP -> (groupConfigs ++ allQuotaConfigs).asJava
+    )

Review Comment:
   Why did you have to add all quota configs for all of the resources? That 
doesn't seem right. Each resource should define which quota configs are valid. 
For example, I see the following definition in LogConfig:
   
   ```java
             .define(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT,
                     ThrottledReplicaListValidator.INSTANCE, MEDIUM, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_DOC)
             .define(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG, LIST, QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_DEFAULT,
                     ThrottledReplicaListValidator.INSTANCE, MEDIUM, QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_DOC)
   ```
   
   Looking at the validation for the broker resource, it looks like all keys are 
allowed:
   
   ```scala
           case BROKER => validateBrokerName(resource.name())
   ```
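   
   To make the suggestion concrete, here is a rough sketch of a checker where 
each resource type owns its own name set, so no quota names are merged across 
types. Everything in it is an assumption for illustration: `ResourceType` stands 
in for `ConfigResource.Type`, `PerTypeConfigChecker` is a hypothetical name, the 
name sets are tiny samples rather than the real definitions, and treating BROKER 
as accept-all only mirrors the validation quoted above.
   
   ```java
   import java.util.EnumMap;
   import java.util.Map;
   import java.util.Set;

   // Stand-in for ConfigResource.Type (assumption made to keep the sketch self-contained).
   enum ResourceType { TOPIC, BROKER, CLIENT_METRICS, GROUP }

   // Hypothetical checker: every resource type declares its own supported names.
   final class PerTypeConfigChecker {
       private final Map<ResourceType, Set<String>> namesByType =
           new EnumMap<>(ResourceType.class);

       PerTypeConfigChecker() {
           // Sample names only. The throttled-replica names belong to the topic
           // definition itself, so nothing has to be merged in from elsewhere.
           namesByType.put(ResourceType.TOPIC, Set.of(
               "retention.ms",
               "leader.replication.throttled.replicas",
               "follower.replication.throttled.replicas"));
           namesByType.put(ResourceType.CLIENT_METRICS, Set.of("interval.ms"));
           namesByType.put(ResourceType.GROUP, Set.of("consumer.session.timeout.ms"));
           // BROKER has no entry on purpose: see isSupported.
       }

       boolean isSupported(ResourceType type, String configName) {
           if (type == ResourceType.BROKER) {
               // Assumption: mirror the quoted validation, which accepts any key.
               return true;
           }
           return namesByType.getOrDefault(type, Set.of()).contains(configName);
       }
   }
   ```
   
   With this shape, a topic-only name such as 
`leader.replication.throttled.replicas` is accepted for TOPIC and rejected for 
GROUP, which the merged `allQuotaConfigs` set cannot express.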
   
   



##########
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java:
##########
@@ -2059,7 +2059,8 @@ public void testOnMetadataUpdate() {
         verify(coordinator0).onLoaded(CoordinatorMetadataImage.EMPTY);
 
         // Publish a new image.
-        CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new MetadataDelta(MetadataImage.EMPTY));
+        CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(
+                new MetadataDelta.Builder().setImage(MetadataImage.EMPTY).build());

Review Comment:
   Same comment here. See my other comment about how to format this change.



##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -44,7 +47,33 @@ import scala.collection.mutable
  * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
  * as the others. It is not persisted to the metadata log.
  */
-class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
+class ControllerConfigurationValidator(private val kafkaConfig: KafkaConfig) 

Review Comment:
   Why did you decide to extend this type? The implementation of `isSupported` 
is completely disjoint from the rest of the type. It looks like you should be 
able to implement this type in the server module. The server module should have 
access to all of the config definitions that you need.



##########
core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala:
##########
@@ -44,7 +47,33 @@ import scala.collection.mutable
  * in the same RPC, BROKER_LOGGER is not really a dynamic configuration in the same sense
  * as the others. It is not persisted to the metadata log.
  */
-class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends ConfigurationValidator {
+class ControllerConfigurationValidator(private val kafkaConfig: KafkaConfig) 
+    extends ConfigurationValidator with SupportedConfigChecker {
+  private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {

Review Comment:
   The formatting looks off. How about:
   
   ```scala
   class ControllerConfigurationValidator(
     private val kafkaConfig: KafkaConfig
   ) extends ConfigurationValidator with SupportedConfigChecker {
     private val validConfigsByType: Map[ConfigResource.Type, util.Set[String]] = {
   ```



##########
metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java:
##########
@@ -509,6 +520,11 @@ private List<String> getParts(String value, String key, ConfigResource configRes
     public void replay(ConfigRecord record) {
         Type type = Type.forId(record.resourceType());
         ConfigResource configResource = new ConfigResource(type, record.resourceName());
+
+        if (!supportedConfigChecker.isSupported(configResource.type(), record.name())) {
+            return;
+        }

Review Comment:
   I suggest writing a comment before the return explaining why the controller 
skips records that are not "supported."
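   
   Something along these lines is what I have in mind. This is only a sketch: 
`ConfigRecordSketch`, `ConfigCheckerSketch` and `ReplaySketch` are hypothetical 
stand-ins, and the rationale in the comment is a placeholder assumption that 
should be replaced with the actual reason.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for ConfigRecord, reduced to the fields this sketch needs.
   record ConfigRecordSketch(String resourceType, String resourceName, String name, String value) {}

   // Hypothetical stand-in for SupportedConfigChecker.
   interface ConfigCheckerSketch {
       boolean isSupported(String resourceType, String configName);
   }

   final class ReplaySketch {
       private final ConfigCheckerSketch checker;
       private final Map<String, String> applied = new HashMap<>();

       ReplaySketch(ConfigCheckerSketch checker) {
           this.checker = checker;
       }

       void replay(ConfigRecordSketch record) {
           if (!checker.isSupported(record.resourceType(), record.name())) {
               // Placeholder rationale (assumption, not taken from the PR): the log may
               // contain a config name this node does not recognize, so the record is
               // skipped instead of being applied to the in-memory state.
               return;
           }
           applied.put(record.resourceName() + "/" + record.name(), record.value());
       }

       Map<String, String> applied() {
           return applied;
       }
   }
   ```
   
   The point is that a silent early `return` in `replay` is surprising to a 
reader, so the reason should sit right next to it.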



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.