This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 72535ff011f KAFKA-20289: Fix DescribeConfigs to correctly resolve
broker synonyms for group configs (#21855)
72535ff011f is described below
commit 72535ff011f138716c672e0a0dce87d57fd154f9
Author: majialong <[email protected]>
AuthorDate: Thu Apr 2 02:50:11 2026 +0800
KAFKA-20289: Fix DescribeConfigs to correctly resolve broker synonyms for
group configs (#21855)
`createGroupConfigEntry` used the group config name (e.g.
`consumer.session.timeout.ms`) directly to look up broker synonyms,
which could produce incorrect synonym chains.
This change introduces `ALL_GROUP_CONFIG_SYNONYMS` in `GroupConfig`, an
explicit mapping from each group config to its broker-level synonym (or
`Optional.empty()` for configs without one, e.g. `share.auto.offset.reset`).
Reviewers: Chia-Ping Tsai <[email protected]>, David Jacot
<[email protected]>
---
.../admin/DynamicGroupConfigIntegrationTest.java | 308 +++++++++++++++++++++
.../clients/admin/StaticBrokerConfigTest.java | 2 +-
.../src/main/scala/kafka/admin/ConfigCommand.scala | 2 +-
.../src/main/scala/kafka/server/ConfigHelper.scala | 19 +-
.../kafka/coordinator/group/GroupConfig.java | 57 +++-
.../kafka/coordinator/group/GroupConfigTest.java | 30 ++
.../kafka/server/config/AbstractKafkaConfig.java | 9 +
.../config/DefaultSupportedConfigChecker.java | 2 +-
.../kafka/tools/ConfigCommandIntegrationTest.java | 12 +-
9 files changed, 417 insertions(+), 24 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java
new file mode 100644
index 00000000000..01b0d73b299
--- /dev/null
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DynamicGroupConfigIntegrationTest.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DynamicGroupConfigIntegrationTest {
+
+ @ClusterTest(types = {Type.KRAFT})
+ public void
testDescribeGroupConfigSynonymsWithBrokerSynonym(ClusterInstance cluster)
throws Exception {
+ try (var admin = cluster.admin()) {
+ var group = "synonym-test-group";
+ var groupResource = new ConfigResource(ConfigResource.Type.GROUP,
group);
+ var brokerResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ var brokerDefaultResource = new
ConfigResource(ConfigResource.Type.BROKER, "");
+
+ // Verify default config only.
+ // Expected synonym chain: DEFAULT_CONFIG
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT),
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG,
+ List.of(
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+
+ // Set per-broker dynamic config.
+ // Expected synonym chain: DYNAMIC_BROKER_CONFIG -> DEFAULT_CONFIG
+ alterConfig(admin, cluster, brokerResource,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "1500");
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "1500",
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
+ List.of(
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+
+ // Set dynamic default broker config; per-broker config still
takes precedence.
+ // Expected synonym chain: DYNAMIC_BROKER_CONFIG ->
DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG
+ alterConfig(admin, cluster, brokerDefaultResource,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "2000");
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "1500",
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
+ List.of(
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+
+ // Set group override; it takes precedence over all broker configs.
+ // Expected synonym chain: DYNAMIC_GROUP_CONFIG ->
DYNAMIC_BROKER_CONFIG
+ // -> DYNAMIC_DEFAULT_BROKER_CONFIG -> DEFAULT_CONFIG
+ alterConfig(admin, cluster, groupResource,
GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000");
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "3000",
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+ List.of(
+
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+ }
+ }
+
+ @ClusterTest(types = {Type.KRAFT})
+ public void
testDescribeGroupConfigSynonymsWithoutBrokerSynonym(ClusterInstance cluster)
throws Exception {
+ try (var admin = cluster.admin()) {
+ var group = "synonym-no-broker-test-group";
+ var groupResource = new ConfigResource(ConfigResource.Type.GROUP,
group);
+
+ // Verify default config for a config with no broker synonym.
+ // Expected synonym chain: DEFAULT_CONFIG
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+ GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG,
+ List.of(
+ Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+
+ // Set group override; synonyms use group config name since there
is no broker synonym.
+ // Expected synonym chain: DYNAMIC_GROUP_CONFIG -> DEFAULT_CONFIG
+ alterConfig(admin, cluster, groupResource,
GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+ "earliest",
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+ List.of(
+ Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+ Map.entry(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+ }
+ }
+
+ @ClusterTest(types = {Type.KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"2000")
+ })
+ public void
testDescribeGroupConfigSynonymsWithStaticBrokerConfig(ClusterInstance cluster)
throws Exception {
+ try (var admin = cluster.admin()) {
+ var group = "synonym-static-test-group";
+ var groupResource = new ConfigResource(ConfigResource.Type.GROUP,
group);
+ var brokerResource = new
ConfigResource(ConfigResource.Type.BROKER, "0");
+ var brokerDefaultResource = new
ConfigResource(ConfigResource.Type.BROKER, "");
+
+ // Verify static broker config is reflected in synonyms.
+ // Expected synonym chain: STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "2000",
+ ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG,
+ List.of(
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+
+ // Set group override; it takes precedence over static broker
config.
+ // Expected synonym chain: DYNAMIC_GROUP_CONFIG ->
STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+ alterConfig(admin, cluster, groupResource,
GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "3000");
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "3000",
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+ List.of(
+
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+
+ // Add dynamic default broker config.
+ // Expected synonym chain: DYNAMIC_GROUP_CONFIG ->
DYNAMIC_DEFAULT_BROKER_CONFIG
+ // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+ alterConfig(admin, cluster, brokerDefaultResource,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "4000");
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "3000",
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+ List.of(
+
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG)));
+
+ // Add per-broker dynamic config to complete the full 5-layer
synonym chain.
+ // Expected synonym chain: DYNAMIC_GROUP_CONFIG ->
DYNAMIC_BROKER_CONFIG -> DYNAMIC_DEFAULT_BROKER_CONFIG
+ // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+ alterConfig(admin, cluster, brokerResource,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000");
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "3000",
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG,
+ List.of(
+
Map.entry(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG))
+ );
+
+ // Delete group override; value falls back to per-broker dynamic
config.
+ // Expected synonym chain: DYNAMIC_BROKER_CONFIG ->
DYNAMIC_DEFAULT_BROKER_CONFIG
+ // -> STATIC_BROKER_CONFIG -> DEFAULT_CONFIG
+ deleteConfig(admin, cluster, groupResource,
GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
+
+ assertGroupConfig(
+ admin,
+ groupResource,
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ "5000",
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
+ List.of(
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+
ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
+
Map.entry(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ ConfigEntry.ConfigSource.DEFAULT_CONFIG)));
+ }
+ }
+
+ private static void alterConfig(
+ Admin admin,
+ ClusterInstance cluster,
+ ConfigResource resource,
+ String key,
+ String value
+ ) throws Exception {
+ admin.incrementalAlterConfigs(Map.of(resource, List.of(
+ new AlterConfigOp(new ConfigEntry(key, value),
AlterConfigOp.OpType.SET)
+ ))).all().get();
+ cluster.ensureConsistentMetadata();
+ }
+
+ private static void deleteConfig(
+ Admin admin,
+ ClusterInstance cluster,
+ ConfigResource resource,
+ String key
+ ) throws Exception {
+ admin.incrementalAlterConfigs(Map.of(resource, List.of(
+ new AlterConfigOp(new ConfigEntry(key, ""),
AlterConfigOp.OpType.DELETE)
+ ))).all().get();
+ cluster.ensureConsistentMetadata();
+ }
+
+ private static void assertGroupConfig(
+ Admin admin,
+ ConfigResource groupResource,
+ String configKey,
+ String expectedValue,
+ ConfigEntry.ConfigSource expectedSource,
+ List<Map.Entry<String, ConfigEntry.ConfigSource>> expectedSynonyms
+ ) throws Exception {
+ DescribeConfigsOptions options = new
DescribeConfigsOptions().includeSynonyms(true);
+ ConfigEntry entry = admin.describeConfigs(List.of(groupResource),
options)
+ .all().get().get(groupResource).get(configKey);
+ assertEquals(expectedValue, entry.value());
+ assertEquals(expectedSource, entry.source());
+ assertEquals(expectedSynonyms, entry.synonyms().stream()
+ .map(s -> Map.entry(s.name(), s.source()))
+ .toList());
+ }
+}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
index d880449d4ee..6f35f7ec307 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/StaticBrokerConfigTest.java
@@ -155,7 +155,7 @@ public class StaticBrokerConfigTest {
// test for case ConfigResource.Type == GROUP
Config groupConfig = configResourceMap.get(groupResource);
- assertNotContainsAnyInternalConfig(groupConfig,
GroupConfig.configDef().configKeys());
+ assertNotContainsAnyInternalConfig(groupConfig,
GroupConfig.CONFIG_DEF.configKeys());
// test for case ConfigResource.Type == CLIENT_METRICS
Config clientMetricsConfig =
configResourceMap.get(clientMetricsResource);
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 8cc46dadb06..0166a64e67f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -580,7 +580,7 @@ object ConfigCommand extends Logging {
"For entity-type '" + ClientType + "': " +
QuotaConfig.userAndClientQuotaConfigs().names.asScala.toSeq.sorted.map("\t" +
_).mkString(nl, nl, nl) +
"For entity-type '" + IpType + "': " +
QuotaConfig.ipConfigs.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl,
nl) +
"For entity-type '" + ClientMetricsType + "': " +
ClientMetricsConfigs.configDef().names.asScala.toSeq.sorted.map("\t" +
_).mkString(nl, nl, nl) +
- "For entity-type '" + GroupType + "': " +
GroupConfig.configDef().names.asScala.toSeq.sorted.map("\t" + _).mkString(nl,
nl, nl) +
+ "For entity-type '" + GroupType + "': " +
GroupConfig.CONFIG_DEF.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl,
nl, nl) +
s"Entity types '$UserType' and '$ClientType' may be specified together
to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
diff --git a/core/src/main/scala/kafka/server/ConfigHelper.scala
b/core/src/main/scala/kafka/server/ConfigHelper.scala
index 5c32c14eb2d..2f7a86b4eda 100644
--- a/core/src/main/scala/kafka/server/ConfigHelper.scala
+++ b/core/src/main/scala/kafka/server/ConfigHelper.scala
@@ -133,7 +133,7 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
throw new InvalidRequestException("Group name must not be empty")
} else {
val groupProps = configRepository.groupConfig(group)
- val groupConfig =
GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
groupProps)
+ val groupConfig =
GroupConfig.fromProps(config.extractGroupConfigMap, groupProps)
createResponseConfig(resource, groupConfig,
createGroupConfigEntry(groupConfig, groupProps, includeSynonyms,
includeDocumentation)(_, _))
}
@@ -161,15 +161,24 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
private def createGroupConfigEntry(groupConfig: GroupConfig, groupProps:
Properties, includeSynonyms: Boolean, includeDocumentation: Boolean)
(name: String, value: Any):
DescribeConfigsResponseData.DescribeConfigsResourceResult = {
- val allNames = brokerSynonyms(name)
val configEntryType = GroupConfig.configType(name).toScala
val isSensitive = KafkaConfig.maybeSensitive(configEntryType)
val valueAsString = if (isSensitive) null else
ConfigDef.convertToString(value, configEntryType.orNull)
val allSynonyms = {
- val list = configSynonyms(name, allNames, isSensitive)
+ val list = GroupConfig.brokerSynonym(name).toScala match {
+ case Some(brokerName) =>
+ configSynonyms(brokerName, brokerSynonyms(brokerName), isSensitive)
+ case None =>
+ // No broker synonym, fall back to GroupConfig defaults
+ Option(GroupConfig.CONFIG_DEF.defaultValues().get(name))
+ .map(v => List(new
DescribeConfigsResponseData.DescribeConfigsSynonym()
+ .setName(name)
+ .setValue(if (isSensitive) null else
ConfigDef.convertToString(v, configEntryType.orNull))
+ .setSource(ConfigSource.DEFAULT_CONFIG.id)))
+ .getOrElse(List.empty)
+ }
if (!groupProps.containsKey(name))
- new
DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
- .setSource(ConfigSource.DEFAULT_CONFIG.id) +: list
+ list
else
new
DescribeConfigsResponseData.DescribeConfigsSynonym().setName(name).setValue(valueAsString)
.setSource(ConfigSource.GROUP_CONFIG.id) +: list
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 8b1ce751ef0..0a6841ec324 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -157,7 +157,7 @@ public final class GroupConfig extends AbstractConfig {
public final boolean shareRenewAcknowledgeEnable;
- private static final ConfigDef CONFIG = new ConfigDef()
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -281,8 +281,49 @@ public final class GroupConfig extends AbstractConfig {
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
+ /**
+ * Mapping from GroupConfig name to its broker-level synonym config name.
+ * {@code Optional.empty()} indicates that the config has no broker-level
synonym.
+ */
+ public static final Map<String, Optional<String>>
ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries(
+ // Consumer group configs
+ Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
+ Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
+ Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
+ Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
+
+ // Share group configs
+ Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
+ Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
+ Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)),
+ Map.entry(SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
Optional.of(ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG)),
+ Map.entry(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
Optional.of(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG)),
+ Map.entry(SHARE_AUTO_OFFSET_RESET_CONFIG, Optional.empty()),
+ Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()),
+ Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()),
+ Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
+ Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
+
+ // Streams group configs
+ Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
+ Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
+ Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
+ Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
+ Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
+ Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
+ Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
+ );
+
+ /**
+ * Returns the broker-level synonym config name for the given group config
name,
+ * or {@code Optional.empty()} if no broker-level synonym exists.
+ */
+ public static Optional<String> brokerSynonym(String groupConfigName) {
+ return ALL_GROUP_CONFIG_SYNONYMS.getOrDefault(groupConfigName,
Optional.empty());
+ }
+
public GroupConfig(Map<?, ?> props) {
- super(CONFIG, props, false);
+ super(CONFIG_DEF, props, false);
this.consumerSessionTimeoutMs =
getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
this.consumerHeartbeatIntervalMs =
getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
// These have to be optionals because their default group coordinator
configs are dynamic,
@@ -327,16 +368,12 @@ public final class GroupConfig extends AbstractConfig {
this.shareRenewAcknowledgeEnable =
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
}
- public static ConfigDef configDef() {
- return CONFIG;
- }
-
public static Optional<Type> configType(String configName) {
- return Optional.ofNullable(CONFIG.configKeys().get(configName)).map(c
-> c.type);
+ return
Optional.ofNullable(CONFIG_DEF.configKeys().get(configName)).map(c -> c.type);
}
public static Set<String> configNames() {
- return CONFIG.names();
+ return CONFIG_DEF.names();
}
/**
@@ -356,7 +393,7 @@ public final class GroupConfig extends AbstractConfig {
*/
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
private static void validateValues(Map<String, Object> unparsedMap,
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig
shareGroupConfig) {
- Map<String, Object> valueMaps = CONFIG.parse(unparsedMap);
+ Map<String, Object> valueMaps = CONFIG_DEF.parse(unparsedMap);
int consumerHeartbeatInterval = (Integer)
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
int consumerSessionTimeout = (Integer)
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
int consumerAssignmentIntervalMs = (Integer)
valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
@@ -899,6 +936,6 @@ public final class GroupConfig extends AbstractConfig {
}
public static void main(String[] args) {
- System.out.println(CONFIG.toHtml(4, config -> "groupconfigs_" +
config));
+ System.out.println(CONFIG_DEF.toHtml(4, config -> "groupconfigs_" +
config));
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 139aa4ab3ce..d1468b1eba1 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -28,9 +28,11 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Stream;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
@@ -637,6 +639,34 @@ public class GroupConfigTest {
assertEquals(expectedMin, result.get(key));
}
+ @Test
+ public void testAllGroupConfigSynonyms() {
+ // Every GroupConfig entry should have an entry in
ALL_GROUP_CONFIG_SYNONYMS.
+ for (String groupConfigName : GroupConfig.CONFIG_DEF.names()) {
+
assertTrue(GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.containsKey(groupConfigName),
+ "GroupConfig entry '" + groupConfigName + "' is not in
ALL_GROUP_CONFIG_SYNONYMS. " +
+ "Add it with Optional.of(brokerConfigName) or
Optional.empty() if it has no broker synonym.");
+ }
+
+ // Every key in ALL_GROUP_CONFIG_SYNONYMS should be a valid
GroupConfig entry.
+ for (String key : GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.keySet()) {
+ assertTrue(GroupConfig.CONFIG_DEF.names().contains(key),
+ "ALL_GROUP_CONFIG_SYNONYMS contains '" + key + "' which is not
a valid GroupConfig entry.");
+ }
+
+ // Every present synonym mapping should point to a valid broker config.
+ Set<String> brokerConfigNames = new HashSet<>();
+ brokerConfigNames.addAll(GroupCoordinatorConfig.CONFIG_DEF.names());
+ brokerConfigNames.addAll(ShareGroupConfig.CONFIG_DEF.names());
+
+ for (Map.Entry<String, Optional<String>> entry :
GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.entrySet()) {
+ entry.getValue().ifPresent(brokerConfigName ->
+ assertTrue(brokerConfigNames.contains(brokerConfigName),
+ "ALL_GROUP_CONFIG_SYNONYMS maps '" + entry.getKey() + "'
to '" +
+ brokerConfigName + "' but this broker config does not
exist."));
+ }
+ }
+
private Map<String, String> createValidGroupConfig() {
Map<String, String> props = new HashMap<>();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
diff --git
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
index 9baa67c1515..e1542923af1 100644
---
a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
+++
b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig;
@@ -350,4 +351,12 @@ public abstract class AbstractKafkaConfig extends
AbstractConfig {
* @param reconfigurable the component to unregister
*/
public abstract void removeReconfigurable(Reconfigurable reconfigurable);
+
+ public Map<String, Object> extractGroupConfigMap() {
+ Map<String, Object> defaults = new HashMap<>();
+ GroupConfig.ALL_GROUP_CONFIG_SYNONYMS.forEach((groupConfigName,
brokerConfigName) ->
+ brokerConfigName.ifPresent(name -> defaults.put(groupConfigName,
get(name)))
+ );
+ return defaults;
+ }
}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
index 301d9e1d6bf..1c3976750dd 100644
---
a/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
+++
b/server/src/main/java/org/apache/kafka/server/config/DefaultSupportedConfigChecker.java
@@ -65,7 +65,7 @@ public final class DefaultSupportedConfigChecker implements
SupportedConfigCheck
ConfigResource.Type.TOPIC, new SetContainsPredicate(new
HashSet<>(LogConfig.configNames())),
ConfigResource.Type.BROKER, ignore -> true,
ConfigResource.Type.CLIENT_METRICS, new
SetContainsPredicate(ClientMetricsConfigs.configDef().names()),
- ConfigResource.Type.GROUP, new
SetContainsPredicate(GroupConfig.configDef().names())
+ ConfigResource.Type.GROUP, new
SetContainsPredicate(GroupConfig.CONFIG_DEF.names())
);
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index ccc356820d1..9cb8ef7f3b1 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -137,7 +137,7 @@ public class ConfigCommandIntegrationTest {
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for group group are:"));
- assertTrue(message.contains("consumer.session.timeout.ms=50000
sensitive=false
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
+ assertTrue(message.contains("consumer.session.timeout.ms=50000
sensitive=false
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000,
DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}"));
command = Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "groups",
@@ -145,7 +145,7 @@ public class ConfigCommandIntegrationTest {
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("Dynamic configs for group group are:"));
- assertTrue(message.contains("consumer.session.timeout.ms=50000
sensitive=false
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000}"));
+ assertTrue(message.contains("consumer.session.timeout.ms=50000
sensitive=false
synonyms={DYNAMIC_GROUP_CONFIG:consumer.session.timeout.ms=50000,
DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}"));
}
@ClusterTest(serverProperties = {
@@ -293,10 +293,10 @@ public class ConfigCommandIntegrationTest {
"--describe", "--all"));
String message = captureStandardOut(run(command));
- assertTrue(message.contains("streams.heartbeat.interval.ms=5000
sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}"));
- assertTrue(message.contains("streams.num.standby.replicas=0
sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}"));
- assertTrue(message.contains("streams.session.timeout.ms=45000
sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}"));
- assertTrue(message.contains("streams.task.offset.interval.ms=60000
sensitive=false
synonyms={DEFAULT_CONFIG:streams.task.offset.interval.ms=60000}"));
+ assertTrue(message.contains("streams.heartbeat.interval.ms=5000
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}"));
+ assertTrue(message.contains("streams.num.standby.replicas=0
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}"));
+ assertTrue(message.contains("streams.session.timeout.ms=45000
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}"));
+ assertTrue(message.contains("streams.task.offset.interval.ms=60000
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}"));
}
@ClusterTest