This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 72535ff011f KAFKA-20289: Fix DescribeConfigs to correctly resolve 
broker synonyms for group configs (#21855)
72535ff011f is described below

commit 72535ff011f138716c672e0a0dce87d57fd154f9
Author: majialong <[email protected]>
AuthorDate: Thu Apr 2 02:50:11 2026 +0800

    KAFKA-20289: Fix DescribeConfigs to correctly resolve broker synonyms for 
group configs (#21855)
    
    `createGroupConfigEntry` used the group config name (e.g.
    `consumer.session.timeout.ms`) directly  to look up broker synonyms,
    which could produce incorrect synonym chains.
    
    This change introduces `ALL_GROUP_CONFIG_SYNONYMS` in `GroupConfig` , an
    explicit mapping from each group config to its broker-level synonym (or
    empty for configs without one, e.g. `share.auto.offset.reset`).
    
    Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot
     <[email protected]>
---
 .../admin/DynamicGroupConfigIntegrationTest.java   | 308 +++++++++++++++++++++
 .../clients/admin/StaticBrokerConfigTest.java      |   2 +-
 .../src/main/scala/kafka/admin/ConfigCommand.scala |   2 +-
 .../src/main/scala/kafka/server/ConfigHelper.scala |  19 +-
 .../kafka/coordinator/group/GroupConfig.java       |  57 +++-
 .../kafka/coordinator/group/GroupConfigTest.java   |  30 ++
 .../kafka/server/config/AbstractKafkaConfig.java   |   9 +
 .../config/DefaultSupportedConfigChecker.java      |   2 +-
 .../kafka/tools/ConfigCommandIntegrationTest.java  |  12 +-
 9 files changed, 417 insertions(+), 24 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java
new file mode 100644
index 00000000000..01b0d73b299
--- /dev/null
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DynamicGroupConfigIntegrationTest {
+
+    @ClusterTest(types = {Type.KRAFT})
+    public void 
testDescribeGroupConfigSynonymsWithBrokerSynonym(ClusterInstance cluster) 
throws Exception {
+        try (var admin = cluster.admin()) {
+            var group = "synonym-test-group";
+            var groupResource = new ConfigResource(ConfigResource.Type.GROUP, 
group);
+            var brokerResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+            var brokerDefaultResource = new 
ConfigResource(ConfigResource.Type.BROKER, "");
+
+            // Verify default config only.
+            // Expected synonym chain: DEFAULT_CONFIG
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT),
+                ConfigEntry.ConfigSource.DEFAULT_CONFIG,
+                List.of(
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+
+            // Set per-broker dynamic config.
+            // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DEFAULT_CONFIG
+            alterConfig(admin, cluster, brokerResource, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "1500");
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "1500",
+                ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
+                List.of(
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+
+            // Set dynamic default broker config; per-broker config still 
takes precedence.
+            // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> 
DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG
+            alterConfig(admin, cluster, brokerDefaultResource, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000");
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "1500",
+                ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
+                List.of(
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+
+            // Set group override; it takes precedence over all broker configs.
+            // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> 
DYNAMIC_BROKER_CONFIG
+            // -> DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG
+            alterConfig(admin, cluster, groupResource, 
GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000");
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "3000",
+                ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+                List.of(
+                    
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+        }
+    }
+
+    @ClusterTest(types = {Type.KRAFT})
+    public void 
testDescribeGroupConfigSynonymsWithoutBrokerSynonym(ClusterInstance cluster) 
throws Exception {
+        try (var admin = cluster.admin()) {
+            var group = "synonym-no-broker-test-group";
+            var groupResource = new ConfigResource(ConfigResource.Type.GROUP, 
group);
+
+            // Verify default config for a config with no broker synonym.
+            // Expected synonym chain: DEFAULT_CONFIG
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+                GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT,
+                ConfigEntry.ConfigSource.DEFAULT_CONFIG,
+                List.of(
+                    Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+
+            // Set group override; synonyms use group config name since there 
is no broker synonym.
+            // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DEFAULT_CONFIG
+            alterConfig(admin, cluster, groupResource, 
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+                "earliest",
+                ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+                List.of(
+                    Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+                    Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+        }
+    }
+
+    @ClusterTest(types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"2000")
+        })
+    public void 
testDescribeGroupConfigSynonymsWithStaticBrokerConfig(ClusterInstance cluster) 
throws Exception {
+        try (var admin = cluster.admin()) {
+            var group = "synonym-static-test-group";
+            var groupResource = new ConfigResource(ConfigResource.Type.GROUP, 
group);
+            var brokerResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+            var brokerDefaultResource = new 
ConfigResource(ConfigResource.Type.BROKER, "");
+
+            // Verify static broker config is reflected in synonyms.
+            // Expected synonym chain: STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "2000",
+                ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG,
+                List.of(
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+
+            // Set group override; it takes precedence over static broker 
config.
+            // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> 
STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+            alterConfig(admin, cluster, groupResource, 
GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000");
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "3000",
+                ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+                List.of(
+                    
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+
+            // Add dynamic default broker config.
+            // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> 
DYNAMIC_DEFAULT_BROKER_CONFIG
+            // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+            alterConfig(admin, cluster, brokerDefaultResource, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "4000");
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "3000",
+                ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+                List.of(
+                    
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG)));
+
+            // Add per-broker dynamic config to complete the full 5-layer 
synonym chain.
+            // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> 
DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG
+            // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+            alterConfig(admin, cluster, brokerResource, 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000");
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "3000",
+                ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+                List.of(
+                    
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+            );
+
+            // Delete group override; value falls back to per-broker dynamic 
config.
+            // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> 
DYNAMIC_DEFAULT_BROKER_CONFIG
+            // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+            deleteConfig(admin, cluster, groupResource, 
GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
+
+            assertGroupConfig(
+                admin,
+                groupResource,
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                "5000",
+                ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
+                List.of(
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+                    
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                        ConfigEntry.ConfigSource.DEFAULT_CONFIG)));
+        }
+    }
+
+    private static void alterConfig(
+        Admin admin,
+        ClusterInstance cluster,
+        ConfigResource resource,
+        String key,
+        String value
+    ) throws Exception {
+        admin.incrementalAlterConfigs(Map.of(resource, List.of(
+            new AlterConfigOp(new ConfigEntry(key, value), 
AlterConfigOp.OpType.SET)
+        ))).all().get();
+        cluster.ensureConsistentMetadata();
+    }
+
+    private static void deleteConfig(
+        Admin admin,
+        ClusterInstance cluster,
+        ConfigResource resource,
+        String key
+    ) throws Exception {
+        admin.incrementalAlterConfigs(Map.of(resource, List.of(
+            new AlterConfigOp(new ConfigEntry(key, ""), 
AlterConfigOp.OpType.DELETE)
+        ))).all().get();
+        cluster.ensureConsistentMetadata();
+    }
+
+    private static void assertGroupConfig(
+        Admin admin,
+        ConfigResource groupResource,
+        String configKey,
+        String expectedValue,
+        ConfigEntry.ConfigSource expectedSource,
+        List<Map.Entry<String, ConfigEntry.ConfigSource>> expectedSynonyms
+    ) throws Exception {
+        DescribeConfigsOptions options = new 
DescribeConfigsOptions().includeSynonyms(true);
+        ConfigEntry entry = admin.describeConfigs(List.of(groupResource), 
options)
+            .all().get().get(groupResource).get(configKey);
+        assertEquals(expectedValue, entry.value());
+        assertEquals(expectedSource, entry.source());
+        assertEquals(expectedSynonyms, entry.synonyms().stream()
+            .map(s -> Map.entry(s.name(), s.source()))
+            .toList());
+    }
+}
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
index d880449d4ee..6f35f7ec307 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
@@ -155,7 +155,7 @@ public class StaticBrokerConfigTest {
 
             // test for case ConfigResource.Type == GROUP
             Config groupConfig = configResourceMap.get(groupResource);
-            assertNotContainsAnyInternalConfig(groupConfig, 
GroupConfig.configDef().configKeys());
+            assertNotContainsAnyInternalConfig(groupConfig, 
GroupConfig.CONFIG_DEF.configKeys());
 
             // test for case ConfigResource.Type == CLIENT_METRICS
             Config clientMetricsConfig = 
configResourceMap.get(clientMetricsResource);
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 8cc46dadb06..0166a64e67f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -580,7 +580,7 @@ object ConfigCommand extends Logging {
       "For entity-type '" + ClientType + "': " + 
QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" + 
_).mkString(nl, nl, nl) +
       "For entity-type '" + IpType + "': " + 
QuotaConfig.ipConfigs.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, 
nl) +
       "For entity-type '" + ClientMetricsType + "': " + 
ClientMetricsConfigs.configDef().names.asScala.toSeq.sorted.map("\t" + 
_).mkString(nl, nl, nl) +
-      "For entity-type '" + GroupType + "': " + 
GroupConfig.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, 
nl, nl) +
+      "For entity-type '" + GroupType + "': " + 
GroupConfig.CONFIG_DEF.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, 
nl, nl) +
       s"Entity types '$UserType' and '$ClientType' may be specified together 
to update config for clients of a specific user.")
       .withRequiredArg
       .ofType(classOf[String])
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala 
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 5c32c14eb2d..2f7a86b4eda 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -133,7 +133,7 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
               throw new InvalidRequestException("Group name must not be empty")
             } else {
               val groupProps = configRepository.groupConfig(group)
-              val groupConfig = 
GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
 groupProps)
+              val groupConfig = 
GroupConfig.fromProps(config.extractGroupConfigMap, groupProps)
               createResponseConfig(resource, groupConfig, 
createGroupConfigEntry(groupConfig, groupProps, includeSynonyms, 
includeDocumentation)(_, _))
             }
 
@@ -161,15 +161,24 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
 
   private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps: 
Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
                                     (name: String, value: Any): 
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
-    val allNames = brokerSynonyms(name)
     val configEntryType = GroupConfig.configType(name).toScala
     val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
     val valueAsString = if (isSensitive) null else 
ConfigDef.convertToString(value, configEntryType.orNull)
     val allSynonyms = {
-      val list = configSynonyms(name, allNames, isSensitive)
+      val list = GroupConfig.brokerSynonym(name).toScala match {
+        case Some(brokerName) =>
+          configSynonyms(brokerName, brokerSynonyms(brokerName), isSensitive)
+        case None =>
+          // No broker synonym, fall back to GroupConfig defaults
+          Option(GroupConfig.CONFIG_DEF.defaultValues().get(name))
+            .map(v => List(new 
DescribeConfigsResponseData.DescribeConfigsSynonym()
+              .setName(name)
+              .setValue(if (isSensitive) null else 
ConfigDef.convertToString(v, configEntryType.orNull))
+              .setSource(ConfigSource.DEFAULT_CONFIG.id)))
+            .getOrElse(List.empty)
+      }
       if (!groupProps.containsKey(name))
-        new 
DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
-          .setSource(ConfigSource.DEFAULT_CONFIG.id) +: list
+        list
       else
         new 
DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
           .setSource(ConfigSource.GROUP_CONFIG.id) +: list
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 8b1ce751ef0..0a6841ec324 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -157,7 +157,7 @@ public final class GroupConfig extends AbstractConfig {
 
     public final boolean shareRenewAcknowledgeEnable;
 
-    private static final ConfigDef CONFIG = new ConfigDef()
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
             INT,
             GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -281,8 +281,49 @@ public final class GroupConfig extends AbstractConfig {
             MEDIUM,
             GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
 
+    /**
+     * Mapping from GroupConfig name to its broker-level synonym config name.
+     * {@code Optional.empty()} indicates that the config has no broker-level 
synonym.
+     */
+    public static final Map<String, Optional<String>> 
ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries(
+        // Consumer group configs
+        Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
+        Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
+        Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
+        Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
+
+        // Share group configs
+        Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
+        Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
+        Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)),
+        Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
Optional.of(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)),
+        Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
Optional.of(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)),
+        Map.entry(SHARE_AUTO_OFFSET_RESET_CONFIG, Optional.empty()),
+        Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()),
+        Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()),
+        Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
+        Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
+
+        // Streams group configs
+        Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
+        Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
+        Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
+        Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
+        Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
+        Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
+        Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
+    );
+
+    /**
+     * Returns the broker-level synonym config name for the given group config 
name,
+     * or {@code Optional.empty()} if no broker-level synonym exists.
+     */
+    public static Optional<String> brokerSynonym(String groupConfigName) {
+        return ALL_GROUP_CONFIG_SYNONYMS.getOrDefault(groupConfigName, 
Optional.empty());
+    }
+
     public GroupConfig(Map<?, ?> props) {
-        super(CONFIG, props, false);
+        super(CONFIG_DEF, props, false);
         this.consumerSessionTimeoutMs = 
getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
         this.consumerHeartbeatIntervalMs = 
getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
         // These have to be optionals because their default group coordinator 
configs are dynamic,
@@ -327,16 +368,12 @@ public final class GroupConfig extends AbstractConfig {
         this.shareRenewAcknowledgeEnable = 
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
     }
 
-    public static ConfigDef configDef() {
-        return CONFIG;
-    }
-
     public static Optional<Type> configType(String configName) {
-        return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c 
-> c.type);
+        return 
Optional.ofNullable(CONFIG_DEF.configKeys().get(configName)).map(c -> c.type);
     }
 
     public static Set<String> configNames() {
-        return CONFIG.names();
+        return CONFIG_DEF.names();
     }
 
     /**
@@ -356,7 +393,7 @@ public final class GroupConfig extends AbstractConfig {
      */
     @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
     private static void validateValues(Map<String, Object> unparsedMap, 
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig 
shareGroupConfig) {
-        Map<String, Object> valueMaps = CONFIG.parse(unparsedMap);
+        Map<String, Object> valueMaps = CONFIG_DEF.parse(unparsedMap);
         int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
         int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
         int consumerAssignmentIntervalMs = (Integer) 
valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
@@ -899,6 +936,6 @@ public final class GroupConfig extends AbstractConfig {
     }
 
     public static void main(String[] args) {
-        System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" + 
config));
+        System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" + 
config));
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 139aa4ab3ce..d1468b1eba1 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -28,9 +28,11 @@ import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Stream;
 
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
@@ -637,6 +639,34 @@ public class GroupConfigTest {
         assertEquals(expectedMin, result.get(key));
     }
 
+    @Test
+    public void testAllGroupConfigSynonyms() {
+        // Every GroupConfig entry should have an entry in 
ALL_GROUP_CONFIG_SYNONYMS.
+        for (String groupConfigName : GroupConfig.CONFIG_DEF.names()) {
+            
assertTrue(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName),
+                "GroupConfig entry '" + groupConfigName + "' is not in 
ALL_GROUP_CONFIG_SYNONYMS. " +
+                    "Add it with Optional.of(brokerConfigName) or 
Optional.empty() if it has no broker synonym.");
+        }
+
+        // Every key in ALL_GROUP_CONFIG_SYNONYMS should be a valid 
GroupConfig entry.
+        for (String key : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.keySet()) {
+            assertTrue(GroupConfig.CONFIG_DEF.names().contains(key),
+                "ALL_GROUP_CONFIG_SYNONYMS contains '" + key + "' which is not 
a valid GroupConfig entry.");
+        }
+
+        // Every present synonym mapping should point to a valid broker config.
+        Set<String> brokerConfigNames = new HashSet<>();
+        brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names());
+        brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names());
+
+        for (Map.Entry<String, Optional<String>> entry : 
GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.entrySet()) {
+            entry.getValue().ifPresent(brokerConfigName ->
+                assertTrue(brokerConfigNames.contains(brokerConfigName),
+                    "ALL_GROUP_CONFIG_SYNONYMS maps '" + entry.getKey() + "' 
to '" +
+                        brokerConfigName + "' but this broker config does not 
exist."));
+        }
+    }
+
     private Map<String, String> createValidGroupConfig() {
         Map<String, String> props = new HashMap<>();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 9baa67c1515..e1542923af1 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
@@ -350,4 +351,12 @@ public abstract class AbstractKafkaConfig extends 
AbstractConfig {
      * @param reconfigurable the component to unregister
      */
     public abstract void removeReconfigurable(Reconfigurable reconfigurable);
+
+    public Map<String, Object> extractGroupConfigMap() {
+        Map<String, Object> defaults = new HashMap<>();
+        GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach((groupConfigName, 
brokerConfigName) ->
+            brokerConfigName.ifPresent(name -> defaults.put(groupConfigName, 
get(name)))
+        );
+        return defaults;
+    }
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
 
b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
index 301d9e1d6bf..1c3976750dd 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
@@ -65,7 +65,7 @@ public final class DefaultSupportedConfigChecker implements 
SupportedConfigCheck
             ConfigResource.Type.TOPIC, new SetContainsPredicate(new 
HashSet<>(LogConfig.configNames())),
             ConfigResource.Type.BROKER, ignore -> true,
             ConfigResource.Type.CLIENT_METRICS, new 
SetContainsPredicate(ClientMetricsConfigs.configDef().names()),
-            ConfigResource.Type.GROUP, new 
SetContainsPredicate(GroupConfig.configDef().names())
+            ConfigResource.Type.GROUP, new 
SetContainsPredicate(GroupConfig.CONFIG_DEF.names())
         );
     }
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index ccc356820d1..9cb8ef7f3b1 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -137,7 +137,7 @@ public class ConfigCommandIntegrationTest {
             "--describe"));
         message = captureStandardOut(run(command));
         assertTrue(message.contains("Dynamic configs for group group are:"));
-        assertTrue(message.contains("consumer.session.timeout.ms=50000 
sensitive=false 
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
+        assertTrue(message.contains("consumer.session.timeout.ms=50000 
sensitive=false 
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, 
DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}"));
 
         command = Stream.concat(quorumArgs(), Stream.of(
             "--entity-type", "groups",
@@ -145,7 +145,7 @@ public class ConfigCommandIntegrationTest {
             "--describe"));
         message = captureStandardOut(run(command));
         assertTrue(message.contains("Dynamic configs for group group are:"));
-        assertTrue(message.contains("consumer.session.timeout.ms=50000 
sensitive=false 
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
+        assertTrue(message.contains("consumer.session.timeout.ms=50000 
sensitive=false 
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000, 
DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}"));
     }
 
     @ClusterTest(serverProperties = {
@@ -293,10 +293,10 @@ public class ConfigCommandIntegrationTest {
             "--describe", "--all"));
         String message = captureStandardOut(run(command));
 
-        assertTrue(message.contains("streams.heartbeat.interval.ms=5000 
sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}"));
-        assertTrue(message.contains("streams.num.standby.replicas=0 
sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}"));
-        assertTrue(message.contains("streams.session.timeout.ms=45000 
sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}"));
-        assertTrue(message.contains("streams.task.offset.interval.ms=60000 
sensitive=false 
synonyms={DEFAULT_CONFIG:streams.task.offset.interval.ms=60000}"));
+        assertTrue(message.contains("streams.heartbeat.interval.ms=5000 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}"));
+        assertTrue(message.contains("streams.num.standby.replicas=0 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}"));
+        assertTrue(message.contains("streams.session.timeout.ms=45000 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}"));
+        assertTrue(message.contains("streams.task.offset.interval.ms=60000 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}"));
     }
 
     @ClusterTest


Reply via email to