This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new a2a6fecfd42 KAFKA-20251: Add group-level configs for assignment 
batching and offload (#21627)
a2a6fecfd42 is described below

commit a2a6fecfd423e638b37b0f61fc186811d65aab55
Author: Sean Quah <[email protected]>
AuthorDate: Thu Mar 19 14:44:53 2026 +0000

    KAFKA-20251: Add group-level configs for assignment batching and offload 
(#21627)
    
    Add group-level {consumer,share,streams}.assignment.interval.ms config
    options to control the delay between assignment calculation. These
    config options override the dynamic broker-level configs.
    
    Add group-level {consumer,share,streams}.assignor.offload.enable config
    options to control whether assignment calculation is offloaded to a
    group coordinator background thread. These config options override the
    dynamic broker-level configs.
    
    Since the broker-level configs for these group-level configs are
    dynamic, we have to use a different approach compared to the existing
    group-level configs. In the interests of getting these new configs into
    Apache Kafka 4.3, we defer refactoring GroupConfig until a later patch.
    
    Reviewers: majialong <[email protected]>, Christo Lolov
    <[email protected]>, David Jacot <[email protected]>
---
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   8 +-
 .../kafka/coordinator/group/GroupConfig.java       | 194 ++++++++++++++++++++-
 .../coordinator/group/GroupMetadataManager.java    |  24 ++-
 .../kafka/coordinator/group/GroupConfigTest.java   | 126 ++++++++++++-
 .../group/GroupCoordinatorConfigTest.java          |  16 ++
 .../group/GroupMetadataManagerTest.java            | 124 +++++++++++++
 6 files changed, 480 insertions(+), 12 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 663aaefe839..aaf5409e5f3 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFI [...]
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -356,6 +356,8 @@ class KafkaApisTest extends Logging {
     val cgConfigs = new Properties()
     cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+    cgConfigs.put(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
+    cgConfigs.put(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
@@ -364,10 +366,14 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_AUTO_OFFSET_RESET_CONFIG, 
GroupConfig.SHARE_AUTO_OFFSET_RESET_DEFAULT)
     cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG, 
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
     cgConfigs.put(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT.toString)
+    cgConfigs.put(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
+    cgConfigs.put(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
     cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
+    cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
+    cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
 
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 3d6ca00af45..8b1ce751ef0 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -53,6 +53,10 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG = 
"consumer.heartbeat.interval.ms";
 
+    public static final String CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"consumer.assignment.interval.ms";
+
+    public static final String CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG = 
"consumer.assignor.offload.enable";
+
     public static final String SHARE_SESSION_TIMEOUT_MS_CONFIG = 
"share.session.timeout.ms";
 
     public static final String SHARE_HEARTBEAT_INTERVAL_MS_CONFIG = 
"share.heartbeat.interval.ms";
@@ -84,6 +88,10 @@ public final class GroupConfig extends AbstractConfig {
     public static final boolean SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT = true;
     public static final String SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC = "Whether 
the renew acknowledge type is enabled for the share group.";
 
+    public static final String SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"share.assignment.interval.ms";
+
+    public static final String SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG = 
"share.assignor.offload.enable";
+
     public static final String STREAMS_SESSION_TIMEOUT_MS_CONFIG = 
"streams.session.timeout.ms";
 
     public static final String STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG = 
"streams.heartbeat.interval.ms";
@@ -92,12 +100,23 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = 
"streams.initial.rebalance.delay.ms";
 
+    public static final String STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG = 
"streams.assignment.interval.ms";
+
+    public static final String STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG = 
"streams.assignor.offload.enable";
+
     public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"streams.task.offset.interval.ms";
 
     public final int consumerSessionTimeoutMs;
 
     public final int consumerHeartbeatIntervalMs;
 
+    // These have to be optionals because their default group coordinator 
configs are dynamic,
+    // so we must not capture the default value at the time of GroupConfig 
construction.
+    // KAFKA-20337 tracks the work to refactor GroupConfig into something more 
consistent.
+    public final Optional<Integer> consumerAssignmentIntervalMs;
+
+    public final Optional<Boolean> consumerAssignorOffloadEnable;
+
     public final int shareSessionTimeoutMs;
 
     public final int shareHeartbeatIntervalMs;
@@ -110,6 +129,13 @@ public final class GroupConfig extends AbstractConfig {
 
     public final String shareAutoOffsetReset;
 
+    // These have to be optionals because their default group coordinator 
configs are dynamic,
+    // so we must not capture the default value at the time of GroupConfig 
construction.
+    // KAFKA-20337 tracks the work to refactor GroupConfig into something more 
consistent.
+    public final Optional<Integer> shareAssignmentIntervalMs;
+
+    public final Optional<Boolean> shareAssignorOffloadEnable;
+
     public final int streamsSessionTimeoutMs;
 
     public final int streamsHeartbeatIntervalMs;
@@ -118,6 +144,13 @@ public final class GroupConfig extends AbstractConfig {
 
     public final int streamsInitialRebalanceDelayMs;
 
+    // These have to be optionals because their default group coordinator 
configs are dynamic,
+    // so we must not capture the default value at the time of GroupConfig 
construction.
+    // KAFKA-20337 tracks the work to refactor GroupConfig into something more 
consistent.
+    public final Optional<Integer> streamsAssignmentIntervalMs;
+
+    public final Optional<Boolean> streamsAssignorOffloadEnable;
+
     public final int streamsTaskOffsetIntervalMs;
 
     public final String shareIsolationLevel;
@@ -137,6 +170,17 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(1),
             MEDIUM,
             GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+        .define(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            INT,
+            
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
+            atLeast(0),
+            MEDIUM,
+            GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            BOOLEAN,
+            
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
+            MEDIUM,
+            GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(SHARE_SESSION_TIMEOUT_MS_CONFIG,
             INT,
             GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -184,6 +228,17 @@ public final class GroupConfig extends AbstractConfig {
             SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT,
             MEDIUM,
             SHARE_RENEW_ACKNOWLEDGE_ENABLE_DOC)
+        .define(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            INT,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
+            atLeast(0),
+            MEDIUM,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            BOOLEAN,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
+            MEDIUM,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
             INT,
             GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -208,6 +263,17 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(0),
             MEDIUM,
             
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+        .define(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            INT,
+            
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
+            atLeast(0),
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
+        .define(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            BOOLEAN,
+            
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
             INT,
             
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
@@ -219,16 +285,43 @@ public final class GroupConfig extends AbstractConfig {
         super(CONFIG, props, false);
         this.consumerSessionTimeoutMs = 
getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
         this.consumerHeartbeatIntervalMs = 
getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+        // These have to be optionals because their default group coordinator 
configs are dynamic,
+        // so we must not capture the default value at the time of GroupConfig 
construction.
+        // KAFKA-20337 tracks the work to refactor GroupConfig into something 
more consistent.
+        this.consumerAssignmentIntervalMs = 
props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
+            Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
+            Optional.empty();
+        this.consumerAssignorOffloadEnable = 
props.containsKey(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
+            Optional.of(getBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
+            Optional.empty();
         this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
         this.shareHeartbeatIntervalMs = 
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareRecordLockDurationMs = 
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
         this.shareDeliveryCountLimit = 
getInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
         this.sharePartitionMaxRecordLocks = 
getInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
         this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
+        // These have to be optionals because their default group coordinator 
configs are dynamic,
+        // so we must not capture the default value at the time of GroupConfig 
construction.
+        // KAFKA-20337 tracks the work to refactor GroupConfig into something 
more consistent.
+        this.shareAssignmentIntervalMs = 
props.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
+            Optional.of(getInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
+            Optional.empty();
+        this.shareAssignorOffloadEnable = 
props.containsKey(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
+            Optional.of(getBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
+            Optional.empty();
         this.streamsSessionTimeoutMs = 
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
         this.streamsHeartbeatIntervalMs = 
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.streamsNumStandbyReplicas = 
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
         this.streamsInitialRebalanceDelayMs = 
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+        // These have to be optionals because their default group coordinator 
configs are dynamic,
+        // so we must not capture the default value at the time of GroupConfig 
construction.
+        // KAFKA-20337 tracks the work to refactor GroupConfig into something 
more consistent.
+        this.streamsAssignmentIntervalMs = 
props.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
+            Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
+            Optional.empty();
+        this.streamsAssignorOffloadEnable = 
props.containsKey(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
+            Optional.of(getBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
+            Optional.empty();
         this.streamsTaskOffsetIntervalMs = 
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
         this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
         this.shareRenewAcknowledgeEnable = 
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
@@ -262,17 +355,21 @@ public final class GroupConfig extends AbstractConfig {
      * Validates the values of the given properties.
      */
     @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
-    private static void validateValues(Map<String, ?> valueMaps, 
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig 
shareGroupConfig) {
+    private static void validateValues(Map<String, Object> unparsedMap, 
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig 
shareGroupConfig) {
+        Map<String, Object> valueMaps = CONFIG.parse(unparsedMap);
         int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
         int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+        int consumerAssignmentIntervalMs = (Integer) 
valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
         int shareHeartbeatInterval = (Integer) 
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         int shareSessionTimeout = (Integer) 
valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
         int shareRecordLockDurationMs = (Integer) 
valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
         int shareDeliveryCountLimit = (Integer) 
valueMaps.get(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
         int sharePartitionMaxRecordLocks = (Integer) 
valueMaps.get(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
+        int shareAssignmentIntervalMs = (Integer) 
valueMaps.get(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG);
         int streamsSessionTimeoutMs = (Integer) 
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
         int streamsHeartbeatIntervalMs = (Integer) 
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         int streamsNumStandbyReplicas = (Integer) 
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+        int streamsAssignmentIntervalMs = (Integer) 
valueMaps.get(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
         int streamsTaskOffsetIntervalMs = (Integer) 
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
         if (consumerHeartbeatInterval < 
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
             throw new 
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
@@ -290,6 +387,19 @@ public final class GroupConfig extends AbstractConfig {
             throw new 
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be 
less than or equal to " +
                 
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
         }
+        // If no group-level consumer assignment interval is configured, do 
not attempt to validate it.
+        // TODO: It's not clear if we can run the validation unconditionally.
+        //       KAFKA-20337 tracks the work to clean this up.
+        if (unparsedMap.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
+            if (consumerAssignmentIntervalMs < 
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs()) {
+                throw new 
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must 
be greater than or equal to " +
+                    
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+            }
+            if (consumerAssignmentIntervalMs > 
groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()) {
+                throw new 
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must 
be less than or equal to " +
+                    
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+            }
+        }
         if (shareHeartbeatInterval < 
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs()) {
             throw new 
InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
                 
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -330,6 +440,19 @@ public final class GroupConfig extends AbstractConfig {
             throw new 
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must 
be less than or equal to " +
                 
ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG);
         }
+        // If no group-level share assignment interval is configured, do not 
attempt to validate it.
+        // TODO: It's not clear if we can run the validation unconditionally.
+        //       KAFKA-20337 tracks the work to clean this up.
+        if (unparsedMap.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
+            if (shareAssignmentIntervalMs < 
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs()) {
+                throw new 
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
+                    
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+            }
+            if (shareAssignmentIntervalMs > 
groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()) {
+                throw new 
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
+                    
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+            }
+        }
         if (streamsHeartbeatIntervalMs < 
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
             throw new 
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
                 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -350,6 +473,19 @@ public final class GroupConfig extends AbstractConfig {
             throw new 
InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be 
less than or equal to " +
                 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
         }
+        // If no group-level streams assignment interval is configured, do not 
attempt to validate it.
+        // TODO: It's not clear if we can run the validation unconditionally.
+        //       KAFKA-20337 tracks the work to clean this up.
+        if (unparsedMap.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
+            if (streamsAssignmentIntervalMs < 
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs()) {
+                throw new 
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
+                    
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
+            }
+            if (streamsAssignmentIntervalMs > 
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()) {
+                throw new 
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
+                    
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+            }
+        }
         if (streamsTaskOffsetIntervalMs < 
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
             throw new 
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must 
be greater than or equal to " +
                 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
@@ -374,13 +510,14 @@ public final class GroupConfig extends AbstractConfig {
      * the broker-level defaults before validation.
      */
     public static void validate(Map<String, ?> props, GroupCoordinatorConfig 
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
+        // TODO: We shouldn't be re-deriving the default config from 
GroupConfigManager here.
+        //       KAFKA-20337 tracks the work to clean this up.
         Map<String, Object> combinedConfigs = new HashMap<>();
         
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
         combinedConfigs.putAll(props);
 
         validateNames(combinedConfigs);
-        Map<String, Object> valueMaps = CONFIG.parse(combinedConfigs);
-        validateValues(valueMaps, groupCoordinatorConfig, shareGroupConfig);
+        validateValues(combinedConfigs, groupCoordinatorConfig, 
shareGroupConfig);
     }
 
     /**
@@ -418,6 +555,9 @@ public final class GroupConfig extends AbstractConfig {
         clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
             groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
+        clampToRange(props, groupId, CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
+            groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs());
 
         // Share group configs
         clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
@@ -435,6 +575,9 @@ public final class GroupConfig extends AbstractConfig {
         clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
             shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
             shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
+        clampToRange(props, groupId, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
+            groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs());
 
         // Streams group configs
         clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
@@ -445,6 +588,9 @@ public final class GroupConfig extends AbstractConfig {
             groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
         clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
             groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
+        clampToRange(props, groupId, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
+            groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs());
         clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
 
@@ -612,6 +758,20 @@ public final class GroupConfig extends AbstractConfig {
         return consumerHeartbeatIntervalMs;
     }
 
+    /**
+     * The interval between assignment updates for a consumer group.
+     */
+    public Optional<Integer> consumerAssignmentIntervalMs() {
+        return consumerAssignmentIntervalMs;
+    }
+
+    /**
+     * Whether to offload consumer group assignment to a group coordinator 
background thread.
+     */
+    public Optional<Boolean> consumerAssignorOffloadEnable() {
+        return consumerAssignorOffloadEnable;
+    }
+
     /**
      * The share group session timeout in milliseconds.
      */
@@ -654,6 +814,20 @@ public final class GroupConfig extends AbstractConfig {
         return 
ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
     }
 
+    /**
+     * The interval between assignment updates for a share group.
+     */
+    public Optional<Integer> shareAssignmentIntervalMs() {
+        return shareAssignmentIntervalMs;
+    }
+
+    /**
+     * Whether to offload share group assignment to a group coordinator 
background thread.
+     */
+    public Optional<Boolean> shareAssignorOffloadEnable() {
+        return shareAssignorOffloadEnable;
+    }
+
     /**
      * The streams group session timeout in milliseconds.
      */
@@ -682,6 +856,20 @@ public final class GroupConfig extends AbstractConfig {
         return streamsInitialRebalanceDelayMs;
     }
 
+    /**
+     * The interval between assignment updates for a streams group.
+     */
+    public Optional<Integer> streamsAssignmentIntervalMs() {
+        return streamsAssignmentIntervalMs;
+    }
+
+    /**
+     * Whether to offload streams group assignment to a group coordinator 
background thread.
+     */
+    public Optional<Boolean> streamsAssignorOffloadEnable() {
+        return streamsAssignorOffloadEnable;
+    }
+
     /**
      * The task offset reporting interval.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 27fa34ccce5..09714ab28b8 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8776,7 +8776,9 @@ public class GroupMetadataManager {
      */
     // package private for testing
     int consumerGroupAssignmentIntervalMs(String groupId) {
-        return config.consumerGroupAssignmentIntervalMs();
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.flatMap(GroupConfig::consumerAssignmentIntervalMs)
+            .orElse(config.consumerGroupAssignmentIntervalMs());
     }
 
     /**
@@ -8784,7 +8786,9 @@ public class GroupMetadataManager {
      */
     // package private for testing
     boolean consumerGroupAssignorOffloadEnable(String groupId) {
-        return config.consumerGroupAssignorOffloadEnable();
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.flatMap(GroupConfig::consumerAssignorOffloadEnable)
+            .orElse(config.consumerGroupAssignorOffloadEnable());
     }
 
     /**
@@ -8810,7 +8814,9 @@ public class GroupMetadataManager {
      */
     // package private for testing
     int shareGroupAssignmentIntervalMs(String groupId) {
-        return config.shareGroupAssignmentIntervalMs();
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.flatMap(GroupConfig::shareAssignmentIntervalMs)
+            .orElse(config.shareGroupAssignmentIntervalMs());
     }
 
     /**
@@ -8818,7 +8824,9 @@ public class GroupMetadataManager {
      */
     // package private for testing
     boolean shareGroupAssignorOffloadEnable(String groupId) {
-        return config.shareGroupAssignorOffloadEnable();
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.flatMap(GroupConfig::shareAssignorOffloadEnable)
+            .orElse(config.shareGroupAssignorOffloadEnable());
     }
 
     /**
@@ -8844,7 +8852,9 @@ public class GroupMetadataManager {
      */
     // package private for testing
     int streamsGroupAssignmentIntervalMs(String groupId) {
-        return config.streamsGroupAssignmentIntervalMs();
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.flatMap(GroupConfig::streamsAssignmentIntervalMs)
+            .orElse(config.streamsGroupAssignmentIntervalMs());
     }
 
     /**
@@ -8852,7 +8862,9 @@ public class GroupMetadataManager {
      */
     // package private for testing
     boolean streamsGroupAssignorOffloadEnable(String groupId) {
-        return config.streamsGroupAssignorOffloadEnable();
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.flatMap(GroupConfig::streamsAssignorOffloadEnable)
+            .orElse(config.streamsGroupAssignorOffloadEnable());
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 8c0772abb9f..139aa4ab3ce 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -29,17 +29,21 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
@@ -74,12 +78,17 @@ public class GroupConfigTest {
     private static final int SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS = 60000;
 
     @Test
+    @SuppressWarnings("CyclomaticComplexity")
     public void testFromPropsInvalid() {
         GroupConfig.configNames().forEach(name -> {
             if (GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
             } else if 
(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
+            } else if 
(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
+            } else if 
(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
             } else if 
(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
@@ -96,6 +105,10 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "hello", "1.0");
             } else if 
(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_boolean", "1");
+            } else if 
(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
+            } else if 
(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if 
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -104,6 +117,10 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if 
(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
+            } else if 
(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
+            } else if 
(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else {
@@ -177,6 +194,16 @@ public class GroupConfigTest {
         doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
+        // Check for invalid consumerAssignmentIntervalMs, < MIN
+        props.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "500");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid consumerAssignmentIntervalMs, > MAX
+        props.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
         // Check for invalid shareSessionTimeoutMs, < MIN
         props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1");
         doTestInvalidProps(props, InvalidConfigurationException.class);
@@ -244,6 +271,16 @@ public class GroupConfigTest {
         doTestInvalidProps(props, ConfigException.class);
         props = createValidGroupConfig();
 
+        // Check for invalid shareAssignmentIntervalMs, < MIN
+        props.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "500");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid shareAssignmentIntervalMs, > MAX
+        props.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
         // Check for invalid streamsSessionTimeoutMs, < MIN
         props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1");
         doTestInvalidProps(props, InvalidConfigurationException.class);
@@ -264,6 +301,16 @@ public class GroupConfigTest {
         doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
+        // Check for invalid streamsAssignmentIntervalMs, < MIN
+        props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "500");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
+        // Check for invalid streamsAssignmentIntervalMs, > MAX
+        props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "20000");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
         // Check for invalid streamsTaskOffsetIntervalMs, < MIN
         props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "1000");
         doTestInvalidProps(props, InvalidConfigurationException.class);
@@ -297,6 +344,8 @@ public class GroupConfigTest {
         Map<String, String> defaultValue = new HashMap<>();
         defaultValue.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
"10");
+        defaultValue.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"5000");
+        defaultValue.put(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"false");
         defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
"2000");
@@ -304,10 +353,14 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
"500");
         defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
         defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
+        defaultValue.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"2500");
+        defaultValue.put(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"false");
         defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
"10");
         defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
"2000");
         defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
+        defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"1250");
+        defaultValue.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"false");
         defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
"30000");
         defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
"true");
 
@@ -317,6 +370,8 @@ public class GroupConfigTest {
 
         assertEquals(10, 
config.getInt(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(20, 
config.getInt(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
+        assertEquals(5000, 
config.getInt(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        assertEquals(false, 
config.getBoolean(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
@@ -324,14 +379,52 @@ public class GroupConfigTest {
         assertEquals(500, 
config.getInt(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG));
         assertEquals("latest", 
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
         assertEquals("read_uncommitted", 
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
+        assertEquals(2500, 
config.getInt(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        assertEquals(false, 
config.getBoolean(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(1, 
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
         assertEquals(3000, 
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
+        assertEquals(1250, 
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
+        assertEquals(false, 
config.getBoolean(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
         assertEquals(30000, 
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
         assertEquals(true, 
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
     }
 
+    @Test
+    public void testAssignmentIntervalMsAbsentWhenNotConfigured() {
+        // When the assignment interval config is absent, the group-level 
value is empty.
+        Properties props = new Properties();
+        GroupConfig config = GroupConfig.fromProps(Map.of(), props);
+        assertEquals(Optional.empty(), config.consumerAssignmentIntervalMs());
+        assertEquals(Optional.empty(), config.shareAssignmentIntervalMs());
+        assertEquals(Optional.empty(), config.streamsAssignmentIntervalMs());
+    }
+
+    @Test
+    public void testAssignmentIntervalMsNotValidatedWhenNotConfigured() {
+        // When the assignment interval config is absent, validation should 
not use the default assignment interval.
+        GroupCoordinatorConfig groupCoordinatorConfig = 
GroupCoordinatorConfig.fromProps(Map.of(
+            
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
2000,
+            
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
+            
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000
+        ));
+        assertDoesNotThrow(() -> GroupConfig.validate(Map.of(), 
groupCoordinatorConfig, createShareGroupConfig()));
+    }
+
+    @Test
+    public void testAssignorOffloadEnableAbsentWhenNotConfigured() {
+        // When the offload enable config is absent, the group-level value is 
empty.
+        Properties props = new Properties();
+        GroupConfig config = GroupConfig.fromProps(Map.of(), props);
+        assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable());
+        assertEquals(Optional.empty(), config.shareAssignorOffloadEnable());
+        assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable());
+    }
+
     @Test
     public void testInvalidConfigName() {
         Map<String, String> props = new HashMap<>();
@@ -398,6 +491,11 @@ public class GroupConfigTest {
                 3000, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
                 20000, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
             ),
+            Arguments.of(
+                GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                0, /* CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = */ 
500,
+                20000, CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT
+            ),
             // Share group configs
             Arguments.of(
                 GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
@@ -424,6 +522,11 @@ public class GroupConfigTest {
                 50, SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_DEFAULT,
                 5000, SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT
             ),
+            Arguments.of(
+                GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                0, /* SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = */ 500,
+                20000, SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT
+            ),
             // Streams group configs
             Arguments.of(
                 GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
@@ -434,6 +537,11 @@ public class GroupConfigTest {
                 GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
                 3000, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT,
                 20000, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+                0, /* STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG = */ 500,
+                20000, STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT
             )
         );
     }
@@ -492,7 +600,12 @@ public class GroupConfigTest {
         Properties props = new Properties();
         props.put(key, tooLow);
         Properties result = GroupConfig.evaluate(props, "test-group",
-            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
+            GroupCoordinatorConfig.fromProps(Map.of(
+                
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 500,
+                
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 500,
+                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 500
+            )),
+            ShareGroupConfig.fromProps(Map.of()));
         assertEquals(expectedMin, result.get(key));
     }
 
@@ -545,7 +658,16 @@ public class GroupConfigTest {
     }
 
     private GroupCoordinatorConfig createGroupCoordinatorConfig() {
-        return 
GroupCoordinatorConfigTest.createGroupCoordinatorConfig(OFFSET_METADATA_MAX_SIZE,
 OFFSETS_RETENTION_CHECK_INTERVAL_MS, OFFSETS_RETENTION_MINUTES);
+        return GroupCoordinatorConfigTest.createGroupCoordinatorConfig(
+            OFFSET_METADATA_MAX_SIZE,
+            OFFSETS_RETENTION_CHECK_INTERVAL_MS,
+            OFFSETS_RETENTION_MINUTES,
+            Map.of(
+                
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 1000,
+                
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 1000,
+                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, 1000
+            )
+        );
     }
 
     private ShareGroupConfig createShareGroupConfig() {
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 418b57706fc..a1eff74238f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -626,6 +626,20 @@ public class GroupCoordinatorConfigTest {
         int offsetMetadataMaxSize,
         long offsetsRetentionCheckIntervalMs,
         int offsetsRetentionMinutes
+    ) {
+        return createGroupCoordinatorConfig(
+            offsetMetadataMaxSize,
+            offsetsRetentionCheckIntervalMs,
+            offsetsRetentionMinutes,
+            Map.of()
+        );
+    }
+
+    public static GroupCoordinatorConfig createGroupCoordinatorConfig(
+        int offsetMetadataMaxSize,
+        long offsetsRetentionCheckIntervalMs,
+        int offsetsRetentionMinutes,
+        Map<String, Object> additionalConfigs
     ) {
         Map<String, Object> configs = new HashMap<>();
         
configs.put(GroupCoordinatorConfig.GROUP_COORDINATOR_NUM_THREADS_CONFIG, 1);
@@ -654,6 +668,8 @@ public class GroupCoordinatorConfigTest {
         configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_SIZE_CONFIG, 1000);
         configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
1024 * 1024);
 
+        configs.putAll(additionalConfigs);
+
         return createConfig(configs);
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 02248044e5a..998c597c850 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20,6 +20,7 @@ import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.FencedMemberEpochException;
@@ -87,6 +88,7 @@ import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta
 import 
org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
 import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
 import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
+import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
 import 
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ExpiredTimeout;
 import 
org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer.ScheduledTimeout;
 import 
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
@@ -116,6 +118,7 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataVa
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
 import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
 import org.apache.kafka.coordinator.group.modern.MemberState;
@@ -127,6 +130,7 @@ import 
org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpress
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
 import org.apache.kafka.coordinator.group.streams.MockTaskAssignor;
 import 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers;
@@ -177,6 +181,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -21155,6 +21160,125 @@ public class GroupMetadataManagerTest {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testDynamicBrokerAndGroupConfigs() {
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::consumerGroupAssignmentIntervalMs,
+            
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            2000, 1500, 1000, 500
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::consumerGroupAssignorOffloadEnable,
+            
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            true, false, true, false
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::shareGroupAssignmentIntervalMs,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            2000, 1500, 1000, 500
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::shareGroupAssignorOffloadEnable,
+            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            true, false, true, false
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::streamsGroupAssignmentIntervalMs,
+            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            2000, 1500, 1000, 500
+        );
+        testDynamicBrokerAndGroupConfig(
+            GroupMetadataManager::streamsGroupAssignorOffloadEnable,
+            
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
+            true, false, true, false
+        );
+    }
+
+    private <V> void testDynamicBrokerAndGroupConfig(
+        BiFunction<GroupMetadataManager, String, V> getValue,
+        String brokerConfigKey,
+        String groupConfigKey,
+        V initial,
+        V brokerOverride1,
+        V brokerOverride2,
+        V groupOverride
+    ) {
+        class DynamicConfig extends AbstractConfig {
+            private final Map<String, Object> overrides = new HashMap<>();
+
+            DynamicConfig(Map<?, ?> props) {
+                super(
+                    Utils.mergeConfigs(List.of(
+                        GroupCoordinatorConfig.CONFIG_DEF,
+                        ShareGroupConfig.CONFIG_DEF
+                    )),
+                    props,
+                    false
+                );
+            }
+
+            @Override
+            protected Object get(String key) {
+                return overrides.getOrDefault(key, super.get(key));
+            }
+
+            void put(String key, Object value) {
+                overrides.put(key, value);
+            }
+        }
+
+        MockTime time = new MockTime(0, 0, 0);
+        DynamicConfig kafkaConfig = new DynamicConfig(Map.of(brokerConfigKey, 
initial));
+        GroupCoordinatorConfig groupCoordinatorConfig = new 
GroupCoordinatorConfig(kafkaConfig);
+        ShareGroupConfig shareGroupConfig = new ShareGroupConfig(kafkaConfig);
+        GroupConfigManager groupConfigManager = new GroupConfigManager(
+            groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig),
+            groupCoordinatorConfig,
+            shareGroupConfig
+        );
+        GroupMetadataManager groupMetadataManager = new 
GroupMetadataManager.Builder()
+            .withTime(time)
+            .withTimer(new MockCoordinatorTimer<>(time))
+            .withExecutor(new MockCoordinatorExecutor<>())
+            .withConfig(groupCoordinatorConfig)
+            
.withGroupCoordinatorMetricsShard(mock(GroupCoordinatorMetricsShard.class))
+            .withGroupConfigManager(groupConfigManager)
+            .build();
+
+        String groupId = "test-group";
+        assertEquals(initial, getValue.apply(groupMetadataManager, groupId));
+
+        // Set broker-level override.
+        kafkaConfig.put(brokerConfigKey, brokerOverride1);
+        assertEquals(brokerOverride1, getValue.apply(groupMetadataManager, 
groupId));
+
+        // Create a group config entry.
+        Properties groupConfig = new Properties();
+        groupConfig.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 2000);
+        groupConfigManager.updateGroupConfig(groupId, groupConfig);
+
+        // Check that broker-level overrides still work. The group config must 
not bake in the value.
+        assertEquals(brokerOverride1, getValue.apply(groupMetadataManager, 
groupId));
+        kafkaConfig.put(brokerConfigKey, brokerOverride2);
+        assertEquals(brokerOverride2, getValue.apply(groupMetadataManager, 
groupId));
+
+        // Set group-level override.
+        groupConfig.put(groupConfigKey, String.valueOf(groupOverride));
+        groupConfigManager.updateGroupConfig(groupId, groupConfig);
+        assertEquals(groupOverride, getValue.apply(groupMetadataManager, 
groupId));
+
+        // Remove group-level override.
+        groupConfig.remove(groupConfigKey);
+        groupConfigManager.updateGroupConfig(groupId, groupConfig);
+        assertEquals(brokerOverride2, getValue.apply(groupMetadataManager, 
groupId));
+    }
+
     @Test
     public void testConsumerGroupEvaluatedConfigs() {
         String groupId = "fooup";

Reply via email to