This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ba0738d69af KAFKA-20251: Add group-level configs for assignment
batching and offload (#21627)
ba0738d69af is described below
commit ba0738d69af06fee62be08281355204692f29deb
Author: Sean Quah <[email protected]>
AuthorDate: Thu Mar 19 14:44:53 2026 +0000
KAFKA-20251: Add group-level configs for assignment batching and offload
(#21627)
Add group-level {consumer,share,streams}.assignment.interval.ms config
options to control the delay between assignment calculations. These
config options override the dynamic broker-level configs.
Add group-level {consumer,share,streams}.assignor.offload.enable config
options to control whether assignment calculation is offloaded to a
group coordinator background thread. These config options override the
dynamic broker-level configs.
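Since the broker-level counterparts of these group-level configs are
dynamic, their values must not be captured as defaults when a GroupConfig
is constructed. We therefore use a different approach from the existing
group-level configs: the new values are stored as Optionals that are empty
when unset, and callers fall back to the current broker-level config when
reading them. In the interests of getting these new configs into
Apache Kafka 4.3, we defer refactoring GroupConfig until a later patch.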
Reviewers: majialong <[email protected]>, Christo Lolov
<[email protected]>, David Jacot <[email protected]>
---
.../scala/unit/kafka/server/KafkaApisTest.scala | 8 +-
.../kafka/coordinator/group/GroupConfig.java | 194 ++++++++++++++++++++-
.../coordinator/group/GroupMetadataManager.java | 24 ++-
.../kafka/coordinator/group/GroupConfigTest.java | 126 ++++++++++++-
.../group/GroupCoordinatorConfigTest.java | 16 ++
.../group/GroupMetadataManagerTest.java | 124 +++++++++++++
6 files changed, 480 insertions(+), 12 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 663aaefe839..aaf5409e5f3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFI [...]
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -356,6 +356,8 @@ class KafkaApisTest extends Logging {
val cgConfigs = new Properties()
cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+ cgConfigs.put(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
+ cgConfigs.put(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
@@ -364,10 +366,14 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG,
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG,
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
cgConfigs.put(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT.toString)
+ cgConfigs.put(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
+ cgConfigs.put(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
+ cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
+ cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 3d6ca00af45..8b1ce751ef0 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -53,6 +53,10 @@ public final class GroupConfig extends AbstractConfig {
public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG =
"consumer.heartbeat.interval.ms";
+ public static final String CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG =
"consumer.assignment.interval.ms";
+
+ public static final String CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG =
"consumer.assignor.offload.enable";
+
public static final String SHARE_SESSION_TIMEOUT_MS_CONFIG =
"share.session.timeout.ms";
public static final String SHARE_HEARTBEAT_INTERVAL_MS_CONFIG =
"share.heartbeat.interval.ms";
@@ -84,6 +88,10 @@ public final class GroupConfig extends AbstractConfig {
public static final boolean SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT = true;
public static final String SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC = "Whether
the renew acknowledge type is enabled for the share group.";
+ public static final String SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG =
"share.assignment.interval.ms";
+
+ public static final String SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG =
"share.assignor.offload.enable";
+
public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG =
"streams.session.timeout.ms";
public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG =
"streams.heartbeat.interval.ms";
@@ -92,12 +100,23 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG =
"streams.initial.rebalance.delay.ms";
+ public static final String STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG =
"streams.assignment.interval.ms";
+
+ public static final String STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG =
"streams.assignor.offload.enable";
+
public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG =
"streams.task.offset.interval.ms";
public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs;
+ // These have to be optionals because their default group coordinator
configs are dynamic,
+ // so we must not capture the default value at the time of GroupConfig
construction.
+ // KAFKA-20337 tracks the work to refactor GroupConfig into something more
consistent.
+ public final Optional<Integer> consumerAssignmentIntervalMs;
+
+ public final Optional<Boolean> consumerAssignorOffloadEnable;
+
public final int shareSessionTimeoutMs;
public final int shareHeartbeatIntervalMs;
@@ -110,6 +129,13 @@ public final class GroupConfig extends AbstractConfig {
public final String shareAutoOffsetReset;
+ // These have to be optionals because their default group coordinator
configs are dynamic,
+ // so we must not capture the default value at the time of GroupConfig
construction.
+ // KAFKA-20337 tracks the work to refactor GroupConfig into something more
consistent.
+ public final Optional<Integer> shareAssignmentIntervalMs;
+
+ public final Optional<Boolean> shareAssignorOffloadEnable;
+
public final int streamsSessionTimeoutMs;
public final int streamsHeartbeatIntervalMs;
@@ -118,6 +144,13 @@ public final class GroupConfig extends AbstractConfig {
public final int streamsInitialRebalanceDelayMs;
+ // These have to be optionals because their default group coordinator
configs are dynamic,
+ // so we must not capture the default value at the time of GroupConfig
construction.
+ // KAFKA-20337 tracks the work to refactor GroupConfig into something more
consistent.
+ public final Optional<Integer> streamsAssignmentIntervalMs;
+
+ public final Optional<Boolean> streamsAssignorOffloadEnable;
+
public final int streamsTaskOffsetIntervalMs;
public final String shareIsolationLevel;
@@ -137,6 +170,17 @@ public final class GroupConfig extends AbstractConfig {
atLeast(1),
MEDIUM,
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ INT,
+
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
+ atLeast(0),
+ MEDIUM,
+ GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+ BOOLEAN,
+
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
+ MEDIUM,
+ GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(SHARE_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -184,6 +228,17 @@ public final class GroupConfig extends AbstractConfig {
SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT,
MEDIUM,
SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC)
+ .define(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ INT,
+ GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
+ atLeast(0),
+ MEDIUM,
+ GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+ BOOLEAN,
+ GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
+ MEDIUM,
+ GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -208,6 +263,17 @@ public final class GroupConfig extends AbstractConfig {
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+ .define(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ INT,
+
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
+ atLeast(0),
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+ .define(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+ BOOLEAN,
+
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
@@ -219,16 +285,43 @@ public final class GroupConfig extends AbstractConfig {
super(CONFIG, props, false);
this.consumerSessionTimeoutMs =
getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
this.consumerHeartbeatIntervalMs =
getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+ // These have to be optionals because their default group coordinator
configs are dynamic,
+ // so we must not capture the default value at the time of GroupConfig
construction.
+ // KAFKA-20337 tracks the work to refactor GroupConfig into something
more consistent.
+ this.consumerAssignmentIntervalMs =
props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
+ Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
+ Optional.empty();
+ this.consumerAssignorOffloadEnable =
props.containsKey(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
+ Optional.of(getBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
+ Optional.empty();
this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
this.shareHeartbeatIntervalMs =
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareRecordLockDurationMs =
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
this.shareDeliveryCountLimit =
getInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
this.sharePartitionMaxRecordLocks =
getInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
+ // These have to be optionals because their default group coordinator
configs are dynamic,
+ // so we must not capture the default value at the time of GroupConfig
construction.
+ // KAFKA-20337 tracks the work to refactor GroupConfig into something
more consistent.
+ this.shareAssignmentIntervalMs =
props.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
+ Optional.of(getInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
+ Optional.empty();
+ this.shareAssignorOffloadEnable =
props.containsKey(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
+ Optional.of(getBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
+ Optional.empty();
this.streamsSessionTimeoutMs =
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs =
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas =
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsInitialRebalanceDelayMs =
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+ // These have to be optionals because their default group coordinator
configs are dynamic,
+ // so we must not capture the default value at the time of GroupConfig
construction.
+ // KAFKA-20337 tracks the work to refactor GroupConfig into something
more consistent.
+ this.streamsAssignmentIntervalMs =
props.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
+ Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
+ Optional.empty();
+ this.streamsAssignorOffloadEnable =
props.containsKey(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
+ Optional.of(getBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
+ Optional.empty();
this.streamsTaskOffsetIntervalMs =
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
this.shareRenewAcknowledgeEnable =
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
@@ -262,17 +355,21 @@ public final class GroupConfig extends AbstractConfig {
* Validates the values of the given properties.
*/
@SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
- private static void validateValues(Map<String, ?> valueMaps,
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig
shareGroupConfig) {
+ private static void validateValues(Map<String, Object> unparsedMap,
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig
shareGroupConfig) {
+ Map<String, Object> valueMaps = CONFIG.parse(unparsedMap);
int consumerHeartbeatInterval = (Integer)
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
int consumerSessionTimeout = (Integer)
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+ int consumerAssignmentIntervalMs = (Integer)
valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
int shareHeartbeatInterval = (Integer)
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
int shareSessionTimeout = (Integer)
valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
int shareRecordLockDurationMs = (Integer)
valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
int shareDeliveryCountLimit = (Integer)
valueMaps.get(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
int sharePartitionMaxRecordLocks = (Integer)
valueMaps.get(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
+ int shareAssignmentIntervalMs = (Integer)
valueMaps.get(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG);
int streamsSessionTimeoutMs = (Integer)
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
int streamsHeartbeatIntervalMs = (Integer)
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
int streamsNumStandbyReplicas = (Integer)
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+ int streamsAssignmentIntervalMs = (Integer)
valueMaps.get(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
int streamsTaskOffsetIntervalMs = (Integer)
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
if (consumerHeartbeatInterval <
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
throw new
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
@@ -290,6 +387,19 @@ public final class GroupConfig extends AbstractConfig {
throw new
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be
less than or equal to " +
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
}
+ // If no group-level consumer assignment interval is configured, do
not attempt to validate it.
+ // TODO: It's not clear if we can run the validation unconditionally.
+ // KAFKA-20337 tracks the work to clean this up.
+ if (unparsedMap.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
+ if (consumerAssignmentIntervalMs <
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs()) {
+ throw new
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must
be greater than or equal to " +
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+ if (consumerAssignmentIntervalMs >
groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()) {
+ throw new
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must
be less than or equal to " +
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+ }
if (shareHeartbeatInterval <
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs()) {
throw new
InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -330,6 +440,19 @@ public final class GroupConfig extends AbstractConfig {
throw new
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must
be less than or equal to " +
ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG);
}
+ // If no group-level share assignment interval is configured, do not
attempt to validate it.
+ // TODO: It's not clear if we can run the validation unconditionally.
+ // KAFKA-20337 tracks the work to clean this up.
+ if (unparsedMap.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
+ if (shareAssignmentIntervalMs <
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs()) {
+ throw new
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
+
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+ if (shareAssignmentIntervalMs >
groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()) {
+ throw new
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
less than or equal to " +
+
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+ }
if (streamsHeartbeatIntervalMs <
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
throw new
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -350,6 +473,19 @@ public final class GroupConfig extends AbstractConfig {
throw new
InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be
less than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
}
+ // If no group-level streams assignment interval is configured, do not
attempt to validate it.
+ // TODO: It's not clear if we can run the validation unconditionally.
+ // KAFKA-20337 tracks the work to clean this up.
+ if (unparsedMap.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
+ if (streamsAssignmentIntervalMs <
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs()) {
+ throw new
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+ if (streamsAssignmentIntervalMs >
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()) {
+ throw new
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
less than or equal to " +
+
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ }
+ }
if (streamsTaskOffsetIntervalMs <
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
throw new
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must
be greater than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
@@ -374,13 +510,14 @@ public final class GroupConfig extends AbstractConfig {
* the broker-level defaults before validation.
*/
public static void validate(Map<String, ?> props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
+ // TODO: We shouldn't be re-deriving the default config from
GroupConfigManager here.
+ // KAFKA-20337 tracks the work to clean this up.
Map<String, Object> combinedConfigs = new HashMap<>();
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
combinedConfigs.putAll(props);
validateNames(combinedConfigs);
- Map<String, Object> valueMaps = CONFIG.parse(combinedConfigs);
- validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
+ validateValues(combinedConfigs, groupCoordinatorConfig,
shareGroupConfig);
}
/**
@@ -418,6 +555,9 @@ public final class GroupConfig extends AbstractConfig {
clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
+ clampToRange(props, groupId, CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
+ groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs());
// Share group configs
clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
@@ -435,6 +575,9 @@ public final class GroupConfig extends AbstractConfig {
clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
+ clampToRange(props, groupId, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
+ groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs());
// Streams group configs
clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
@@ -445,6 +588,9 @@ public final class GroupConfig extends AbstractConfig {
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
+ clampToRange(props, groupId, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
+ groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs());
clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
@@ -612,6 +758,20 @@ public final class GroupConfig extends AbstractConfig {
return consumerHeartbeatIntervalMs;
}
+ /**
+ * The interval between assignment updates for a consumer group.
+ */
+ public Optional<Integer> consumerAssignmentIntervalMs() {
+ return consumerAssignmentIntervalMs;
+ }
+
+ /**
+ * Whether to offload consumer group assignment to a group coordinator
background thread.
+ */
+ public Optional<Boolean> consumerAssignorOffloadEnable() {
+ return consumerAssignorOffloadEnable;
+ }
+
/**
* The share group session timeout in milliseconds.
*/
@@ -654,6 +814,20 @@ public final class GroupConfig extends AbstractConfig {
return
ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
}
+ /**
+ * The interval between assignment updates for a share group.
+ */
+ public Optional<Integer> shareAssignmentIntervalMs() {
+ return shareAssignmentIntervalMs;
+ }
+
+ /**
+ * Whether to offload share group assignment to a group coordinator
background thread.
+ */
+ public Optional<Boolean> shareAssignorOffloadEnable() {
+ return shareAssignorOffloadEnable;
+ }
+
/**
* The streams group session timeout in milliseconds.
*/
@@ -682,6 +856,20 @@ public final class GroupConfig extends AbstractConfig {
return streamsInitialRebalanceDelayMs;
}
+ /**
+ * The interval between assignment updates for a streams group.
+ */
+ public Optional<Integer> streamsAssignmentIntervalMs() {
+ return streamsAssignmentIntervalMs;
+ }
+
+ /**
+ * Whether to offload streams group assignment to a group coordinator
background thread.
+ */
+ public Optional<Boolean> streamsAssignorOffloadEnable() {
+ return streamsAssignorOffloadEnable;
+ }
+
/**
* The task offset reporting interval.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 27fa34ccce5..09714ab28b8 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8776,7 +8776,9 @@ public class GroupMetadataManager {
*/
// package private for testing
int consumerGroupAssignmentIntervalMs(String groupId) {
- return config.consumerGroupAssignmentIntervalMs();
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.flatMap(GroupConfig::consumerAssignmentIntervalMs)
+ .orElse(config.consumerGroupAssignmentIntervalMs());
}
/**
@@ -8784,7 +8786,9 @@ public class GroupMetadataManager {
*/
// package private for testing
boolean consumerGroupAssignorOffloadEnable(String groupId) {
- return config.consumerGroupAssignorOffloadEnable();
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.flatMap(GroupConfig::consumerAssignorOffloadEnable)
+ .orElse(config.consumerGroupAssignorOffloadEnable());
}
/**
@@ -8810,7 +8814,9 @@ public class GroupMetadataManager {
*/
// package private for testing
int shareGroupAssignmentIntervalMs(String groupId) {
- return config.shareGroupAssignmentIntervalMs();
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.flatMap(GroupConfig::shareAssignmentIntervalMs)
+ .orElse(config.shareGroupAssignmentIntervalMs());
}
/**
@@ -8818,7 +8824,9 @@ public class GroupMetadataManager {
*/
// package private for testing
boolean shareGroupAssignorOffloadEnable(String groupId) {
- return config.shareGroupAssignorOffloadEnable();
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.flatMap(GroupConfig::shareAssignorOffloadEnable)
+ .orElse(config.shareGroupAssignorOffloadEnable());
}
/**
@@ -8844,7 +8852,9 @@ public class GroupMetadataManager {
*/
// package private for testing
int streamsGroupAssignmentIntervalMs(String groupId) {
- return config.streamsGroupAssignmentIntervalMs();
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.flatMap(GroupConfig::streamsAssignmentIntervalMs)
+ .orElse(config.streamsGroupAssignmentIntervalMs());
}
/**
@@ -8852,7 +8862,9 @@ public class GroupMetadataManager {
*/
// package private for testing
boolean streamsGroupAssignorOffloadEnable(String groupId) {
- return config.streamsGroupAssignorOffloadEnable();
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.flatMap(GroupConfig::streamsAssignorOffloadEnable)
+ .orElse(config.streamsGroupAssignorOffloadEnable());
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 8c0772abb9f..139aa4ab3ce 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -29,17 +29,21 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.stream.Stream;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
@@ -74,12 +78,17 @@ public class GroupConfigTest {
private static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS = 60000;
@Test
+ @SuppressWarnings("CyclomaticComplexity")
public void testFromPropsInvalid() {
GroupConfig.configNames().forEach(name -> {
if (GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if
(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
+ } else if
(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
+ } else if
(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if
(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
@@ -96,6 +105,10 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "hello", "1.0");
} else if
(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_boolean", "1");
+ } else if
(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
+ } else if
(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -104,6 +117,10 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
+ } else if
(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
+ } else if
(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else {
@@ -177,6 +194,16 @@ public class GroupConfigTest {
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
+ // Check for invalid consumerAssignmentIntervalMs, < MIN
+ props.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "500");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid consumerAssignmentIntervalMs, > MAX
+ props.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
// Check for invalid shareSessionTimeoutMs, < MIN
props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1");
doTestInvalidProps(props, InvalidConfigurationException.class);
@@ -244,6 +271,16 @@ public class GroupConfigTest {
doTestInvalidProps(props, ConfigException.class);
props = createValidGroupConfig();
+ // Check for invalid shareAssignmentIntervalMs, < MIN
+ props.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "500");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid shareAssignmentIntervalMs, > MAX
+ props.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
// Check for invalid streamsSessionTimeoutMs, < MIN
props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1");
doTestInvalidProps(props, InvalidConfigurationException.class);
@@ -264,6 +301,16 @@ public class GroupConfigTest {
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
+ // Check for invalid streamsAssignmentIntervalMs, < MIN
+ props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "500");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
+ // Check for invalid streamsAssignmentIntervalMs, > MAX
+ props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
// Check for invalid streamsTaskOffsetIntervalMs, < MIN
props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "1000");
doTestInvalidProps(props, InvalidConfigurationException.class);
@@ -297,6 +344,8 @@ public class GroupConfigTest {
Map<String, String> defaultValue = new HashMap<>();
defaultValue.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "10");
defaultValue.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
"10");
+ defaultValue.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
"5000");
+ defaultValue.put(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false");
defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
"2000");
@@ -304,10 +353,14 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
"500");
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
+ defaultValue.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
"2500");
+ defaultValue.put(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false");
defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
"10");
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
"2000");
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
+ defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
"1250");
+ defaultValue.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false");
defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
"30000");
defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
"true");
@@ -317,6 +370,8 @@ public class GroupConfigTest {
assertEquals(10,
config.getInt(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(20,
config.getInt(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+ assertEquals(5000,
config.getInt(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ assertEquals(false,
config.getBoolean(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
@@ -324,14 +379,52 @@ public class GroupConfigTest {
assertEquals(500,
config.getInt(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG));
assertEquals("latest",
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
assertEquals("read_uncommitted",
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
+ assertEquals(2500,
config.getInt(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ assertEquals(false,
config.getBoolean(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(1,
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
assertEquals(3000,
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
+ assertEquals(1250,
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
+ assertEquals(false,
config.getBoolean(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
assertEquals(30000,
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
assertEquals(true,
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
}
+    @Test
+    public void testAssignmentIntervalMsAbsentWhenNotConfigured() {
+        // Build a group config from empty defaults and empty group properties: no
+        // assignment interval is set, so every per-group-type accessor must be empty.
+        GroupConfig groupConfig = GroupConfig.fromProps(Map.of(), new Properties());
+
+        assertEquals(Optional.empty(), groupConfig.consumerAssignmentIntervalMs());
+        assertEquals(Optional.empty(), groupConfig.shareAssignmentIntervalMs());
+        assertEquals(Optional.empty(), groupConfig.streamsAssignmentIntervalMs());
+    }
+
+    @Test
+    public void testAssignmentIntervalMsNotValidatedWhenNotConfigured() {
+        // Set the broker-level assignment interval and its minimum to 2000 for all three
+        // group types. Validating a group config that does not set the assignment interval
+        // must still pass: the absent value must not be validated as though it were the
+        // default assignment interval.
+        Map<String, Object> brokerProps = Map.of(
+            GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000
+        );
+        GroupCoordinatorConfig brokerConfig = GroupCoordinatorConfig.fromProps(brokerProps);
+
+        // Empty group properties: nothing group-level is configured.
+        assertDoesNotThrow(() -> GroupConfig.validate(Map.of(), brokerConfig, createShareGroupConfig()));
+    }
+
+    @Test
+    public void testAssignorOffloadEnableAbsentWhenNotConfigured() {
+        // Build a group config from empty defaults and empty group properties: the
+        // offload flag is not set, so every per-group-type accessor must be empty.
+        GroupConfig groupConfig = GroupConfig.fromProps(Map.of(), new Properties());
+
+        assertEquals(Optional.empty(), groupConfig.consumerAssignorOffloadEnable());
+        assertEquals(Optional.empty(), groupConfig.shareAssignorOffloadEnable());
+        assertEquals(Optional.empty(), groupConfig.streamsAssignorOffloadEnable());
+    }
+
@Test
public void testInvalidConfigName() {
Map<String, String> props = new HashMap<>();
@@ -398,6 +491,11 @@ public class GroupConfigTest {
3000, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
20000, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
),
+ Arguments.of(
+ GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ 0, /* CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = */
500,
+ 20000, CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT
+ ),
// Share group configs
Arguments.of(
GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
@@ -424,6 +522,11 @@ public class GroupConfigTest {
50, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT,
5000, SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT
),
+ Arguments.of(
+ GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ 0, /* SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = */ 500,
+ 20000, SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT
+ ),
// Streams group configs
Arguments.of(
GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
@@ -434,6 +537,11 @@ public class GroupConfigTest {
GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
3000, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
20000, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ 0, /* STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = */ 500,
+ 20000, STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT
)
);
}
@@ -492,7 +600,12 @@ public class GroupConfigTest {
Properties props = new Properties();
props.put(key, tooLow);
Properties result = GroupConfig.evaluate(props, "test-group",
- GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
+ GroupCoordinatorConfig.fromProps(Map.of(
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 500,
+
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 500,
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 500
+ )),
+ ShareGroupConfig.fromProps(Map.of()));
assertEquals(expectedMin, result.get(key));
}
@@ -545,7 +658,16 @@ public class GroupConfigTest {
}
private GroupCoordinatorConfig createGroupCoordinatorConfig() {
- return
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(OFFSET_METADATA_MAX_SIZE,
OFFSETS_RETENTION_CHECK_INTERVAL_MS, OFFSETS_RETENTION_MINUTES);
+ return GroupCoordinatorConfigTest.createGroupCoordinatorConfig( // Delegates to the GroupCoordinatorConfigTest factory, now passing extra broker-level configs as a fourth argument.
+ OFFSET_METADATA_MAX_SIZE,
+ OFFSETS_RETENTION_CHECK_INTERVAL_MS,
+ OFFSETS_RETENTION_MINUTES,
+ Map.of( // Sets the minimum assignment interval to 1000 ms for consumer, share and streams groups; presumably so the "500" (< MIN) cases in this test class are rejected - TODO confirm against doTestInvalidProps.
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 1000,
+
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 1000,
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 1000
+ )
+ );
}
private ShareGroupConfig createShareGroupConfig() {
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 418b57706fc..a1eff74238f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -626,6 +626,20 @@ public class GroupCoordinatorConfigTest {
int offsetMetadataMaxSize,
long offsetsRetentionCheckIntervalMs,
int offsetsRetentionMinutes
+ ) {
+ return createGroupCoordinatorConfig(
+ offsetMetadataMaxSize,
+ offsetsRetentionCheckIntervalMs,
+ offsetsRetentionMinutes,
+ Map.of()
+ );
+ }
+
+ public static GroupCoordinatorConfig createGroupCoordinatorConfig(
+ int offsetMetadataMaxSize,
+ long offsetsRetentionCheckIntervalMs,
+ int offsetsRetentionMinutes,
+ Map<String, Object> additionalConfigs
) {
Map<String, Object> configs = new HashMap<>();
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1);
@@ -654,6 +668,8 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, 1000);
configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG,
1024 * 1024);
+ configs.putAll(additionalConfigs);
+
return createConfig(configs);
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 02248044e5a..998c597c850 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
@@ -87,6 +88,7 @@ import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta
import
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
import
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ExpiredTimeout;
import
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ScheduledTimeout;
import
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
@@ -116,6 +118,7 @@ import
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.MemberState;
@@ -127,6 +130,7 @@ import
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.streams.MockTaskAssignor;
import
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers;
@@ -177,6 +181,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -21155,6 +21160,125 @@ public class GroupMetadataManagerTest {
context.assertNoRebalanceTimeout(groupId, memberId);
}
+    @Test
+    public void testDynamicBrokerAndGroupConfigs() {
+        // Every call runs the same override scenario for one config. The four trailing values are,
+        // in order: the initial broker-level value, the first and second broker-level overrides,
+        // and the group-level override.
+
+        // Consumer groups: assignment interval, then assignor offload.
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::consumerGroupAssignmentIntervalMs,
+            GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            2000, 1500, 1000, 500
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::consumerGroupAssignorOffloadEnable,
+            GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            true, false, true, false
+        );
+
+        // Share groups: assignment interval, then assignor offload.
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::shareGroupAssignmentIntervalMs,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            2000, 1500, 1000, 500
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::shareGroupAssignorOffloadEnable,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            true, false, true, false
+        );
+
+        // Streams groups: assignment interval, then assignor offload.
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::streamsGroupAssignmentIntervalMs,
+            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            2000, 1500, 1000, 500
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::streamsGroupAssignorOffloadEnable,
+            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            true, false, true, false
+        );
+    }
+
+ private <V> void testDynamicBrokerAndGroupConfig(
+ BiFunction<GroupMetadataManager, String, V> getValue,
+ String brokerConfigKey,
+ String groupConfigKey,
+ V initial,
+ V brokerOverride1,
+ V brokerOverride2,
+ V groupOverride
+ ) {
+ class DynamicConfig extends AbstractConfig {
+ private final Map<String, Object> overrides = new HashMap<>();
+
+ DynamicConfig(Map<?, ?> props) {
+ super(
+ Utils.mergeConfigs(List.of(
+ GroupCoordinatorConfig.CONFIG_DEF,
+ ShareGroupConfig.CONFIG_DEF
+ )),
+ props,
+ false
+ );
+ }
+
+ @Override
+ protected Object get(String key) {
+ return overrides.getOrDefault(key, super.get(key));
+ }
+
+ void put(String key, Object value) {
+ overrides.put(key, value);
+ }
+ }
+
+ MockTime time = new MockTime(0, 0, 0);
+ DynamicConfig kafkaConfig = new DynamicConfig(Map.of(brokerConfigKey,
initial));
+ GroupCoordinatorConfig groupCoordinatorConfig = new
GroupCoordinatorConfig(kafkaConfig);
+ ShareGroupConfig shareGroupConfig = new ShareGroupConfig(kafkaConfig);
+ GroupConfigManager groupConfigManager = new GroupConfigManager(
+ groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig),
+ groupCoordinatorConfig,
+ shareGroupConfig
+ );
+ GroupMetadataManager groupMetadataManager = new
GroupMetadataManager.Builder()
+ .withTime(time)
+ .withTimer(new MockCoordinatorTimer<>(time))
+ .withExecutor(new MockCoordinatorExecutor<>())
+ .withConfig(groupCoordinatorConfig)
+
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
+ .withGroupConfigManager(groupConfigManager)
+ .build();
+
+ String groupId = "test-group";
+ assertEquals(initial, getValue.apply(groupMetadataManager, groupId));
+
+ // Set broker-level override.
+ kafkaConfig.put(brokerConfigKey, brokerOverride1);
+ assertEquals(brokerOverride1, getValue.apply(groupMetadataManager,
groupId));
+
+ // Create a group config entry.
+ Properties groupConfig = new Properties();
+ groupConfig.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 2000);
+ groupConfigManager.updateGroupConfig(groupId, groupConfig);
+
+ // Check that broker-level overrides still work. The group config must
not bake in the value.
+ assertEquals(brokerOverride1, getValue.apply(groupMetadataManager,
groupId));
+ kafkaConfig.put(brokerConfigKey, brokerOverride2);
+ assertEquals(brokerOverride2, getValue.apply(groupMetadataManager,
groupId));
+
+ // Set group-level override.
+ groupConfig.put(groupConfigKey, String.valueOf(groupOverride));
+ groupConfigManager.updateGroupConfig(groupId, groupConfig);
+ assertEquals(groupOverride, getValue.apply(groupMetadataManager,
groupId));
+
+ // Remove group-level override.
+ groupConfig.remove(groupConfigKey);
+ groupConfigManager.updateGroupConfig(groupId, groupConfig);
+ assertEquals(brokerOverride2, getValue.apply(groupMetadataManager,
groupId));
+ }
+
@Test
public void testConsumerGroupEvaluatedConfigs() {
String groupId = "fooup";