This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4cd243abca KAFKA-20337 Make all GroupConfig fields Optional and clean 
up validation (#22003)
e4cd243abca is described below

commit e4cd243abca0db8b2de1ef0862ebf94a7b9c8f5a
Author: David Jacot <[email protected]>
AuthorDate: Thu Apr 9 18:58:26 2026 +0200

    KAFKA-20337 Make all GroupConfig fields Optional and clean up validation 
(#22003)
    
    All GroupConfig fields are now Optional<T>, storing only explicitly
    provided values. Broker-level defaults are resolved at access time via
    flatMap().orElse(brokerDefault), eliminating stale-capture issues when
    broker configs change dynamically.
    
    Key changes:
    - All 21 GroupConfig fields are private Optional, using
    optionalInt/Boolean/String helpers based on originals().
    - GroupConfigManager no longer needs a defaultConfig; constructor
    simplified.
    - GroupCoordinatorConfig.extractGroupConfigMap(ShareGroupConfig)
    removed.
    - All consumers (GroupMetadataManager, ShareGroupConfigProvider,
    KafkaApis) use flatMap.
    - validateValues refactored with validateIntRange/Max/Min helpers
    operating on a single filtered parsed map.
    - Cross-field checks use broker defaults for missing values.
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
     <[email protected]>
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   9 +-
 .../kafka/server/share/SharePartitionTest.java     |  36 +-
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   4 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   5 +-
 .../kafka/coordinator/group/GroupConfig.java       | 749 ++++++++++++---------
 .../coordinator/group/GroupConfigManager.java      |   9 +-
 .../coordinator/group/GroupCoordinatorConfig.java  |  51 --
 .../coordinator/group/GroupMetadataManager.java    |  18 +-
 .../modern/share/ShareGroupConfigProvider.java     |  10 +-
 .../coordinator/group/GroupConfigManagerTest.java  |   4 +-
 .../kafka/coordinator/group/GroupConfigTest.java   | 102 ++-
 .../group/GroupMetadataManagerTest.java            |   1 -
 .../modern/share/ShareGroupConfigProviderTest.java |  10 +-
 .../GroupCoordinatorShardLoadingBenchmark.java     |   2 +-
 15 files changed, 574 insertions(+), 438 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index cf43e9ef1ca..102981827ce 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -375,7 +375,7 @@ class BrokerServer(
       authorizerPlugin = config.createNewAuthorizer(metrics, 
ProcessRole.BrokerRole.toString)
 
       /* initializing the groupConfigManager */
-      groupConfigManager = new 
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
 config.groupCoordinatorConfig, config.shareGroupConfig)
+      groupConfigManager = new 
GroupConfigManager(config.groupCoordinatorConfig, config.shareGroupConfig)
 
       /* create share coordinator */
       shareCoordinator = createShareCoordinator()
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 144696be434..9e923563ad0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2827,9 +2827,10 @@ class KafkaApis(val requestChannel: RequestChannel,
               }
             } else {
               // Compute group-specific timeout for caching errors (2 * 
heartbeat interval)
-              val heartbeatIntervalMs = 
Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
-                .map(_.streamsHeartbeatIntervalMs().toLong)
-                
.getOrElse(config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs().toLong)
+              val heartbeatIntervalMs = 
groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId)
+                .flatMap[java.lang.Integer](gc => 
gc.streamsHeartbeatIntervalMs())
+                .orElseGet(() => 
config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs())
+                .toLong
               val timeoutMs = heartbeatIntervalMs * 2
 
               
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate, 
requestContext, timeoutMs)
@@ -3460,7 +3461,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         shareFetchRequest.maxWait,
         fetchMinBytes,
         fetchMaxBytes,
-        FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, 
groupConfigManager.groupConfig(groupId).map(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
+        FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID, 
groupConfigManager.groupConfig(groupId).flatMap(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
         clientMetadata,
         true
       )
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 285de56397f..24d37dc0700 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -219,7 +219,7 @@ public class SharePartitionTest {
         GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
         
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));
 
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
 
@@ -270,7 +270,7 @@ public class SharePartitionTest {
         GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
         
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST);
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.LATEST));
 
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
 
@@ -330,7 +330,7 @@ public class SharePartitionTest {
         
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
         Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
 
-        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy));
 
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
 
@@ -480,7 +480,7 @@ public class SharePartitionTest {
         GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
         
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));
 
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
 
@@ -531,7 +531,7 @@ public class SharePartitionTest {
         // final ShareGroupAutoOffsetResetStrategy resetStrategy = 
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
         final ShareGroupAutoOffsetResetStrategy resetStrategy = 
Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
         final long expectedTimestamp = MOCK_TIME.milliseconds() - 
TimeUnit.HOURS.toMillis(1);
-        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
+        
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy));
 
         
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
         Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
@@ -7151,7 +7151,7 @@ public class SharePartitionTest {
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
         int expectedDurationMs = 500;
         
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-        
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
+        
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(Optional.of(expectedDurationMs));
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager)).build();
@@ -7172,8 +7172,8 @@ public class SharePartitionTest {
         
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
         // First invocation of shareRecordLockDurationMs() returns 500, and 
the second invocation returns 1000
         Mockito.when(groupConfig.shareRecordLockDurationMs())
-            .thenReturn(expectedDurationMs1)
-            .thenReturn(expectedDurationMs2);
+            .thenReturn(Optional.of(expectedDurationMs1))
+            .thenReturn(Optional.of(expectedDurationMs2));
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager)).build();
@@ -12324,7 +12324,7 @@ public class SharePartitionTest {
     public void testMaxDeliveryCountUsesGroupConfigWhenPresent() {
         GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(8));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
@@ -12376,7 +12376,7 @@ public class SharePartitionTest {
 
         // Dynamically decrease the limit to 2 via group config BEFORE 
releasing.
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.shareDeliveryCountLimit()).thenReturn(2);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(2));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         // Release: archival check fires because deliveryCount(2) >= 
maxDeliveryCount(2),
@@ -12412,7 +12412,7 @@ public class SharePartitionTest {
 
         // Now increase limit to 10 via group config before the second acquire.
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.shareDeliveryCountLimit()).thenReturn(10);
+        
when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(10));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         // Second acquire: deliveryCount = 2. With old limit (2) this would 
archive.
@@ -12432,7 +12432,7 @@ public class SharePartitionTest {
     public void testMaxInFlightRecordsUsesGroupConfigWhenPresent() {
         GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(5000));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
@@ -12477,7 +12477,7 @@ public class SharePartitionTest {
 
         // Dynamically decrease the limit to 30 via group config.
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(30);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(30));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         // The effective limit should now be 30.
@@ -12510,7 +12510,7 @@ public class SharePartitionTest {
 
         // Increase limit to 500 via group config.
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         assertEquals(500, sharePartition.maxInFlightRecords());
@@ -12540,14 +12540,14 @@ public class SharePartitionTest {
 
         // Dynamically set limit to exactly the in-flight count via group 
config.
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(50);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(50));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         // Still at boundary: 50 < 50 is false.
         assertFalse(sharePartition.canAcquireRecords());
 
         // Increase by 1 to cross the boundary.
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(51);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(51));
 
         // Now 50 < 51 is true.
         assertTrue(sharePartition.canAcquireRecords());
@@ -12557,7 +12557,7 @@ public class SharePartitionTest {
     public void testDynamicPartitionMaxRecordLocksRemoveGroupConfig() {
         GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
@@ -12591,7 +12591,7 @@ public class SharePartitionTest {
 
         // Decrease limit to 20, well below the 50 in-flight.
         GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(20);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(20));
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         // maxInFlightRecords - inFlightRecordsCount = 20 - 50 = -30, so 
maxRecordsToAcquire <= 0.
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index a9c23a0bc08..97adc98cee4 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1176,7 +1176,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     var configs = describeResult.all.get(15, TimeUnit.SECONDS)
     assertEquals("55000", 
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
     // Before restart, 55000 is within [45000, 60000], so no adjustment needed
-    assertEquals(55000, 
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+    assertEquals(Optional.of(55000), 
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
 
     // Kill all brokers
     client.close()
@@ -1200,7 +1200,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).source)
 
     // Verify effective value is adjusted (55000 evaluated to new max 50000)
-    assertEquals(50000, 
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+    assertEquals(Optional.of(50000), 
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index d7231fbe7d7..0f510d1a06a 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -31,6 +31,7 @@ import 
org.apache.kafka.common.quota.ClientQuotaEntity.{CLIENT_ID, IP, USER}
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{TopicPartition, Uuid}
+import java.util.Optional
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
 import org.apache.kafka.metadata.MetadataCache
@@ -419,7 +420,7 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
 
     val groupConfig = 
brokerServers.head.groupCoordinator.groupConfig(consumerGroupId).get()
-    assertEquals(newSessionTimeoutMs, groupConfig.consumerSessionTimeoutMs())
+    assertEquals(Optional.of(newSessionTimeoutMs), 
groupConfig.consumerSessionTimeoutMs())
   }
 
   @Test
@@ -445,7 +446,7 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
 
     val groupConfig = 
brokerServers.head.groupCoordinator.groupConfig(shareGroupId).get()
-    assertEquals(newRecordLockDurationMs, 
groupConfig.shareRecordLockDurationMs)
+    assertEquals(Optional.of(newRecordLockDurationMs), 
groupConfig.shareRecordLockDurationMs)
   }
 
   @Test
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 0a6841ec324..07247bf76ab 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -27,7 +27,6 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
@@ -47,7 +46,7 @@ import static 
org.apache.kafka.common.config.ConfigDef.ValidString.in;
  */
 public final class GroupConfig extends AbstractConfig {
 
-    private static final Logger log = 
LoggerFactory.getLogger(GroupConfig.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(GroupConfig.class);
 
     public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG = 
"consumer.session.timeout.ms";
 
@@ -106,56 +105,47 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"streams.task.offset.interval.ms";
 
-    public final int consumerSessionTimeoutMs;
+    private final Optional<Integer> consumerSessionTimeoutMs;
 
-    public final int consumerHeartbeatIntervalMs;
+    private final Optional<Integer> consumerHeartbeatIntervalMs;
 
-    // These have to be optionals because their default group coordinator 
configs are dynamic,
-    // so we must not capture the default value at the time of GroupConfig 
construction.
-    // KAFKA-20337 tracks the work to refactor GroupConfig into something more 
consistent.
-    public final Optional<Integer> consumerAssignmentIntervalMs;
+    private final Optional<Integer> consumerAssignmentIntervalMs;
 
-    public final Optional<Boolean> consumerAssignorOffloadEnable;
+    private final Optional<Boolean> consumerAssignorOffloadEnable;
 
-    public final int shareSessionTimeoutMs;
+    private final Optional<Integer> shareSessionTimeoutMs;
 
-    public final int shareHeartbeatIntervalMs;
+    private final Optional<Integer> shareHeartbeatIntervalMs;
 
-    public final int shareRecordLockDurationMs;
+    private final Optional<Integer> shareRecordLockDurationMs;
 
-    public final int shareDeliveryCountLimit;
+    private final Optional<Integer> shareDeliveryCountLimit;
 
-    public final int sharePartitionMaxRecordLocks;
+    private final Optional<Integer> sharePartitionMaxRecordLocks;
 
-    public final String shareAutoOffsetReset;
+    private final Optional<ShareGroupAutoOffsetResetStrategy> 
shareAutoOffsetReset;
 
-    // These have to be optionals because their default group coordinator 
configs are dynamic,
-    // so we must not capture the default value at the time of GroupConfig 
construction.
-    // KAFKA-20337 tracks the work to refactor GroupConfig into something more 
consistent.
-    public final Optional<Integer> shareAssignmentIntervalMs;
+    private final Optional<Integer> shareAssignmentIntervalMs;
 
-    public final Optional<Boolean> shareAssignorOffloadEnable;
+    private final Optional<Boolean> shareAssignorOffloadEnable;
 
-    public final int streamsSessionTimeoutMs;
+    private final Optional<Integer> streamsSessionTimeoutMs;
 
-    public final int streamsHeartbeatIntervalMs;
+    private final Optional<Integer> streamsHeartbeatIntervalMs;
 
-    public final int streamsNumStandbyReplicas;
+    private final Optional<Integer> streamsNumStandbyReplicas;
 
-    public final int streamsInitialRebalanceDelayMs;
+    private final Optional<Integer> streamsInitialRebalanceDelayMs;
 
-    // These have to be optionals because their default group coordinator 
configs are dynamic,
-    // so we must not capture the default value at the time of GroupConfig 
construction.
-    // KAFKA-20337 tracks the work to refactor GroupConfig into something more 
consistent.
-    public final Optional<Integer> streamsAssignmentIntervalMs;
+    private final Optional<Integer> streamsAssignmentIntervalMs;
 
-    public final Optional<Boolean> streamsAssignorOffloadEnable;
+    private final Optional<Boolean> streamsAssignorOffloadEnable;
 
-    public final int streamsTaskOffsetIntervalMs;
+    private final Optional<Integer> streamsTaskOffsetIntervalMs;
 
-    public final String shareIsolationLevel;
+    private final Optional<IsolationLevel> shareIsolationLevel;
 
-    public final boolean shareRenewAcknowledgeEnable;
+    private final Optional<Boolean> shareRenewAcknowledgeEnable;
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
@@ -286,13 +276,13 @@ public final class GroupConfig extends AbstractConfig {
      * {@code Optional.empty()} indicates that the config has no broker-level 
synonym.
      */
     public static final Map<String, Optional<String>> 
ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries(
-        // Consumer group configs
+        // Consumer group configs.
         Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
         Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
         Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
         Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
 
-        // Share group configs
+        // Share group configs.
         Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
         Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
         Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)),
@@ -304,7 +294,7 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
         Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
 
-        // Streams group configs
+        // Streams group configs.
         Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
         Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
         Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
@@ -324,48 +314,41 @@ public final class GroupConfig extends AbstractConfig {
 
     public GroupConfig(Map<?, ?> props) {
         super(CONFIG_DEF, props, false);
-        this.consumerSessionTimeoutMs = 
getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
-        this.consumerHeartbeatIntervalMs = 
getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
-        // These have to be optionals because their default group coordinator 
configs are dynamic,
-        // so we must not capture the default value at the time of GroupConfig 
construction.
-        // KAFKA-20337 tracks the work to refactor GroupConfig into something 
more consistent.
-        this.consumerAssignmentIntervalMs = 
props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
-            Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
-            Optional.empty();
-        this.consumerAssignorOffloadEnable = 
props.containsKey(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
-            Optional.of(getBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
-            Optional.empty();
-        this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
-        this.shareHeartbeatIntervalMs = 
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
-        this.shareRecordLockDurationMs = 
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
-        this.shareDeliveryCountLimit = 
getInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
-        this.sharePartitionMaxRecordLocks = 
getInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
-        this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
-        // These have to be optionals because their default group coordinator 
configs are dynamic,
-        // so we must not capture the default value at the time of GroupConfig 
construction.
-        // KAFKA-20337 tracks the work to refactor GroupConfig into something 
more consistent.
-        this.shareAssignmentIntervalMs = 
props.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
-            Optional.of(getInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
-            Optional.empty();
-        this.shareAssignorOffloadEnable = 
props.containsKey(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
-            Optional.of(getBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
-            Optional.empty();
-        this.streamsSessionTimeoutMs = 
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
-        this.streamsHeartbeatIntervalMs = 
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
-        this.streamsNumStandbyReplicas = 
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
-        this.streamsInitialRebalanceDelayMs = 
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
-        // These have to be optionals because their default group coordinator 
configs are dynamic,
-        // so we must not capture the default value at the time of GroupConfig 
construction.
-        // KAFKA-20337 tracks the work to refactor GroupConfig into something 
more consistent.
-        this.streamsAssignmentIntervalMs = 
props.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
-            Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
-            Optional.empty();
-        this.streamsAssignorOffloadEnable = 
props.containsKey(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
-            Optional.of(getBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
-            Optional.empty();
-        this.streamsTaskOffsetIntervalMs = 
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
-        this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
-        this.shareRenewAcknowledgeEnable = 
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
+        this.consumerSessionTimeoutMs = 
optionalInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+        this.consumerHeartbeatIntervalMs = 
optionalInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.consumerAssignmentIntervalMs = 
optionalInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
+        this.consumerAssignorOffloadEnable = 
optionalBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+        this.shareSessionTimeoutMs = 
optionalInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
+        this.shareHeartbeatIntervalMs = 
optionalInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.shareRecordLockDurationMs = 
optionalInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+        this.shareDeliveryCountLimit = 
optionalInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
+        this.sharePartitionMaxRecordLocks = 
optionalInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
+        this.shareAutoOffsetReset = 
optionalString(SHARE_AUTO_OFFSET_RESET_CONFIG)
+            .map(ShareGroupAutoOffsetResetStrategy::fromString);
+        this.shareAssignmentIntervalMs = 
optionalInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG);
+        this.shareAssignorOffloadEnable = 
optionalBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+        this.streamsSessionTimeoutMs = 
optionalInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
+        this.streamsHeartbeatIntervalMs = 
optionalInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+        this.streamsNumStandbyReplicas = 
optionalInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+        this.streamsInitialRebalanceDelayMs = 
optionalInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+        this.streamsAssignmentIntervalMs = 
optionalInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
+        this.streamsAssignorOffloadEnable = 
optionalBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+        this.streamsTaskOffsetIntervalMs = 
optionalInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
+        this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
+            .map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
+        this.shareRenewAcknowledgeEnable = 
optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
+    }
+
+    private Optional<Integer> optionalInt(String key) {
+        return originals().containsKey(key) ? Optional.of(getInt(key)) : 
Optional.empty();
+    }
+
+    private Optional<Boolean> optionalBoolean(String key) {
+        return originals().containsKey(key) ? Optional.of(getBoolean(key)) : 
Optional.empty();
+    }
+
+    private Optional<String> optionalString(String key) {
+        return originals().containsKey(key) ? Optional.of(getString(key)) : 
Optional.empty();
     }
 
     public static Optional<Type> configType(String configName) {
@@ -377,7 +360,7 @@ public final class GroupConfig extends AbstractConfig {
     }
 
     /**
-     * Check that property names are valid
+     * Check that property names are valid.
      */
     public static void validateNames(Map<String, ?> props) {
         Set<String> names = configNames();
@@ -389,172 +372,242 @@ public final class GroupConfig extends AbstractConfig {
     }
 
     /**
-     * Validates the values of the given properties.
-     */
-    @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
-    private static void validateValues(Map<String, Object> unparsedMap, 
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig 
shareGroupConfig) {
-        Map<String, Object> valueMaps = CONFIG_DEF.parse(unparsedMap);
-        int consumerHeartbeatInterval = (Integer) 
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
-        int consumerSessionTimeout = (Integer) 
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
-        int consumerAssignmentIntervalMs = (Integer) 
valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
-        int shareHeartbeatInterval = (Integer) 
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
-        int shareSessionTimeout = (Integer) 
valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
-        int shareRecordLockDurationMs = (Integer) 
valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
-        int shareDeliveryCountLimit = (Integer) 
valueMaps.get(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
-        int sharePartitionMaxRecordLocks = (Integer) 
valueMaps.get(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
-        int shareAssignmentIntervalMs = (Integer) 
valueMaps.get(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG);
-        int streamsSessionTimeoutMs = (Integer) 
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
-        int streamsHeartbeatIntervalMs = (Integer) 
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
-        int streamsNumStandbyReplicas = (Integer) 
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
-        int streamsAssignmentIntervalMs = (Integer) 
valueMaps.get(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
-        int streamsTaskOffsetIntervalMs = (Integer) 
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
-        if (consumerHeartbeatInterval < 
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
-            throw new 
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
-                
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (consumerHeartbeatInterval > 
groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()) {
-            throw new 
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
-                
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (consumerSessionTimeout < 
groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs()) {
-            throw new 
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than or equal to " +
-                
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
-        }
-        if (consumerSessionTimeout > 
groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()) {
-            throw new 
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be 
less than or equal to " +
-                
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
-        }
-        // If no group-level consumer assignment interval is configured, do 
not attempt to validate it.
-        // TODO: It's not clear if we can run the validation unconditionally.
-        //       KAFKA-20337 tracks the work to clean this up.
-        if (unparsedMap.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
-            if (consumerAssignmentIntervalMs < 
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs()) {
-                throw new 
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must 
be greater than or equal to " +
-                    
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
-            }
-            if (consumerAssignmentIntervalMs > 
groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()) {
-                throw new 
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must 
be less than or equal to " +
-                    
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
-            }
-        }
-        if (shareHeartbeatInterval < 
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs()) {
-            throw new 
InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
-                
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (shareHeartbeatInterval > 
groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()) {
-            throw new 
InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
-                
GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (shareSessionTimeout < 
groupCoordinatorConfig.shareGroupMinSessionTimeoutMs()) {
-            throw new 
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than or equal to " +
-                
GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
-        }
-        if (shareSessionTimeout > 
groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()) {
-            throw new 
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be less 
than or equal to " +
-                
GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
-        }
-        if (shareRecordLockDurationMs < 
shareGroupConfig.shareGroupMinRecordLockDurationMs()) {
-            throw new 
InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be 
greater than or equal to " +
-                
ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
-        }
-        if (shareRecordLockDurationMs > 
shareGroupConfig.shareGroupMaxRecordLockDurationMs()) {
-            throw new 
InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be 
less than or equal to " +
-                
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
-        }
-        if (shareDeliveryCountLimit < 
shareGroupConfig.shareGroupMinDeliveryCountLimit()) {
-            throw new 
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be 
greater than or equal to " +
-                ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
-        }
-        if (shareDeliveryCountLimit > 
shareGroupConfig.shareGroupMaxDeliveryCountLimit()) {
-            throw new 
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be 
less than or equal to " +
-                ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
-        }
-        if (sharePartitionMaxRecordLocks < 
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks()) {
-            throw new 
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must 
be greater than or equal to " +
-                
ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG);
-        }
-        if (sharePartitionMaxRecordLocks > 
shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()) {
-            throw new 
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must 
be less than or equal to " +
-                
ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG);
-        }
-        // If no group-level share assignment interval is configured, do not 
attempt to validate it.
-        // TODO: It's not clear if we can run the validation unconditionally.
-        //       KAFKA-20337 tracks the work to clean this up.
-        if (unparsedMap.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
-            if (shareAssignmentIntervalMs < 
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs()) {
-                throw new 
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
-                    
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
-            }
-            if (shareAssignmentIntervalMs > 
groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()) {
-                throw new 
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
-                    
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
-            }
-        }
-        if (streamsHeartbeatIntervalMs < 
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
-            throw new 
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
-                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (streamsHeartbeatIntervalMs > 
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()) {
-            throw new 
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
-                
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (streamsSessionTimeoutMs < 
groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs()) {
-            throw new 
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than or equal to " +
-                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
-        }
-        if (streamsSessionTimeoutMs > 
groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()) {
-            throw new 
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be 
less than or equal to " +
-                
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
-        }
-        if (streamsNumStandbyReplicas > 
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()) {
-            throw new 
InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be 
less than or equal to " +
-                
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
-        }
-        // If no group-level streams assignment interval is configured, do not 
attempt to validate it.
-        // TODO: It's not clear if we can run the validation unconditionally.
-        //       KAFKA-20337 tracks the work to clean this up.
-        if (unparsedMap.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
-            if (streamsAssignmentIntervalMs < 
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs()) {
-                throw new 
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
-                    
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
-            }
-            if (streamsAssignmentIntervalMs > 
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()) {
-                throw new 
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be 
less than or equal to " +
-                    
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
-            }
-        }
-        if (streamsTaskOffsetIntervalMs < 
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
-            throw new 
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must 
be greater than or equal to " +
-                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
-        }
-        if (consumerSessionTimeout <= consumerHeartbeatInterval) {
-            throw new 
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
-                CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (shareSessionTimeout <= shareHeartbeatInterval) {
-            throw new 
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
-                SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
-        if (streamsSessionTimeoutMs <= streamsHeartbeatIntervalMs) {
-            throw new 
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
-                STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
-        }
+     * Check that the given properties contain only valid group config names 
and that
+     * all values can be parsed and are valid.
+     */
+    public static void validate(
+        Map<String, ?> props,
+        GroupCoordinatorConfig groupCoordinatorConfig,
+        ShareGroupConfig shareGroupConfig
+    ) {
+        validateNames(props);
+        var parsed = CONFIG_DEF.parse(props);
+        parsed.keySet().retainAll(props.keySet());
+        validateValues(
+            parsed,
+            groupCoordinatorConfig,
+            shareGroupConfig
+        );
     }
 
     /**
-     * Check that the given properties contain only valid group config names 
and that
-     * all values can be parsed and are valid. The provided properties are 
merged with
-     * the broker-level defaults before validation.
+     * Validates the parsed values against broker-level bounds.
+     * Only configs explicitly present in the parsed map are validated.
      */
-    public static void validate(Map<String, ?> props, GroupCoordinatorConfig 
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
-        // TODO: We shouldn't be re-deriving the default config from 
GroupConfigManager here.
-        //       KAFKA-20337 tracks the work to clean this up.
-        Map<String, Object> combinedConfigs = new HashMap<>();
-        
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
-        combinedConfigs.putAll(props);
+    private static void validateValues(
+        Map<String, Object> parsed,
+        GroupCoordinatorConfig groupCoordinatorConfig,
+        ShareGroupConfig shareGroupConfig
+    ) {
+        // Consumer group configs.
+        validateIntRange(
+            parsed,
+            CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs(),
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs(),
+            GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs(),
+            
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+        );
+
+        // Share group configs.
+        validateIntRange(
+            parsed,
+            SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
+            
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs(),
+            GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            SHARE_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
+            GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs(),
+            GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+            shareGroupConfig.shareGroupMinRecordLockDurationMs(),
+            ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
+            shareGroupConfig.shareGroupMaxRecordLockDurationMs(),
+            ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+            shareGroupConfig.shareGroupMinDeliveryCountLimit(),
+            ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG,
+            shareGroupConfig.shareGroupMaxDeliveryCountLimit(),
+            ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+            shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
+            ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+            shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks(),
+            ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
+            
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs(),
+            
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+        );
+
+        // Streams group configs.
+        validateIntRange(
+            parsed,
+            STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
+            
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs(),
+            
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
+            GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs(),
+            GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+        );
+        validateIntMax(
+            parsed,
+            STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas(),
+            GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG
+        );
+        validateIntRange(
+            parsed,
+            STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
+            
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs(),
+            
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+        );
+        validateIntMin(
+            parsed,
+            STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(),
+            
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG
+        );
+
+        // Cross-field validations: session timeout must be greater than 
heartbeat interval.
+        validateSessionExceedsHeartbeat(
+            parsed,
+            CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
+            CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs()
+        );
+        validateSessionExceedsHeartbeat(
+            parsed,
+            SHARE_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
+            SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupHeartbeatIntervalMs()
+        );
+        validateSessionExceedsHeartbeat(
+            parsed,
+            STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
+            STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
+        );
+    }
+
+    /**
+     * Validates that an integer config value falls within [min, max].
+     * No-op when the key is absent from the parsed map.
+     */
+    private static void validateIntRange(
+        Map<String, Object> parsed,
+        String key,
+        int min,
+        String minConfigName,
+        int max,
+        String maxConfigName
+    ) {
+        if (!parsed.containsKey(key)) return;
+        int value = (Integer) parsed.get(key);
+        if (value < min)
+            throw new InvalidConfigurationException(key + " must be greater 
than or equal to " + minConfigName);
+        if (value > max)
+            throw new InvalidConfigurationException(key + " must be less than 
or equal to " + maxConfigName);
+    }
+
+    /**
+     * Validates that an integer config value does not exceed max.
+     * No-op when the key is absent from the parsed map.
+     */
+    private static void validateIntMax(
+        Map<String, Object> parsed,
+        String key,
+        int max,
+        String maxConfigName
+    ) {
+        if (!parsed.containsKey(key)) return;
+        int value = (Integer) parsed.get(key);
+        if (value > max)
+            throw new InvalidConfigurationException(key + " must be less than 
or equal to " + maxConfigName);
+    }
 
-        validateNames(combinedConfigs);
-        validateValues(combinedConfigs, groupCoordinatorConfig, 
shareGroupConfig);
+    /**
+     * Validates that an integer config value is at least min.
+     * No-op when the key is absent from the parsed map.
+     */
+    private static void validateIntMin(
+        Map<String, Object> parsed,
+        String key,
+        int min,
+        String minConfigName
+    ) {
+        if (!parsed.containsKey(key)) return;
+        int value = (Integer) parsed.get(key);
+        if (value < min)
+            throw new InvalidConfigurationException(key + " must be greater 
than or equal to " + minConfigName);
+    }
+
+    /**
+     * Validates that the session timeout is greater than the heartbeat 
interval.
+     * Uses broker defaults for any config not present in the parsed map.
+     */
+    private static void validateSessionExceedsHeartbeat(
+        Map<String, Object> parsed,
+        String sessionKey,
+        int defaultSession,
+        String heartbeatKey,
+        int defaultHeartbeat
+    ) {
+        if (parsed.containsKey(sessionKey) || 
parsed.containsKey(heartbeatKey)) {
+            int effectiveSession = parsed.containsKey(sessionKey)
+                ? (Integer) parsed.get(sessionKey) : defaultSession;
+            int effectiveHeartbeat = parsed.containsKey(heartbeatKey)
+                ? (Integer) parsed.get(heartbeatKey) : defaultHeartbeat;
+            if (effectiveSession <= effectiveHeartbeat)
+                throw new InvalidConfigurationException(sessionKey + " must be 
greater than " + heartbeatKey);
+        }
     }
 
     /**
@@ -575,7 +628,12 @@ public final class GroupConfig extends AbstractConfig {
     ) {
         Properties effective = new Properties();
         effective.putAll(props);
-        evaluateValues(effective, groupId, groupCoordinatorConfig, 
shareGroupConfig);
+        evaluateValues(
+            effective,
+            groupId,
+            groupCoordinatorConfig,
+            shareGroupConfig
+        );
         return effective;
     }
 
@@ -585,62 +643,133 @@ public final class GroupConfig extends AbstractConfig {
         GroupCoordinatorConfig groupCoordinatorConfig,
         ShareGroupConfig shareGroupConfig
     ) {
-        // Consumer group configs
-        clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+        // Consumer group configs.
+        clampToRange(
+            props,
+            groupId,
+            CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
             groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
-            groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs());
-        clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()
+        );
+        clampToRange(
+            props,
+            groupId,
+            CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
-            groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
-        clampToRange(props, groupId, CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()
+        );
+        clampToRange(
+            props,
+            groupId,
+            CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
-            groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs());
-
-        // Share group configs
-        clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()
+        );
+
+        // Share group configs.
+        clampToRange(
+            props,
+            groupId,
+            SHARE_SESSION_TIMEOUT_MS_CONFIG,
             groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
-            groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs());
-        clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()
+        );
+        clampToRange(
+            props,
+            groupId,
+            SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
-            groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs());
-        clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()
+        );
+        clampToRange(
+            props,
+            groupId,
+            SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
             shareGroupConfig.shareGroupMinRecordLockDurationMs(),
-            shareGroupConfig.shareGroupMaxRecordLockDurationMs());
-        clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+            shareGroupConfig.shareGroupMaxRecordLockDurationMs()
+        );
+        clampToRange(
+            props,
+            groupId,
+            SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
             shareGroupConfig.shareGroupMinDeliveryCountLimit(),
-            shareGroupConfig.shareGroupMaxDeliveryCountLimit());
-        clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+            shareGroupConfig.shareGroupMaxDeliveryCountLimit()
+        );
+        clampToRange(
+            props,
+            groupId,
+            SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
             shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
-            shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
-        clampToRange(props, groupId, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()
+        );
+        clampToRange(
+            props,
+            groupId,
+            SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
-            groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs());
-
-        // Streams group configs
-        clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()
+        );
+
+        // Streams group configs.
+        clampToRange(
+            props,
+            groupId,
+            STREAMS_SESSION_TIMEOUT_MS_CONFIG,
             groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
-            groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs());
-        clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()
+        );
+        clampToRange(
+            props,
+            groupId,
+            STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
-            groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
-        clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
-            groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
-        clampToRange(props, groupId, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()
+        );
+        clampToMax(
+            props,
+            groupId,
+            STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()
+        );
+        clampToRange(
+            props,
+            groupId,
+            STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
-            groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs());
-        clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
-            groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
+            groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()
+        );
+        clampToMin(
+            props,
+            groupId,
+            STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
+        );
 
         // Verify that clamping did not break the session > heartbeat 
invariant.
-        checkSessionExceedsHeartbeat(props, groupId,
-            CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
-            CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs());
-        checkSessionExceedsHeartbeat(props, groupId,
-            SHARE_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
-            SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs());
-        checkSessionExceedsHeartbeat(props, groupId,
-            STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
-            STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs());
+        checkSessionExceedsHeartbeat(
+            props,
+            groupId,
+            CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
+            CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs()
+        );
+        checkSessionExceedsHeartbeat(
+            props,
+            groupId,
+            SHARE_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
+            SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.shareGroupHeartbeatIntervalMs()
+        );
+        checkSessionExceedsHeartbeat(
+            props,
+            groupId,
+            STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
+            STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
+        );
     }
 
     /**
@@ -662,7 +791,7 @@ public final class GroupConfig extends AbstractConfig {
         int session = rawSession != null ? 
Integer.parseInt(rawSession.toString()) : defaultSession;
         int heartbeat = rawHeartbeat != null ? 
Integer.parseInt(rawHeartbeat.toString()) : defaultHeartbeat;
         if (session <= heartbeat) {
-            log.warn("The effective {} ({}) for group '{}' is not greater than 
{} ({}). "
+            LOG.warn("The effective {} ({}) for group '{}' is not greater than 
{} ({}). "
                     + "Check that the broker-level min/max bounds for session 
timeout "
                     + "and heartbeat interval do not overlap.",
                 sessionKey, session, groupId, heartbeatKey, heartbeat);
@@ -691,12 +820,12 @@ public final class GroupConfig extends AbstractConfig {
 
         int value = Integer.parseInt(rawValue.toString());
         if (value < min) {
-            log.warn("The group config '{}' for group '{}' has value {} which 
is below the broker's " +
+            LOG.warn("The group config '{}' for group '{}' has value {} which 
is below the broker's " +
                     "allowed minimum {}. The effective value will be capped to 
{}.",
                 key, groupId, value, min, min);
             props.put(key, min);
         } else if (value > max) {
-            log.warn("The group config '{}' for group '{}' has value {} which 
exceeds the broker's " +
+            LOG.warn("The group config '{}' for group '{}' has value {} which 
exceeds the broker's " +
                     "allowed maximum {}. The effective value will be capped to 
{}.",
                 key, groupId, value, max, max);
             props.put(key, max);
@@ -723,7 +852,7 @@ public final class GroupConfig extends AbstractConfig {
 
         int value = Integer.parseInt(rawValue.toString());
         if (value > max) {
-            log.warn("The group config '{}' for group '{}' has value {} which 
exceeds the broker's " +
+            LOG.warn("The group config '{}' for group '{}' has value {} which 
exceeds the broker's " +
                     "allowed maximum {}. The effective value will be capped to 
{}.",
                 key, groupId, value, max, max);
             props.put(key, max);
@@ -750,7 +879,7 @@ public final class GroupConfig extends AbstractConfig {
 
         int value = Integer.parseInt(rawValue.toString());
         if (value < min) {
-            log.warn("The group config '{}' for group '{}' has value {} which 
is below the broker's " +
+            LOG.warn("The group config '{}' for group '{}' has value {} which 
is below the broker's " +
                     "allowed minimum {}. The effective value will be capped to 
{}.",
                 key, groupId, value, min, min);
             props.put(key, min);
@@ -760,7 +889,10 @@ public final class GroupConfig extends AbstractConfig {
     /**
      * Create a group config instance using the given properties and defaults.
      */
-    public static GroupConfig fromProps(Map<?, ?> defaults, Properties 
overrides) {
+    public static GroupConfig fromProps(
+        Map<?, ?> defaults,
+        Properties overrides
+    ) {
         Properties props = new Properties();
         props.putAll(defaults);
         props.putAll(overrides);
@@ -784,14 +916,14 @@ public final class GroupConfig extends AbstractConfig {
     /**
      * The consumer group session timeout in milliseconds.
      */
-    public int consumerSessionTimeoutMs() {
+    public Optional<Integer> consumerSessionTimeoutMs() {
         return consumerSessionTimeoutMs;
     }
 
     /**
      * The consumer group heartbeat interval in milliseconds.
      */
-    public int consumerHeartbeatIntervalMs() {
+    public Optional<Integer> consumerHeartbeatIntervalMs() {
         return consumerHeartbeatIntervalMs;
     }
 
@@ -812,43 +944,43 @@ public final class GroupConfig extends AbstractConfig {
     /**
      * The share group session timeout in milliseconds.
      */
-    public int shareSessionTimeoutMs() {
+    public Optional<Integer> shareSessionTimeoutMs() {
         return shareSessionTimeoutMs;
     }
 
     /**
      * The share group heartbeat interval in milliseconds.
      */
-    public int shareHeartbeatIntervalMs() {
+    public Optional<Integer> shareHeartbeatIntervalMs() {
         return shareHeartbeatIntervalMs;
     }
 
     /**
      * The share group delivery count limit.
      */
-    public int shareDeliveryCountLimit() {
+    public Optional<Integer> shareDeliveryCountLimit() {
         return shareDeliveryCountLimit;
     }
 
     /**
      * The share group partition max record locks.
      */
-    public int sharePartitionMaxRecordLocks() {
+    public Optional<Integer> sharePartitionMaxRecordLocks() {
         return sharePartitionMaxRecordLocks;
     }
 
     /**
      * The share group record lock duration milliseconds.
      */
-    public int shareRecordLockDurationMs() {
+    public Optional<Integer> shareRecordLockDurationMs() {
         return shareRecordLockDurationMs;
     }
 
     /**
      * The share group auto offset reset strategy.
      */
-    public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
-        return 
ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
+    public Optional<ShareGroupAutoOffsetResetStrategy> shareAutoOffsetReset() {
+        return shareAutoOffsetReset;
     }
 
     /**
@@ -868,28 +1000,28 @@ public final class GroupConfig extends AbstractConfig {
     /**
      * The streams group session timeout in milliseconds.
      */
-    public int streamsSessionTimeoutMs() {
+    public Optional<Integer> streamsSessionTimeoutMs() {
         return streamsSessionTimeoutMs;
     }
 
     /**
      * The streams group heartbeat interval in milliseconds.
      */
-    public int streamsHeartbeatIntervalMs() {
+    public Optional<Integer> streamsHeartbeatIntervalMs() {
         return streamsHeartbeatIntervalMs;
     }
 
     /**
      * The number of streams standby replicas for each task.
      */
-    public int streamsNumStandbyReplicas() {
+    public Optional<Integer> streamsNumStandbyReplicas() {
         return streamsNumStandbyReplicas;
     }
 
     /**
      * The initial rebalance delay for streams groups.
      */
-    public int streamsInitialRebalanceDelayMs() {
+    public Optional<Integer> streamsInitialRebalanceDelayMs() {
         return streamsInitialRebalanceDelayMs;
     }
 
@@ -910,28 +1042,21 @@ public final class GroupConfig extends AbstractConfig {
     /**
      * The task offset reporting interval.
      */
-    public int streamsTaskOffsetIntervalMs() {
+    public Optional<Integer> streamsTaskOffsetIntervalMs() {
         return streamsTaskOffsetIntervalMs;
     }
 
     /**
      * The share group isolation level.
      */
-    public IsolationLevel shareIsolationLevel() {
-        if (shareIsolationLevel == null) {
-            throw new IllegalArgumentException("Share isolation level is 
null");
-        }
-        try {
-            return 
IsolationLevel.valueOf(shareIsolationLevel.toUpperCase(Locale.ROOT));
-        } catch (IllegalArgumentException e) {
-            throw new IllegalArgumentException("Unknown Share isolation level: 
" + shareIsolationLevel);
-        }
+    public Optional<IsolationLevel> shareIsolationLevel() {
+        return shareIsolationLevel;
     }
 
     /**
      * The share group renew acknowledge enable.
      */
-    public boolean shareRenewAcknowledgeEnable() {
+    public Optional<Boolean> shareRenewAcknowledgeEnable() {
         return shareRenewAcknowledgeEnable;
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 9374d768107..310d079205c 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -32,8 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class GroupConfigManager implements AutoCloseable {
 
-    private final GroupConfig defaultConfig;
-
     private final Map<String, GroupConfig> configMap;
 
     private final GroupCoordinatorConfig groupCoordinatorConfig;
@@ -41,12 +39,10 @@ public class GroupConfigManager implements AutoCloseable {
     private final ShareGroupConfig shareGroupConfig;
 
     public GroupConfigManager(
-        Map<?, ?> defaultConfig,
         GroupCoordinatorConfig groupCoordinatorConfig,
         ShareGroupConfig shareGroupConfig
     ) {
         this.configMap = new ConcurrentHashMap<>();
-        this.defaultConfig = new GroupConfig(defaultConfig);
         this.groupCoordinatorConfig = 
Objects.requireNonNull(groupCoordinatorConfig);
         this.shareGroupConfig = Objects.requireNonNull(shareGroupConfig);
     }
@@ -75,10 +71,7 @@ public class GroupConfigManager implements AutoCloseable {
         Properties evaluatedProps = GroupConfig.evaluate(
             newGroupConfig, groupId, groupCoordinatorConfig, shareGroupConfig);
 
-        final GroupConfig newConfig = GroupConfig.fromProps(
-            defaultConfig.originals(),
-            evaluatedProps
-        );
+        final GroupConfig newConfig = new GroupConfig(evaluatedProps);
         configMap.put(groupId, newConfig);
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1da3ca21412..b5303be53b1 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -28,14 +28,11 @@ import 
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssign
 import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
 import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
 import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
-import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -853,54 +850,6 @@ public class GroupCoordinatorConfig {
         return assignors;
     }
 
-    /**
-     * Copy the subset of properties that are relevant to consumer group, 
share group and streams group.
-     */
-    public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig 
shareGroupConfig) {
-        Map<String, Integer> defaultConfigs = new HashMap<>();
-        defaultConfigs.putAll(extractConsumerGroupConfigMap());
-        defaultConfigs.putAll(extractShareGroupConfigMap(shareGroupConfig));
-        defaultConfigs.putAll(extractStreamsGroupConfigMap());
-        return Collections.unmodifiableMap(defaultConfigs);
-    }
-
-    /**
-     * Copy the subset of properties that are relevant to consumer group.
-     */
-    public Map<String, Integer> extractConsumerGroupConfigMap() {
-        return Map.of(
-            GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
consumerGroupSessionTimeoutMs(),
-            GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
consumerGroupHeartbeatIntervalMs());
-    }
-
-    /**
-     * Copy the subset of properties that are relevant to share group. These 
configs include those which can be set
-     * statically (for all groups) or dynamically (for a specific group). In 
those cases, the default value for the
-     * group specific dynamic config (Ex. share.session.timeout.ms) should be 
the value set for the static config
-     * (Ex. group.share.session.timeout.ms).
-     */
-    public Map<String, Integer> extractShareGroupConfigMap(ShareGroupConfig 
shareGroupConfig) {
-        return Map.of(
-            GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, 
this.shareGroupSessionTimeoutMs(),
-            GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
this.shareGroupHeartbeatIntervalMs(),
-            GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
shareGroupConfig.shareGroupRecordLockDurationMs(),
-            GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
shareGroupConfig.shareGroupDeliveryCountLimit(),
-            GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
shareGroupConfig.shareGroupPartitionMaxRecordLocks()
-        );
-    }
-
-    /**
-     * Copy the subset of properties that are relevant to streams group.
-     */
-    public Map<String, Integer> extractStreamsGroupConfigMap() {
-        return Map.of(
-            GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
streamsGroupSessionTimeoutMs(),
-            GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
streamsGroupHeartbeatIntervalMs(),
-            GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
streamsGroupNumStandbyReplicas(),
-            GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
streamsGroupInitialRebalanceDelayMs(),
-            GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
streamsGroupTaskOffsetIntervalMs());
-    }
-
     /**
      * The number of threads or event loops running.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 837caddcb03..81a6e1b0c4a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8758,7 +8758,7 @@ public class GroupMetadataManager {
      */
     private int consumerGroupSessionTimeoutMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::consumerSessionTimeoutMs)
+        return groupConfig.flatMap(GroupConfig::consumerSessionTimeoutMs)
             .orElse(config.consumerGroupSessionTimeoutMs());
     }
 
@@ -8767,7 +8767,7 @@ public class GroupMetadataManager {
      */
     private int consumerGroupHeartbeatIntervalMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::consumerHeartbeatIntervalMs)
+        return groupConfig.flatMap(GroupConfig::consumerHeartbeatIntervalMs)
             .orElse(config.consumerGroupHeartbeatIntervalMs());
     }
 
@@ -8796,7 +8796,7 @@ public class GroupMetadataManager {
      */
     private int shareGroupSessionTimeoutMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::shareSessionTimeoutMs)
+        return groupConfig.flatMap(GroupConfig::shareSessionTimeoutMs)
             .orElse(config.shareGroupSessionTimeoutMs());
     }
 
@@ -8805,7 +8805,7 @@ public class GroupMetadataManager {
      */
     private int shareGroupHeartbeatIntervalMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::shareHeartbeatIntervalMs)
+        return groupConfig.flatMap(GroupConfig::shareHeartbeatIntervalMs)
             .orElse(config.shareGroupHeartbeatIntervalMs());
     }
 
@@ -8834,7 +8834,7 @@ public class GroupMetadataManager {
      */
     private int streamsGroupSessionTimeoutMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::streamsSessionTimeoutMs)
+        return groupConfig.flatMap(GroupConfig::streamsSessionTimeoutMs)
             .orElse(config.streamsGroupSessionTimeoutMs());
     }
 
@@ -8843,7 +8843,7 @@ public class GroupMetadataManager {
      */
     private int streamsGroupHeartbeatIntervalMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::streamsHeartbeatIntervalMs)
+        return groupConfig.flatMap(GroupConfig::streamsHeartbeatIntervalMs)
             .orElse(config.streamsGroupHeartbeatIntervalMs());
     }
 
@@ -8872,7 +8872,7 @@ public class GroupMetadataManager {
      */
     private int streamsGroupTaskOffsetIntervalMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::streamsTaskOffsetIntervalMs)
+        return groupConfig.flatMap(GroupConfig::streamsTaskOffsetIntervalMs)
             .orElse(config.streamsGroupTaskOffsetIntervalMs());
     }
 
@@ -8881,7 +8881,7 @@ public class GroupMetadataManager {
      */
     private int streamsGroupInitialRebalanceDelayMs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs)
+        return groupConfig.flatMap(GroupConfig::streamsInitialRebalanceDelayMs)
             .orElse(config.streamsGroupInitialRebalanceDelayMs());
     }
 
@@ -8897,7 +8897,7 @@ public class GroupMetadataManager {
      */
     private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        final Integer numStandbyReplicas = 
groupConfig.map(GroupConfig::streamsNumStandbyReplicas)
+        final Integer numStandbyReplicas = 
groupConfig.flatMap(GroupConfig::streamsNumStandbyReplicas)
             .orElse(config.streamsGroupNumStandbyReplicas());
         return new TreeMap<>(Map.of(
             "num.standby.replicas", numStandbyReplicas.toString()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
index 447750fb611..868236f9c63 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -41,7 +41,7 @@ public class ShareGroupConfigProvider {
      */
     public int recordLockDurationMsOrDefault(String groupId, int defaultValue) 
{
         return manager.groupConfig(groupId)
-            .map(GroupConfig::shareRecordLockDurationMs)
+            .flatMap(GroupConfig::shareRecordLockDurationMs)
             .orElse(defaultValue);
     }
 
@@ -55,7 +55,7 @@ public class ShareGroupConfigProvider {
      */
     public int deliveryCountLimitOrDefault(String groupId, int defaultValue) {
         return manager.groupConfig(groupId)
-            .map(GroupConfig::shareDeliveryCountLimit)
+            .flatMap(GroupConfig::shareDeliveryCountLimit)
             .orElse(defaultValue);
     }
 
@@ -69,7 +69,7 @@ public class ShareGroupConfigProvider {
      */
     public int partitionMaxRecordLocksOrDefault(String groupId, int 
defaultValue) {
         return manager.groupConfig(groupId)
-            .map(GroupConfig::sharePartitionMaxRecordLocks)
+            .flatMap(GroupConfig::sharePartitionMaxRecordLocks)
             .orElse(defaultValue);
     }
 
@@ -82,7 +82,7 @@ public class ShareGroupConfigProvider {
      */
     public boolean isRenewAcknowledgeEnabled(String groupId) {
         return manager.groupConfig(groupId)
-            .map(GroupConfig::shareRenewAcknowledgeEnable)
+            .flatMap(GroupConfig::shareRenewAcknowledgeEnable)
             .orElse(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT);
     }
 
@@ -95,7 +95,7 @@ public class ShareGroupConfigProvider {
      */
     public ShareGroupAutoOffsetResetStrategy autoOffsetReset(String groupId) {
         return manager.groupConfig(groupId)
-            .map(GroupConfig::shareAutoOffsetReset)
+            .flatMap(GroupConfig::shareAutoOffsetReset)
             .orElseGet(GroupConfig::defaultShareAutoOffsetReset);
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index 901ca0c3c1a..062b7f861b8 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -138,8 +138,6 @@ public class GroupConfigManagerTest {
         GroupCoordinatorConfig groupCoordinatorConfig = 
GroupCoordinatorConfig.fromProps(overrides);
         ShareGroupConfig shareGroupConfig = 
ShareGroupConfig.fromProps(overrides);
 
-        Map<String, Integer> defaultConfig = new 
HashMap<>(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
-
-        return new GroupConfigManager(defaultConfig, groupCoordinatorConfig, 
shareGroupConfig);
+        return new GroupConfigManager(groupCoordinatorConfig, 
shareGroupConfig);
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index d1468b1eba1..96df728aa2c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.coordinator.group;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
@@ -394,18 +395,97 @@ public class GroupConfigTest {
     }
 
     @Test
-    public void testAssignmentIntervalMsAbsentWhenNotConfigured() {
-        // When the assignment interval config is absent, the group-level 
value is empty.
-        Properties props = new Properties();
-        GroupConfig config = GroupConfig.fromProps(Map.of(), props);
+    public void testAllFieldsAbsentWhenNotConfigured() {
+        // When no config is provided, all group-level values are empty.
+        GroupConfig config = new GroupConfig(Map.of());
+
+        // Consumer group configs
+        assertEquals(Optional.empty(), config.consumerSessionTimeoutMs());
+        assertEquals(Optional.empty(), config.consumerHeartbeatIntervalMs());
         assertEquals(Optional.empty(), config.consumerAssignmentIntervalMs());
+        assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable());
+
+        // Share group configs
+        assertEquals(Optional.empty(), config.shareSessionTimeoutMs());
+        assertEquals(Optional.empty(), config.shareHeartbeatIntervalMs());
+        assertEquals(Optional.empty(), config.shareRecordLockDurationMs());
+        assertEquals(Optional.empty(), config.shareDeliveryCountLimit());
+        assertEquals(Optional.empty(), config.sharePartitionMaxRecordLocks());
+        assertEquals(Optional.empty(), config.shareAutoOffsetReset());
         assertEquals(Optional.empty(), config.shareAssignmentIntervalMs());
+        assertEquals(Optional.empty(), config.shareAssignorOffloadEnable());
+        assertEquals(Optional.empty(), config.shareIsolationLevel());
+        assertEquals(Optional.empty(), config.shareRenewAcknowledgeEnable());
+
+        // Streams group configs
+        assertEquals(Optional.empty(), config.streamsSessionTimeoutMs());
+        assertEquals(Optional.empty(), config.streamsHeartbeatIntervalMs());
+        assertEquals(Optional.empty(), config.streamsNumStandbyReplicas());
+        assertEquals(Optional.empty(), 
config.streamsInitialRebalanceDelayMs());
         assertEquals(Optional.empty(), config.streamsAssignmentIntervalMs());
+        assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable());
+        assertEquals(Optional.empty(), config.streamsTaskOffsetIntervalMs());
     }
 
     @Test
-    public void testAssignmentIntervalMsNotValidatedWhenNotConfigured() {
-        // When the assignment interval config is absent, validation should 
not use the default assignment interval.
+    public void testAllFieldsPresentWhenConfigured() {
+        // When all configs are provided, all group-level values are present.
+        Map<String, String> props = new HashMap<>();
+        props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000");
+        props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+        props.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000");
+        props.put(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true");
+        props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "50000");
+        props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+        props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "20000");
+        props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "5");
+        props.put(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, "1000");
+        props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "2500");
+        props.put(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true");
+        props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_committed");
+        props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "false");
+        props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
+        props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+        props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
+        props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"3000");
+        props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1250");
+        props.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
+        props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "30000");
+
+        GroupConfig config = new GroupConfig(props);
+
+        // Consumer group configs
+        assertEquals(Optional.of(50000), config.consumerSessionTimeoutMs());
+        assertEquals(Optional.of(6000), config.consumerHeartbeatIntervalMs());
+        assertEquals(Optional.of(5000), config.consumerAssignmentIntervalMs());
+        assertEquals(Optional.of(true), 
config.consumerAssignorOffloadEnable());
+
+        // Share group configs
+        assertEquals(Optional.of(50000), config.shareSessionTimeoutMs());
+        assertEquals(Optional.of(6000), config.shareHeartbeatIntervalMs());
+        assertEquals(Optional.of(20000), config.shareRecordLockDurationMs());
+        assertEquals(Optional.of(5), config.shareDeliveryCountLimit());
+        assertEquals(Optional.of(1000), config.sharePartitionMaxRecordLocks());
+        assertEquals(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST), 
config.shareAutoOffsetReset());
+        assertEquals(Optional.of(2500), config.shareAssignmentIntervalMs());
+        assertEquals(Optional.of(true), config.shareAssignorOffloadEnable());
+        assertEquals(Optional.of(IsolationLevel.READ_COMMITTED), 
config.shareIsolationLevel());
+        assertEquals(Optional.of(false), config.shareRenewAcknowledgeEnable());
+
+        // Streams group configs
+        assertEquals(Optional.of(50000), config.streamsSessionTimeoutMs());
+        assertEquals(Optional.of(6000), config.streamsHeartbeatIntervalMs());
+        assertEquals(Optional.of(2), config.streamsNumStandbyReplicas());
+        assertEquals(Optional.of(3000), 
config.streamsInitialRebalanceDelayMs());
+        assertEquals(Optional.of(1250), config.streamsAssignmentIntervalMs());
+        assertEquals(Optional.of(false), 
config.streamsAssignorOffloadEnable());
+        assertEquals(Optional.of(30000), config.streamsTaskOffsetIntervalMs());
+    }
+
+    @Test
+    public void testNotValidatedWhenNotConfigured() {
+        // When configs are absent, validation should not use their default 
values.
         GroupCoordinatorConfig groupCoordinatorConfig = 
GroupCoordinatorConfig.fromProps(Map.of(
             
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
             GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
2000,
@@ -417,16 +497,6 @@ public class GroupConfigTest {
         assertDoesNotThrow(() -> GroupConfig.validate(Map.of(), 
groupCoordinatorConfig, createShareGroupConfig()));
     }
 
-    @Test
-    public void testAssignorOffloadEnableAbsentWhenNotConfigured() {
-        // When the offload enable config is absent, the group-level value is 
empty.
-        Properties props = new Properties();
-        GroupConfig config = GroupConfig.fromProps(Map.of(), props);
-        assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable());
-        assertEquals(Optional.empty(), config.shareAssignorOffloadEnable());
-        assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable());
-    }
-
     @Test
     public void testInvalidConfigName() {
         Map<String, String> props = new HashMap<>();
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ab1807a3cbc..a4b8141ed5c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21252,7 +21252,6 @@ public class GroupMetadataManagerTest {
         GroupCoordinatorConfig groupCoordinatorConfig = new 
GroupCoordinatorConfig(kafkaConfig);
         ShareGroupConfig shareGroupConfig = new ShareGroupConfig(kafkaConfig);
         GroupConfigManager groupConfigManager = new GroupConfigManager(
-            groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig),
             groupCoordinatorConfig,
             shareGroupConfig
         );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
index 1eec4abc0f3..a426b51554b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -37,7 +37,7 @@ public class ShareGroupConfigProviderTest {
     void testRecordLockDurationMsOrDefaultWithGroupConfig() {
         GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
         GroupConfig groupConfig = mock(GroupConfig.class);
-        when(groupConfig.shareRecordLockDurationMs()).thenReturn(1000);
+        
when(groupConfig.shareRecordLockDurationMs()).thenReturn(Optional.of(1000));
         
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
         provider = new ShareGroupConfigProvider(groupConfigManager);
 
@@ -57,7 +57,7 @@ public class ShareGroupConfigProviderTest {
     void testDeliveryCountLimitOrDefaultWithGroupConfig() {
         GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
         GroupConfig groupConfig = mock(GroupConfig.class);
-        when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+        when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(8));
         
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
         provider = new ShareGroupConfigProvider(groupConfigManager);
 
@@ -77,7 +77,7 @@ public class ShareGroupConfigProviderTest {
     void testPartitionMaxRecordLocksOrDefaultWithGroupConfig() {
         GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
         GroupConfig groupConfig = mock(GroupConfig.class);
-        when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000);
+        
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(5000));
         
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
         provider = new ShareGroupConfigProvider(groupConfigManager);
 
@@ -97,7 +97,7 @@ public class ShareGroupConfigProviderTest {
     void testIsRenewAcknowledgeDisabledWithGroupConfig() {
         GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
         GroupConfig groupConfig = mock(GroupConfig.class);
-        when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
+        
when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(Optional.of(false));
         
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
         provider = new ShareGroupConfigProvider(groupConfigManager);
 
@@ -117,7 +117,7 @@ public class ShareGroupConfigProviderTest {
     void testAutoOffsetResetWithGroupConfig() {
         GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
         GroupConfig groupConfig = mock(GroupConfig.class);
-        
when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+        
when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));
         
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
         provider = new ShareGroupConfigProvider(groupConfigManager);
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
index f2f3b835c50..31798621f95 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
@@ -290,7 +290,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
 
     @Setup(Level.Invocation)
     public void setupInvocation() {
-        GroupConfigManager configManager = new GroupConfigManager(new 
HashMap<>(), config, shareGroupConfig);
+        GroupConfigManager configManager = new GroupConfigManager(config, 
shareGroupConfig);
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
 

Reply via email to