This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8e94644af9 MINOR: Update javadocs for GroupConfig and GroupConfigManager (#21771)
c8e94644af9 is described below
commit c8e94644af9da9dd423a07fe12ea9b9463cbb2b3
Author: Sean Quah <[email protected]>
AuthorDate: Mon May 18 06:27:49 2026 +0100
MINOR: Update javadocs for GroupConfig and GroupConfigManager (#21771)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/coordinator/group/GroupConfig.java | 175 +++++++++++++--------
.../coordinator/group/GroupConfigManager.java | 21 ++-
2 files changed, 132 insertions(+), 64 deletions(-)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index cc6508148cd..f74e7a0a5d3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -346,6 +346,9 @@ public final class GroupConfig extends AbstractConfig {
/**
* Returns the broker-level synonym config name for the given group config
name,
* or {@code Optional.empty()} if no broker-level synonym exists.
+ *
+ * @param groupConfigName The group-level config name.
+ * @return The broker-level config name, or {@code Optional.empty()} if no broker-level config exists.
*/
public static Optional<String> brokerSynonym(String groupConfigName) {
return ALL_GROUP_CONFIG_SYNONYMS.getOrDefault(groupConfigName,
Optional.empty());
@@ -403,10 +406,12 @@ public final class GroupConfig extends AbstractConfig {
/**
* Check that property names are valid.
+ *
+ * @param newGroupConfig The new group config overrides.
*/
- public static void validateNames(Map<String, ?> props) {
+ public static void validateNames(Map<String, ?> newGroupConfig) {
Set<String> names = configNames();
- for (String name : props.keySet()) {
+ for (String name : newGroupConfig.keySet()) {
if (!names.contains(name)) {
throw new InvalidConfigurationException("Unknown group config
name: " + name);
}
@@ -416,15 +421,19 @@ public final class GroupConfig extends AbstractConfig {
/**
* Check that the given properties contain only valid group config names
and that
* all values can be parsed and are valid.
+ *
+ * @param newGroupConfig The new unparsed group config overrides.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
*/
public static void validate(
- Map<String, ?> props,
+ Map<String, ?> newGroupConfig,
GroupCoordinatorConfig groupCoordinatorConfig,
ShareGroupConfig shareGroupConfig
) {
- validateNames(props);
- var parsed = CONFIG_DEF.parse(props);
- parsed.keySet().retainAll(props.keySet());
+ validateNames(newGroupConfig);
+ var parsed = CONFIG_DEF.parse(newGroupConfig);
+ parsed.keySet().retainAll(newGroupConfig.keySet());
validateValues(
parsed,
groupCoordinatorConfig,
@@ -435,6 +444,10 @@ public final class GroupConfig extends AbstractConfig {
/**
* Validates the parsed values against broker-level bounds.
* Only configs explicitly present in the parsed map are validated.
+ *
+ * @param parsed The parsed group config overrides.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
*/
private static void validateValues(
Map<String, Object> parsed,
@@ -569,6 +582,11 @@ public final class GroupConfig extends AbstractConfig {
/**
* Validates that an integer config value falls within [min, max].
* No-op when the key is absent from the parsed map.
+ *
+ * @param parsed The parsed group config overrides.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
+ * @param max The maximum allowed value (inclusive).
*/
private static void validateIntRange(
Map<String, Object> parsed,
@@ -585,6 +603,10 @@ public final class GroupConfig extends AbstractConfig {
/**
* Validates that an integer config value does not exceed max.
* No-op when the key is absent from the parsed map.
+ *
+ * @param parsed The parsed group config overrides.
+ * @param key The config key.
+ * @param max The maximum allowed value (inclusive).
*/
private static void validateIntMax(
Map<String, Object> parsed,
@@ -600,6 +622,10 @@ public final class GroupConfig extends AbstractConfig {
/**
* Validates that an integer config value is at least min.
* No-op when the key is absent from the parsed map.
+ *
+ * @param parsed The parsed group config overrides.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
*/
private static void validateIntMin(
Map<String, Object> parsed,
@@ -615,6 +641,12 @@ public final class GroupConfig extends AbstractConfig {
/**
* Validates that the session timeout is greater than the heartbeat
interval.
* Uses broker defaults for any config not present in the parsed map.
+ *
+ * @param parsed The parsed group config overrides.
+ * @param sessionKey The session timeout config key.
+ * @param defaultSession The default session timeout value when
there is no override.
+ * @param heartbeatKey The heartbeat interval config key.
+ * @param defaultHeartbeat The default heartbeat interval value when
there is no override.
*/
private static void validateSessionExceedsHeartbeat(
Map<String, Object> parsed,
@@ -637,52 +669,61 @@ public final class GroupConfig extends AbstractConfig {
* Evaluate group config values to their effective values within
broker-level bounds.
* Out-of-range values are capped and a WARN log is emitted.
*
- * @param props The raw group config properties.
+ * @param newGroupConfig The new unparsed group config overrides.
* @param groupId The group id.
* @param groupCoordinatorConfig The group coordinator config.
* @param shareGroupConfig The share group config.
- * @return A new Properties with out-of-range values capped.
+ * @return A new {@link Properties} with out-of-range values capped.
*/
public static Properties evaluate(
- Properties props,
+ Properties newGroupConfig,
String groupId,
GroupCoordinatorConfig groupCoordinatorConfig,
ShareGroupConfig shareGroupConfig
) {
- Properties effective = new Properties();
- effective.putAll(props);
+ Properties evaluatedGroupConfig = new Properties();
+ evaluatedGroupConfig.putAll(newGroupConfig);
evaluateValues(
- effective,
+ evaluatedGroupConfig,
groupId,
groupCoordinatorConfig,
shareGroupConfig
);
- return effective;
+ return evaluatedGroupConfig;
}
+ /**
+ * Evaluate group config values to their effective values within
broker-level bounds.
+ * Out-of-range values are capped and a WARN log is emitted.
+ *
+ * @param evaluatedGroupConfig The unparsed group config overrides to
modify in place.
+ * @param groupId The group id.
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
+ */
private static void evaluateValues(
- Properties props,
+ Properties evaluatedGroupConfig,
String groupId,
GroupCoordinatorConfig groupCoordinatorConfig,
ShareGroupConfig shareGroupConfig
) {
// Consumer group configs.
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
@@ -691,42 +732,42 @@ public final class GroupConfig extends AbstractConfig {
// Share group configs.
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupConfig.shareGroupMinRecordLockDurationMs(),
shareGroupConfig.shareGroupMaxRecordLockDurationMs()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupConfig.shareGroupMinDeliveryCountLimit(),
shareGroupConfig.shareGroupMaxDeliveryCountLimit()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
@@ -735,40 +776,40 @@ public final class GroupConfig extends AbstractConfig {
// Streams group configs.
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
STREAMS_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()
);
clampToMax(
- props,
+ evaluatedGroupConfig,
groupId,
STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()
);
clampToRange(
- props,
+ evaluatedGroupConfig,
groupId,
STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()
);
clampToMin(
- props,
+ evaluatedGroupConfig,
groupId,
STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
);
clampToMax(
- props,
+ evaluatedGroupConfig,
groupId,
STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
groupCoordinatorConfig.streamsGroupMaxWarmupReplicas()
@@ -776,7 +817,7 @@ public final class GroupConfig extends AbstractConfig {
// Verify that clamping did not break the session > heartbeat
invariant.
checkSessionExceedsHeartbeat(
- props,
+ evaluatedGroupConfig,
groupId,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
@@ -784,7 +825,7 @@ public final class GroupConfig extends AbstractConfig {
groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs()
);
checkSessionExceedsHeartbeat(
- props,
+ evaluatedGroupConfig,
groupId,
SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
@@ -792,7 +833,7 @@ public final class GroupConfig extends AbstractConfig {
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs()
);
checkSessionExceedsHeartbeat(
- props,
+ evaluatedGroupConfig,
groupId,
STREAMS_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
@@ -803,18 +844,25 @@ public final class GroupConfig extends AbstractConfig {
/**
* Log a WARN if the session timeout is not greater than the heartbeat
interval after
- * evaluation. When a key is absent from props, the broker-level default
is used.
+ * evaluation. When a key is absent from newGroupConfig, the broker-level
default is used.
+ *
+ * @param newGroupConfig The evaluated unparsed group config overrides.
+ * @param groupId The group id.
+ * @param sessionKey The session timeout config key.
+ * @param defaultSession The default session timeout value when
there is no override.
+ * @param heartbeatKey The heartbeat interval config key.
+ * @param defaultHeartbeat The default heartbeat interval value when
there is no override.
*/
private static void checkSessionExceedsHeartbeat(
- Properties props,
+ Properties newGroupConfig,
String groupId,
String sessionKey,
int defaultSession,
String heartbeatKey,
int defaultHeartbeat
) {
- Object rawSession = props.get(sessionKey);
- Object rawHeartbeat = props.get(heartbeatKey);
+ Object rawSession = newGroupConfig.get(sessionKey);
+ Object rawHeartbeat = newGroupConfig.get(heartbeatKey);
if (rawSession == null && rawHeartbeat == null) return;
int session = rawSession != null ?
Integer.parseInt(rawSession.toString()) : defaultSession;
@@ -829,22 +877,22 @@ public final class GroupConfig extends AbstractConfig {
/**
* Clamp a config value to [min, max]. A WARN log is emitted on adjustment.
- * No-op when the key is absent from props.
+ * No-op when the key is absent from evaluatedGroupConfig.
*
- * @param props The properties to modify in place.
- * @param groupId The group id.
- * @param key The config key.
- * @param min The minimum allowed value (inclusive).
- * @param max The maximum allowed value (inclusive).
+ * @param evaluatedGroupConfig The unparsed group config overrides to
modify in place.
+ * @param groupId The group id.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
+ * @param max The maximum allowed value (inclusive).
*/
private static void clampToRange(
- Properties props,
+ Properties evaluatedGroupConfig,
String groupId,
String key,
int min,
int max
) {
- Object rawValue = props.get(key);
+ Object rawValue = evaluatedGroupConfig.get(key);
if (rawValue == null) return;
int value = Integer.parseInt(rawValue.toString());
@@ -852,31 +900,31 @@ public final class GroupConfig extends AbstractConfig {
LOG.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
"allowed minimum {}. The effective value will be capped to
{}.",
key, groupId, value, min, min);
- props.put(key, min);
+ evaluatedGroupConfig.put(key, min);
} else if (value > max) {
LOG.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
"allowed maximum {}. The effective value will be capped to
{}.",
key, groupId, value, max, max);
- props.put(key, max);
+ evaluatedGroupConfig.put(key, max);
}
}
/**
* Clamp a config value to at most max. A WARN log is emitted on
adjustment.
- * No-op when the key is absent from props.
+ * No-op when the key is absent from evaluatedGroupConfig.
*
- * @param props The properties to modify in place.
- * @param groupId The group id.
- * @param key The config key.
- * @param max The maximum allowed value (inclusive).
+ * @param evaluatedGroupConfig The unparsed group config overrides to
modify in place.
+ * @param groupId The group id.
+ * @param key The config key.
+ * @param max The maximum allowed value (inclusive).
*/
private static void clampToMax(
- Properties props,
+ Properties evaluatedGroupConfig,
String groupId,
String key,
int max
) {
- Object rawValue = props.get(key);
+ Object rawValue = evaluatedGroupConfig.get(key);
if (rawValue == null) return;
int value = Integer.parseInt(rawValue.toString());
@@ -884,26 +932,26 @@ public final class GroupConfig extends AbstractConfig {
LOG.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
"allowed maximum {}. The effective value will be capped to
{}.",
key, groupId, value, max, max);
- props.put(key, max);
+ evaluatedGroupConfig.put(key, max);
}
}
/**
* Clamp a config value to at least min. A WARN log is emitted on
adjustment.
- * No-op when the key is absent from props.
+ * No-op when the key is absent from evaluatedGroupConfig.
*
- * @param props The properties to modify in place.
- * @param groupId The group id.
- * @param key The config key.
- * @param min The minimum allowed value (inclusive).
+ * @param evaluatedGroupConfig The unparsed group config overrides to
modify in place.
+ * @param groupId The group id.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
*/
private static void clampToMin(
- Properties props,
+ Properties evaluatedGroupConfig,
String groupId,
String key,
int min
) {
- Object rawValue = props.get(key);
+ Object rawValue = evaluatedGroupConfig.get(key);
if (rawValue == null) return;
int value = Integer.parseInt(rawValue.toString());
@@ -911,12 +959,15 @@ public final class GroupConfig extends AbstractConfig {
LOG.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
"allowed minimum {}. The effective value will be capped to
{}.",
key, groupId, value, min, min);
- props.put(key, min);
+ evaluatedGroupConfig.put(key, min);
}
}
/**
* Create a group config instance using the given properties and defaults.
+ *
+ * @param defaults The full default group config values.
+ * @param overrides The group config overrides.
*/
public static GroupConfig fromProps(
Map<?, ?> defaults,
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 9335b9ea91e..22f9da8cbca 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -32,12 +32,29 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class GroupConfigManager implements AutoCloseable {
+ /**
+ * The group configs for each group.
+ *
+ * <p>Groups are only present in this map when they have config overrides.
+ */
private final Map<String, GroupConfig> configMap;
+ /**
+ * The group coordinator config.
+ */
private final GroupCoordinatorConfig groupCoordinatorConfig;
+ /**
+ * The share group config.
+ */
private final ShareGroupConfig shareGroupConfig;
+ /**
+ * Constructor.
+ *
+ * @param groupCoordinatorConfig The group coordinator config.
+ * @param shareGroupConfig The share group config.
+ */
public GroupConfigManager(
GroupCoordinatorConfig groupCoordinatorConfig,
ShareGroupConfig shareGroupConfig
@@ -53,7 +70,7 @@ public class GroupConfigManager implements AutoCloseable {
* This method evaluates all configuration values within broker-level
bounds.
*
* @param groupId The group id.
- * @param newGroupConfig The new group config.
+ * @param newGroupConfig The new group config overrides.
*/
public void updateGroupConfig(String groupId, Properties newGroupConfig) {
if (null == groupId || groupId.isEmpty()) {
@@ -76,7 +93,7 @@ public class GroupConfigManager implements AutoCloseable {
}
/**
- * Get the group config if it exists, otherwise return None.
+ * Get the group config if it has any overrides, otherwise return {@link Optional#empty()}.
* The returned config has already been evaluated within broker-level
bounds.
*
* @param groupId The group id.