This is an automated email from the ASF dual-hosted git repository.

squah-confluent pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7dfb89db47 KAFKA-20409: Don't expose internal group configs unless 
they are user-defined (#22302)
a7dfb89db47 is described below

commit a7dfb89db47fb7ec3503981057fab54f8c078d15
Author: majialong <[email protected]>
AuthorDate: Thu May 21 15:04:25 2026 +0800

    KAFKA-20409: Don't expose internal group configs unless they are 
user-defined (#22302)
    
    Previously, `internal group configs` with a `broker-level synonym` were
    always included in the group config map, exposing them even when the
    user had never explicitly set them. This change skips such configs
    unless the user has configured them either via the broker synonym or
    directly at the group level.
    
    Reviewers: Sean Quah <[email protected]>
---
 .../kafka/coordinator/group/GroupConfig.java       |  9 ++++
 .../kafka/server/config/AbstractKafkaConfig.java   | 17 +++++-
 .../server/config/AbstractKafkaConfigTest.java     | 60 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 2 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index f74e7a0a5d3..100bbc0c9b8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -343,6 +343,15 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, 
Optional.empty())
     );
 
+    /**
+     * Returns {@code true} if the given config name is defined as internal in 
{@link #CONFIG_DEF}.
+     * Returns {@code false} for unknown names or non-internal configs.
+     */
+    public static boolean isInternal(String configName) {
+        ConfigDef.ConfigKey configKey = 
CONFIG_DEF.configKeys().get(configName);
+        return configKey != null && configKey.internalConfig;
+    }
+
     /**
      * Returns the broker-level synonym config name for the given group config 
name,
      * or {@code Optional.empty()} if no broker-level synonym exists.
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index ac5e33dc70b..f324c9c1974 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -632,10 +632,23 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
         return millis < 0 ? Long.valueOf(-1) : millis;
     }
 
+    /**
+     * Returns a map of group config names to their broker-level synonym 
values, used as
+     * defaults when building a {@link GroupConfig} for {@code 
DescribeConfigs}.
+     * Internal group configs are excluded unless their broker synonym was 
explicitly configured.
+     *
+     * @return a map of group config names to their corresponding broker-level 
values
+     */
     public Map<String, Object> extractGroupConfigMap() {
         Map<String, Object> defaults = new HashMap<>();
-        GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach((groupConfigName, 
brokerConfigName) ->
-            brokerConfigName.ifPresent(name -> defaults.put(groupConfigName, 
get(name)))
+        Map<String, Object> brokerOriginals = originals();
+        GroupConfig.configNames().forEach(groupConfigName ->
+            
GroupConfig.brokerSynonym(groupConfigName).ifPresent(brokerConfigName -> {
+                // Skip internal configs unless they are explicitly configured 
via the broker synonym.
+                if (!GroupConfig.isInternal(groupConfigName) || 
brokerOriginals.containsKey(brokerConfigName)) {
+                    defaults.put(groupConfigName, get(brokerConfigName));
+                }
+            })
         );
         return defaults;
     }
diff --git 
a/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
 
b/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
index 3600d80253b..d7243c6192e 100644
--- 
a/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/config/AbstractKafkaConfigTest.java
@@ -16,18 +16,31 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.raft.KRaftConfigs;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mockStatic;
 
 public class AbstractKafkaConfigTest {
 
+    private static final String TEST_INTERNAL_GROUP_CONFIG = 
"group.test.internal.config";
+    private static final String TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM = 
"group.test.internal.config.broker.synonym";
+
     @Test
     public void testPopulateSynonymsOnEmptyMap() {
         assertEquals(Collections.emptyMap(), 
AbstractKafkaConfig.populateSynonyms(Collections.emptyMap()));
@@ -52,4 +65,51 @@ public class AbstractKafkaConfigTest {
         expectedOutput.put(KRaftConfigs.NODE_ID_CONFIG, "4");
         assertEquals(expectedOutput, 
AbstractKafkaConfig.populateSynonyms(input));
     }
+
+    @Test
+    public void 
testExtractGroupConfigMapExcludesInternalConfigWithUnconfiguredBrokerSynonym() {
+        Map<String, Object> config = extractGroupConfigMap(Map.of(), true);
+
+        assertFalse(config.containsKey(TEST_INTERNAL_GROUP_CONFIG));
+    }
+
+    @Test
+    public void 
testExtractGroupConfigMapIncludesInternalConfigWithConfiguredBrokerSynonym() {
+        Map<String, Object> config = 
extractGroupConfigMap(Map.of(TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM, 
"override-value"), true);
+
+        assertTrue(config.containsKey(TEST_INTERNAL_GROUP_CONFIG));
+        assertEquals("override-value", config.get(TEST_INTERNAL_GROUP_CONFIG));
+    }
+
+    @Test
+    public void testExtractGroupConfigMapIncludesNonInternalConfig() {
+        Map<String, Object> config = extractGroupConfigMap(Map.of(), false);
+
+        assertTrue(config.containsKey(TEST_INTERNAL_GROUP_CONFIG));
+        assertEquals("default-value", config.get(TEST_INTERNAL_GROUP_CONFIG));
+    }
+
+    private static Map<String, Object> extractGroupConfigMap(Map<String, 
Object> brokerProps, boolean isInternal) {
+        try (MockedStatic<GroupConfig> mocked = mockStatic(GroupConfig.class, 
Mockito.CALLS_REAL_METHODS)) {
+
+            // mock group config
+            
mocked.when(GroupConfig::configNames).thenReturn(Set.of(TEST_INTERNAL_GROUP_CONFIG));
+            mocked.when(() -> 
GroupConfig.brokerSynonym(TEST_INTERNAL_GROUP_CONFIG)).thenReturn(Optional.of(TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM));
+            mocked.when(() -> 
GroupConfig.isInternal(TEST_INTERNAL_GROUP_CONFIG)).thenReturn(isInternal);
+
+            // mock broker synonym config
+            ConfigDef configDef = new 
ConfigDef().define(TEST_INTERNAL_GROUP_CONFIG_BROKER_SYNONYM, 
ConfigDef.Type.STRING,
+                "default-value", ConfigDef.Importance.LOW, "test broker 
synonym");
+
+            AbstractKafkaConfig kafkaConfig = new 
AbstractKafkaConfig(configDef, new HashMap<>(brokerProps), Map.of(), false) {
+                @Override
+                public void addReconfigurable(Reconfigurable reconfigurable) { 
}
+
+                @Override
+                public void removeReconfigurable(Reconfigurable 
reconfigurable) { }
+            };
+
+            return kafkaConfig.extractGroupConfigMap();
+        }
+    }
 }

Reply via email to