This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4cd243abca KAFKA-20337 Make all GroupConfig fields Optional and clean
up validation (#22003)
e4cd243abca is described below
commit e4cd243abca0db8b2de1ef0862ebf94a7b9c8f5a
Author: David Jacot <[email protected]>
AuthorDate: Thu Apr 9 18:58:26 2026 +0200
KAFKA-20337 Make all GroupConfig fields Optional and clean up validation
(#22003)
All GroupConfig fields are now Optional<T>, storing only explicitly
provided values. Broker-level defaults are resolved at access time via
flatMap().orElse(brokerDefault), eliminating stale-capture issues when
broker configs change dynamically.
Key changes:
- All 21 GroupConfig fields are now private Optional<T> fields, populated by
the optionalInt/optionalBoolean/optionalString helpers, which return a value
only when the key is present in originals().
- GroupConfigManager no longer needs a defaultConfig; constructor
simplified.
- GroupCoordinatorConfig.extractGroupConfigMap(ShareGroupConfig)
removed.
- All callers that read GroupConfig values (GroupMetadataManager,
ShareGroupConfigProvider, KafkaApis) use flatMap and fall back to a default
when the group-level value is unset.
- validateValues refactored with validateIntRange/Max/Min helpers
operating on a single filtered parsed map.
- Cross-field checks use broker defaults for missing values.
Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
<[email protected]>
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 9 +-
.../kafka/server/share/SharePartitionTest.java | 36 +-
.../kafka/api/PlaintextAdminIntegrationTest.scala | 4 +-
.../kafka/server/DynamicConfigChangeTest.scala | 5 +-
.../kafka/coordinator/group/GroupConfig.java | 749 ++++++++++++---------
.../coordinator/group/GroupConfigManager.java | 9 +-
.../coordinator/group/GroupCoordinatorConfig.java | 51 --
.../coordinator/group/GroupMetadataManager.java | 18 +-
.../modern/share/ShareGroupConfigProvider.java | 10 +-
.../coordinator/group/GroupConfigManagerTest.java | 4 +-
.../kafka/coordinator/group/GroupConfigTest.java | 102 ++-
.../group/GroupMetadataManagerTest.java | 1 -
.../modern/share/ShareGroupConfigProviderTest.java | 10 +-
.../GroupCoordinatorShardLoadingBenchmark.java | 2 +-
15 files changed, 574 insertions(+), 438 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index cf43e9ef1ca..102981827ce 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -375,7 +375,7 @@ class BrokerServer(
authorizerPlugin = config.createNewAuthorizer(metrics,
ProcessRole.BrokerRole.toString)
/* initializing the groupConfigManager */
- groupConfigManager = new
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig),
config.groupCoordinatorConfig, config.shareGroupConfig)
+ groupConfigManager = new
GroupConfigManager(config.groupCoordinatorConfig, config.shareGroupConfig)
/* create share coordinator */
shareCoordinator = createShareCoordinator()
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 144696be434..9e923563ad0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2827,9 +2827,10 @@ class KafkaApis(val requestChannel: RequestChannel,
}
} else {
// Compute group-specific timeout for caching errors (2 *
heartbeat interval)
- val heartbeatIntervalMs =
Option(groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId).orElse(null))
- .map(_.streamsHeartbeatIntervalMs().toLong)
-
.getOrElse(config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs().toLong)
+ val heartbeatIntervalMs =
groupConfigManager.groupConfig(streamsGroupHeartbeatRequest.data.groupId)
+ .flatMap[java.lang.Integer](gc =>
gc.streamsHeartbeatIntervalMs())
+ .orElseGet(() =>
config.groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs())
+ .toLong
val timeoutMs = heartbeatIntervalMs * 2
autoTopicCreationManager.createStreamsInternalTopics(topicsToCreate,
requestContext, timeoutMs)
@@ -3460,7 +3461,7 @@ class KafkaApis(val requestChannel: RequestChannel,
shareFetchRequest.maxWait,
fetchMinBytes,
fetchMaxBytes,
- FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID,
groupConfigManager.groupConfig(groupId).map(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
+ FetchIsolation.of(FetchRequest.CONSUMER_REPLICA_ID,
groupConfigManager.groupConfig(groupId).flatMap(_.shareIsolationLevel()).orElse(GroupConfig.defaultShareIsolationLevel)),
clientMetadata,
true
)
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 285de56397f..24d37dc0700 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -219,7 +219,7 @@ public class SharePartitionTest {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@@ -270,7 +270,7 @@ public class SharePartitionTest {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.LATEST);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.LATEST));
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@@ -330,7 +330,7 @@ public class SharePartitionTest {
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy));
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@@ -480,7 +480,7 @@ public class SharePartitionTest {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
@@ -531,7 +531,7 @@ public class SharePartitionTest {
// final ShareGroupAutoOffsetResetStrategy resetStrategy =
ShareGroupAutoOffsetResetStrategy.fromString("by_duration:PT1H");
final ShareGroupAutoOffsetResetStrategy resetStrategy =
Mockito.mock(ShareGroupAutoOffsetResetStrategy.class);
final long expectedTimestamp = MOCK_TIME.milliseconds() -
TimeUnit.HOURS.toMillis(1);
-
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(resetStrategy);
+
Mockito.when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(resetStrategy));
Mockito.when(resetStrategy.type()).thenReturn(ShareGroupAutoOffsetResetStrategy.StrategyType.BY_DURATION);
Mockito.when(resetStrategy.timestamp()).thenReturn(expectedTimestamp);
@@ -7151,7 +7151,7 @@ public class SharePartitionTest {
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
int expectedDurationMs = 500;
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
+
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(Optional.of(expectedDurationMs));
SharePartition sharePartition = SharePartitionBuilder.builder()
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager)).build();
@@ -7172,8 +7172,8 @@ public class SharePartitionTest {
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
// First invocation of shareRecordLockDurationMs() returns 500, and
the second invocation returns 1000
Mockito.when(groupConfig.shareRecordLockDurationMs())
- .thenReturn(expectedDurationMs1)
- .thenReturn(expectedDurationMs2);
+ .thenReturn(Optional.of(expectedDurationMs1))
+ .thenReturn(Optional.of(expectedDurationMs2));
SharePartition sharePartition = SharePartitionBuilder.builder()
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager)).build();
@@ -12324,7 +12324,7 @@ public class SharePartitionTest {
public void testMaxDeliveryCountUsesGroupConfigWhenPresent() {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(8));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
SharePartition sharePartition = SharePartitionBuilder.builder()
@@ -12376,7 +12376,7 @@ public class SharePartitionTest {
// Dynamically decrease the limit to 2 via group config BEFORE
releasing.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.shareDeliveryCountLimit()).thenReturn(2);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(2));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
// Release: archival check fires because deliveryCount(2) >=
maxDeliveryCount(2),
@@ -12412,7 +12412,7 @@ public class SharePartitionTest {
// Now increase limit to 10 via group config before the second acquire.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.shareDeliveryCountLimit()).thenReturn(10);
+
when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(10));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
// Second acquire: deliveryCount = 2. With old limit (2) this would
archive.
@@ -12432,7 +12432,7 @@ public class SharePartitionTest {
public void testMaxInFlightRecordsUsesGroupConfigWhenPresent() {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(5000));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
SharePartition sharePartition = SharePartitionBuilder.builder()
@@ -12477,7 +12477,7 @@ public class SharePartitionTest {
// Dynamically decrease the limit to 30 via group config.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(30);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(30));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
// The effective limit should now be 30.
@@ -12510,7 +12510,7 @@ public class SharePartitionTest {
// Increase limit to 500 via group config.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
assertEquals(500, sharePartition.maxInFlightRecords());
@@ -12540,14 +12540,14 @@ public class SharePartitionTest {
// Dynamically set limit to exactly the in-flight count via group
config.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(50);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(50));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
// Still at boundary: 50 < 50 is false.
assertFalse(sharePartition.canAcquireRecords());
// Increase by 1 to cross the boundary.
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(51);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(51));
// Now 50 < 51 is true.
assertTrue(sharePartition.canAcquireRecords());
@@ -12557,7 +12557,7 @@ public class SharePartitionTest {
public void testDynamicPartitionMaxRecordLocksRemoveGroupConfig() {
GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(500);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(500));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
SharePartition sharePartition = SharePartitionBuilder.builder()
@@ -12591,7 +12591,7 @@ public class SharePartitionTest {
// Decrease limit to 20, well below the 50 in-flight.
GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(20);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(20));
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
// maxInFlightRecords - inFlightRecordsCount = 20 - 50 = -30, so
maxRecordsToAcquire <= 0.
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index a9c23a0bc08..97adc98cee4 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1176,7 +1176,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
var configs = describeResult.all.get(15, TimeUnit.SECONDS)
assertEquals("55000",
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).value)
// Before restart, 55000 is within [45000, 60000], so no adjustment needed
- assertEquals(55000,
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+ assertEquals(Optional.of(55000),
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
// Kill all brokers
client.close()
@@ -1200,7 +1200,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
configs.get(groupResource).get(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG).source)
// Verify effective value is adjusted (55000 evaluated to new max 50000)
- assertEquals(50000,
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
+ assertEquals(Optional.of(50000),
brokerServers.head.groupConfigManager.groupConfig(groupId).get.consumerSessionTimeoutMs)
}
@Test
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index d7231fbe7d7..0f510d1a06a 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -31,6 +31,7 @@ import
org.apache.kafka.common.quota.ClientQuotaEntity.{CLIENT_ID, IP, USER}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
+import java.util.Optional
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
import org.apache.kafka.metadata.MetadataCache
@@ -419,7 +420,7 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
}
val groupConfig =
brokerServers.head.groupCoordinator.groupConfig(consumerGroupId).get()
- assertEquals(newSessionTimeoutMs, groupConfig.consumerSessionTimeoutMs())
+ assertEquals(Optional.of(newSessionTimeoutMs),
groupConfig.consumerSessionTimeoutMs())
}
@Test
@@ -445,7 +446,7 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
}
val groupConfig =
brokerServers.head.groupCoordinator.groupConfig(shareGroupId).get()
- assertEquals(newRecordLockDurationMs,
groupConfig.shareRecordLockDurationMs)
+ assertEquals(Optional.of(newRecordLockDurationMs),
groupConfig.shareRecordLockDurationMs)
}
@Test
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 0a6841ec324..07247bf76ab 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -27,7 +27,6 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -47,7 +46,7 @@ import static
org.apache.kafka.common.config.ConfigDef.ValidString.in;
*/
public final class GroupConfig extends AbstractConfig {
- private static final Logger log =
LoggerFactory.getLogger(GroupConfig.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(GroupConfig.class);
public static final String CONSUMER_SESSION_TIMEOUT_MS_CONFIG =
"consumer.session.timeout.ms";
@@ -106,56 +105,47 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG =
"streams.task.offset.interval.ms";
- public final int consumerSessionTimeoutMs;
+ private final Optional<Integer> consumerSessionTimeoutMs;
- public final int consumerHeartbeatIntervalMs;
+ private final Optional<Integer> consumerHeartbeatIntervalMs;
- // These have to be optionals because their default group coordinator
configs are dynamic,
- // so we must not capture the default value at the time of GroupConfig
construction.
- // KAFKA-20337 tracks the work to refactor GroupConfig into something more
consistent.
- public final Optional<Integer> consumerAssignmentIntervalMs;
+ private final Optional<Integer> consumerAssignmentIntervalMs;
- public final Optional<Boolean> consumerAssignorOffloadEnable;
+ private final Optional<Boolean> consumerAssignorOffloadEnable;
- public final int shareSessionTimeoutMs;
+ private final Optional<Integer> shareSessionTimeoutMs;
- public final int shareHeartbeatIntervalMs;
+ private final Optional<Integer> shareHeartbeatIntervalMs;
- public final int shareRecordLockDurationMs;
+ private final Optional<Integer> shareRecordLockDurationMs;
- public final int shareDeliveryCountLimit;
+ private final Optional<Integer> shareDeliveryCountLimit;
- public final int sharePartitionMaxRecordLocks;
+ private final Optional<Integer> sharePartitionMaxRecordLocks;
- public final String shareAutoOffsetReset;
+ private final Optional<ShareGroupAutoOffsetResetStrategy>
shareAutoOffsetReset;
- // These have to be optionals because their default group coordinator
configs are dynamic,
- // so we must not capture the default value at the time of GroupConfig
construction.
- // KAFKA-20337 tracks the work to refactor GroupConfig into something more
consistent.
- public final Optional<Integer> shareAssignmentIntervalMs;
+ private final Optional<Integer> shareAssignmentIntervalMs;
- public final Optional<Boolean> shareAssignorOffloadEnable;
+ private final Optional<Boolean> shareAssignorOffloadEnable;
- public final int streamsSessionTimeoutMs;
+ private final Optional<Integer> streamsSessionTimeoutMs;
- public final int streamsHeartbeatIntervalMs;
+ private final Optional<Integer> streamsHeartbeatIntervalMs;
- public final int streamsNumStandbyReplicas;
+ private final Optional<Integer> streamsNumStandbyReplicas;
- public final int streamsInitialRebalanceDelayMs;
+ private final Optional<Integer> streamsInitialRebalanceDelayMs;
- // These have to be optionals because their default group coordinator
configs are dynamic,
- // so we must not capture the default value at the time of GroupConfig
construction.
- // KAFKA-20337 tracks the work to refactor GroupConfig into something more
consistent.
- public final Optional<Integer> streamsAssignmentIntervalMs;
+ private final Optional<Integer> streamsAssignmentIntervalMs;
- public final Optional<Boolean> streamsAssignorOffloadEnable;
+ private final Optional<Boolean> streamsAssignorOffloadEnable;
- public final int streamsTaskOffsetIntervalMs;
+ private final Optional<Integer> streamsTaskOffsetIntervalMs;
- public final String shareIsolationLevel;
+ private final Optional<IsolationLevel> shareIsolationLevel;
- public final boolean shareRenewAcknowledgeEnable;
+ private final Optional<Boolean> shareRenewAcknowledgeEnable;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
@@ -286,13 +276,13 @@ public final class GroupConfig extends AbstractConfig {
* {@code Optional.empty()} indicates that the config has no broker-level
synonym.
*/
public static final Map<String, Optional<String>>
ALL_GROUP_CONFIG_SYNONYMS = Map.ofEntries(
- // Consumer group configs
+ // Consumer group configs.
Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
- // Share group configs
+ // Share group configs.
Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
Map.entry(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
Optional.of(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_CONFIG)),
@@ -304,7 +294,7 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
- // Streams group configs
+ // Streams group configs.
Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
@@ -324,48 +314,41 @@ public final class GroupConfig extends AbstractConfig {
public GroupConfig(Map<?, ?> props) {
super(CONFIG_DEF, props, false);
- this.consumerSessionTimeoutMs =
getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
- this.consumerHeartbeatIntervalMs =
getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
- // These have to be optionals because their default group coordinator
configs are dynamic,
- // so we must not capture the default value at the time of GroupConfig
construction.
- // KAFKA-20337 tracks the work to refactor GroupConfig into something
more consistent.
- this.consumerAssignmentIntervalMs =
props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
- Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
- Optional.empty();
- this.consumerAssignorOffloadEnable =
props.containsKey(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
- Optional.of(getBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
- Optional.empty();
- this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
- this.shareHeartbeatIntervalMs =
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
- this.shareRecordLockDurationMs =
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
- this.shareDeliveryCountLimit =
getInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
- this.sharePartitionMaxRecordLocks =
getInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
- this.shareAutoOffsetReset = getString(SHARE_AUTO_OFFSET_RESET_CONFIG);
- // These have to be optionals because their default group coordinator
configs are dynamic,
- // so we must not capture the default value at the time of GroupConfig
construction.
- // KAFKA-20337 tracks the work to refactor GroupConfig into something
more consistent.
- this.shareAssignmentIntervalMs =
props.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
- Optional.of(getInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
- Optional.empty();
- this.shareAssignorOffloadEnable =
props.containsKey(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
- Optional.of(getBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
- Optional.empty();
- this.streamsSessionTimeoutMs =
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
- this.streamsHeartbeatIntervalMs =
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
- this.streamsNumStandbyReplicas =
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
- this.streamsInitialRebalanceDelayMs =
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
- // These have to be optionals because their default group coordinator
configs are dynamic,
- // so we must not capture the default value at the time of GroupConfig
construction.
- // KAFKA-20337 tracks the work to refactor GroupConfig into something
more consistent.
- this.streamsAssignmentIntervalMs =
props.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
- Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
- Optional.empty();
- this.streamsAssignorOffloadEnable =
props.containsKey(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
- Optional.of(getBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
- Optional.empty();
- this.streamsTaskOffsetIntervalMs =
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
- this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
- this.shareRenewAcknowledgeEnable =
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
+ this.consumerSessionTimeoutMs =
optionalInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
+ this.consumerHeartbeatIntervalMs =
optionalInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.consumerAssignmentIntervalMs =
optionalInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ this.consumerAssignorOffloadEnable =
optionalBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ this.shareSessionTimeoutMs =
optionalInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
+ this.shareHeartbeatIntervalMs =
optionalInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.shareRecordLockDurationMs =
optionalInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
+ this.shareDeliveryCountLimit =
optionalInt(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
+ this.sharePartitionMaxRecordLocks =
optionalInt(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
+ this.shareAutoOffsetReset =
optionalString(SHARE_AUTO_OFFSET_RESET_CONFIG)
+ .map(ShareGroupAutoOffsetResetStrategy::fromString);
+ this.shareAssignmentIntervalMs =
optionalInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ this.shareAssignorOffloadEnable =
optionalBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ this.streamsSessionTimeoutMs =
optionalInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
+ this.streamsHeartbeatIntervalMs =
optionalInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
+ this.streamsNumStandbyReplicas =
optionalInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+ this.streamsInitialRebalanceDelayMs =
optionalInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+ this.streamsAssignmentIntervalMs =
optionalInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ this.streamsAssignorOffloadEnable =
optionalBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ this.streamsTaskOffsetIntervalMs =
optionalInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
+ this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
+ .map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
+ this.shareRenewAcknowledgeEnable =
optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
+ }
+
+ private Optional<Integer> optionalInt(String key) {
+ return originals().containsKey(key) ? Optional.of(getInt(key)) :
Optional.empty();
+ }
+
+ private Optional<Boolean> optionalBoolean(String key) {
+ return originals().containsKey(key) ? Optional.of(getBoolean(key)) :
Optional.empty();
+ }
+
+ private Optional<String> optionalString(String key) {
+ return originals().containsKey(key) ? Optional.of(getString(key)) :
Optional.empty();
}
public static Optional<Type> configType(String configName) {
@@ -377,7 +360,7 @@ public final class GroupConfig extends AbstractConfig {
}
/**
- * Check that property names are valid
+ * Check that property names are valid.
*/
public static void validateNames(Map<String, ?> props) {
Set<String> names = configNames();
@@ -389,172 +372,242 @@ public final class GroupConfig extends AbstractConfig {
}
/**
- * Validates the values of the given properties.
- */
- @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity"})
- private static void validateValues(Map<String, Object> unparsedMap,
GroupCoordinatorConfig groupCoordinatorConfig, ShareGroupConfig
shareGroupConfig) {
- Map<String, Object> valueMaps = CONFIG_DEF.parse(unparsedMap);
- int consumerHeartbeatInterval = (Integer)
valueMaps.get(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
- int consumerSessionTimeout = (Integer)
valueMaps.get(CONSUMER_SESSION_TIMEOUT_MS_CONFIG);
- int consumerAssignmentIntervalMs = (Integer)
valueMaps.get(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG);
- int shareHeartbeatInterval = (Integer)
valueMaps.get(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
- int shareSessionTimeout = (Integer)
valueMaps.get(SHARE_SESSION_TIMEOUT_MS_CONFIG);
- int shareRecordLockDurationMs = (Integer)
valueMaps.get(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
- int shareDeliveryCountLimit = (Integer)
valueMaps.get(SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
- int sharePartitionMaxRecordLocks = (Integer)
valueMaps.get(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG);
- int shareAssignmentIntervalMs = (Integer)
valueMaps.get(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG);
- int streamsSessionTimeoutMs = (Integer)
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
- int streamsHeartbeatIntervalMs = (Integer)
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
- int streamsNumStandbyReplicas = (Integer)
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
- int streamsAssignmentIntervalMs = (Integer)
valueMaps.get(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
- int streamsTaskOffsetIntervalMs = (Integer)
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
- if (consumerHeartbeatInterval <
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
- throw new
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (consumerHeartbeatInterval >
groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()) {
- throw new
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (consumerSessionTimeout <
groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs()) {
- throw new
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
- }
- if (consumerSessionTimeout >
groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()) {
- throw new
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
- }
- // If no group-level consumer assignment interval is configured, do
not attempt to validate it.
- // TODO: It's not clear if we can run the validation unconditionally.
- // KAFKA-20337 tracks the work to clean this up.
- if (unparsedMap.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
- if (consumerAssignmentIntervalMs <
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs()) {
- throw new
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must
be greater than or equal to " +
-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
- }
- if (consumerAssignmentIntervalMs >
groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()) {
- throw new
InvalidConfigurationException(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG + " must
be less than or equal to " +
-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
- }
- }
- if (shareHeartbeatInterval <
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs()) {
- throw new
InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (shareHeartbeatInterval >
groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()) {
- throw new
InvalidConfigurationException(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (shareSessionTimeout <
groupCoordinatorConfig.shareGroupMinSessionTimeoutMs()) {
- throw new
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
- }
- if (shareSessionTimeout >
groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()) {
- throw new
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be less
than or equal to " +
-
GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
- }
- if (shareRecordLockDurationMs <
shareGroupConfig.shareGroupMinRecordLockDurationMs()) {
- throw new
InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be
greater than or equal to " +
-
ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG);
- }
- if (shareRecordLockDurationMs >
shareGroupConfig.shareGroupMaxRecordLockDurationMs()) {
- throw new
InvalidConfigurationException(SHARE_RECORD_LOCK_DURATION_MS_CONFIG + " must be
less than or equal to " +
-
ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG);
- }
- if (shareDeliveryCountLimit <
shareGroupConfig.shareGroupMinDeliveryCountLimit()) {
- throw new
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be
greater than or equal to " +
- ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG);
- }
- if (shareDeliveryCountLimit >
shareGroupConfig.shareGroupMaxDeliveryCountLimit()) {
- throw new
InvalidConfigurationException(SHARE_DELIVERY_COUNT_LIMIT_CONFIG + " must be
less than or equal to " +
- ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG);
- }
- if (sharePartitionMaxRecordLocks <
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks()) {
- throw new
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must
be greater than or equal to " +
-
ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG);
- }
- if (sharePartitionMaxRecordLocks >
shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()) {
- throw new
InvalidConfigurationException(SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG + " must
be less than or equal to " +
-
ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG);
- }
- // If no group-level share assignment interval is configured, do not
attempt to validate it.
- // TODO: It's not clear if we can run the validation unconditionally.
- // KAFKA-20337 tracks the work to clean this up.
- if (unparsedMap.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
- if (shareAssignmentIntervalMs <
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs()) {
- throw new
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
- }
- if (shareAssignmentIntervalMs >
groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()) {
- throw new
InvalidConfigurationException(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
- }
- }
- if (streamsHeartbeatIntervalMs <
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs()) {
- throw new
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (streamsHeartbeatIntervalMs >
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()) {
- throw new
InvalidConfigurationException(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (streamsSessionTimeoutMs <
groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs()) {
- throw new
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG);
- }
- if (streamsSessionTimeoutMs >
groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()) {
- throw new
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG);
- }
- if (streamsNumStandbyReplicas >
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()) {
- throw new
InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
- }
- // If no group-level streams assignment interval is configured, do not
attempt to validate it.
- // TODO: It's not clear if we can run the validation unconditionally.
- // KAFKA-20337 tracks the work to clean this up.
- if (unparsedMap.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) {
- if (streamsAssignmentIntervalMs <
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs()) {
- throw new
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
- }
- if (streamsAssignmentIntervalMs >
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()) {
- throw new
InvalidConfigurationException(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG + " must be
less than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
- }
- }
- if (streamsTaskOffsetIntervalMs <
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
- throw new
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must
be greater than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
- }
- if (consumerSessionTimeout <= consumerHeartbeatInterval) {
- throw new
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than " +
- CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (shareSessionTimeout <= shareHeartbeatInterval) {
- throw new
InvalidConfigurationException(SHARE_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than " +
- SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
- if (streamsSessionTimeoutMs <= streamsHeartbeatIntervalMs) {
- throw new
InvalidConfigurationException(STREAMS_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than " +
- STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
- }
+ * Check that the given properties contain only valid group config names
and that
+ * all values can be parsed and are valid.
+ */
+ public static void validate(
+ Map<String, ?> props,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ // Check the config names first, before any value is parsed.
+ validateNames(props);
+ var parsed = CONFIG_DEF.parse(props);
+ // Keep only the entries whose keys were actually supplied in props, so
+ // that validateValues sees nothing but explicitly provided configs.
+ // NOTE(review): presumably parse() also returns entries for keys that
+ // were not supplied (e.g. defaults) - confirm against ConfigDef.
+ parsed.keySet().retainAll(props.keySet());
+ validateValues(
+ parsed,
+ groupCoordinatorConfig,
+ shareGroupConfig
+ );
}
/**
- * Check that the given properties contain only valid group config names
and that
- * all values can be parsed and are valid. The provided properties are
merged with
- * the broker-level defaults before validation.
+ * Validates the parsed values against broker-level bounds.
+ * Only configs explicitly present in the parsed map are validated.
*/
- public static void validate(Map<String, ?> props, GroupCoordinatorConfig
groupCoordinatorConfig, ShareGroupConfig shareGroupConfig) {
- // TODO: We shouldn't be re-deriving the default config from
GroupConfigManager here.
- // KAFKA-20337 tracks the work to clean this up.
- Map<String, Object> combinedConfigs = new HashMap<>();
-
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
- combinedConfigs.putAll(props);
+ private static void validateValues(
+ Map<String, Object> parsed,
+ GroupCoordinatorConfig groupCoordinatorConfig,
+ ShareGroupConfig shareGroupConfig
+ ) {
+ // Each validateInt* call below returns immediately when its key is
+ // not in parsed, so the order of the calls only decides which
+ // violation is reported first.
+ // Consumer group configs.
+ validateIntRange(
+ parsed,
+ CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs(),
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs(),
+ GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
+
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs(),
+
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+ );
+
+ // Share group configs.
+ validateIntRange(
+ parsed,
+ SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
+
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs(),
+ GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
+ GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs(),
+ GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+ shareGroupConfig.shareGroupMinRecordLockDurationMs(),
+ ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
+ shareGroupConfig.shareGroupMaxRecordLockDurationMs(),
+ ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+ shareGroupConfig.shareGroupMinDeliveryCountLimit(),
+ ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG,
+ shareGroupConfig.shareGroupMaxDeliveryCountLimit(),
+ ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+ shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
+ ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+ shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks(),
+ ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
+
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs(),
+
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+ );
+
+ // Streams group configs.
+ validateIntRange(
+ parsed,
+ STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs(),
+
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
+ GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs(),
+ GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+ );
+ // Only an upper bound is checked for the number of standby replicas.
+ validateIntMax(
+ parsed,
+ STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas(),
+ GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG
+ );
+ validateIntRange(
+ parsed,
+ STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs(),
+
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+ );
+ // Only a lower bound is checked for the task offset interval.
+ validateIntMin(
+ parsed,
+ STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(),
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG
+ );
+
+ // Cross-field validations: session timeout must be greater than
heartbeat interval.
+ // The broker-level session timeout and heartbeat interval are passed
+ // as the fallback for whichever of the two keys is not in parsed.
+ validateSessionExceedsHeartbeat(
+ parsed,
+ CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
+ CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs()
+ );
+ validateSessionExceedsHeartbeat(
+ parsed,
+ SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
+ SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupHeartbeatIntervalMs()
+ );
+ validateSessionExceedsHeartbeat(
+ parsed,
+ STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
+ STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
+ );
+ }
+
+ /**
+ * Validates that an integer config value falls within [min, max].
+ * No-op when the key is absent from the parsed map.
+ * The lower bound is checked before the upper bound.
+ *
+ * @param parsed the parsed config values, keyed by config name
+ * @param key the config name whose value is checked
+ * @param min the smallest accepted value (inclusive)
+ * @param minConfigName the name reported in the error message when the
+ * value is below min
+ * @param max the largest accepted value (inclusive)
+ * @param maxConfigName the name reported in the error message when the
+ * value is above max
+ * @throws InvalidConfigurationException if the value is below min or
+ * above max
+ */
+ private static void validateIntRange(
+ Map<String, Object> parsed,
+ String key,
+ int min,
+ String minConfigName,
+ int max,
+ String maxConfigName
+ ) {
+ if (!parsed.containsKey(key)) return;
+ int value = (Integer) parsed.get(key);
+ if (value < min)
+ throw new InvalidConfigurationException(key + " must be greater
than or equal to " + minConfigName);
+ if (value > max)
+ throw new InvalidConfigurationException(key + " must be less than
or equal to " + maxConfigName);
+ }
+
+ /**
+ * Validates that an integer config value does not exceed max.
+ * No-op when the key is absent from the parsed map.
+ *
+ * @param parsed the parsed config values, keyed by config name
+ * @param key the config name whose value is checked
+ * @param max the largest accepted value (inclusive)
+ * @param maxConfigName the name reported in the error message when the
+ * value is above max
+ * @throws InvalidConfigurationException if the value is above max
+ */
+ private static void validateIntMax(
+ Map<String, Object> parsed,
+ String key,
+ int max,
+ String maxConfigName
+ ) {
+ if (!parsed.containsKey(key)) return;
+ int value = (Integer) parsed.get(key);
+ if (value > max)
+ throw new InvalidConfigurationException(key + " must be less than
or equal to " + maxConfigName);
+ }
- validateNames(combinedConfigs);
- validateValues(combinedConfigs, groupCoordinatorConfig,
shareGroupConfig);
+ /**
+ * Validates that an integer config value is at least min.
+ * No-op when the key is absent from the parsed map.
+ *
+ * @param parsed the parsed config values, keyed by config name
+ * @param key the config name whose value is checked
+ * @param min the smallest accepted value (inclusive)
+ * @param minConfigName the name reported in the error message when the
+ * value is below min
+ * @throws InvalidConfigurationException if the value is below min
+ */
+ private static void validateIntMin(
+ Map<String, Object> parsed,
+ String key,
+ int min,
+ String minConfigName
+ ) {
+ if (!parsed.containsKey(key)) return;
+ int value = (Integer) parsed.get(key);
+ if (value < min)
+ throw new InvalidConfigurationException(key + " must be greater
than or equal to " + minConfigName);
+ }
+
+ /**
+ * Validates that the session timeout is greater than the heartbeat
interval.
+ * Uses broker defaults for any config not present in the parsed map.
+ * Nothing is checked when neither key is present in the parsed map.
+ * Equal values are rejected: the session timeout must be strictly greater.
+ *
+ * @param parsed the parsed config values, keyed by config name
+ * @param sessionKey the config name of the session timeout
+ * @param defaultSession the session timeout used when sessionKey is not
+ * in parsed
+ * @param heartbeatKey the config name of the heartbeat interval
+ * @param defaultHeartbeat the heartbeat interval used when heartbeatKey is
+ * not in parsed
+ * @throws InvalidConfigurationException if the effective session timeout
+ * is less than or equal to the effective heartbeat interval
+ */
+ private static void validateSessionExceedsHeartbeat(
+ Map<String, Object> parsed,
+ String sessionKey,
+ int defaultSession,
+ String heartbeatKey,
+ int defaultHeartbeat
+ ) {
+ if (parsed.containsKey(sessionKey) ||
parsed.containsKey(heartbeatKey)) {
+ int effectiveSession = parsed.containsKey(sessionKey)
+ ? (Integer) parsed.get(sessionKey) : defaultSession;
+ int effectiveHeartbeat = parsed.containsKey(heartbeatKey)
+ ? (Integer) parsed.get(heartbeatKey) : defaultHeartbeat;
+ if (effectiveSession <= effectiveHeartbeat)
+ throw new InvalidConfigurationException(sessionKey + " must be
greater than " + heartbeatKey);
+ }
}
/**
@@ -575,7 +628,12 @@ public final class GroupConfig extends AbstractConfig {
) {
Properties effective = new Properties();
effective.putAll(props);
- evaluateValues(effective, groupId, groupCoordinatorConfig,
shareGroupConfig);
+ evaluateValues(
+ effective,
+ groupId,
+ groupCoordinatorConfig,
+ shareGroupConfig
+ );
return effective;
}
@@ -585,62 +643,133 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig groupCoordinatorConfig,
ShareGroupConfig shareGroupConfig
) {
- // Consumer group configs
- clampToRange(props, groupId, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ // Consumer group configs.
+ clampToRange(
+ props,
+ groupId,
+ CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
- groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs());
- clampToRange(props, groupId, CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()
+ );
+ clampToRange(
+ props,
+ groupId,
+ CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
- groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs());
- clampToRange(props, groupId, CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()
+ );
+ clampToRange(
+ props,
+ groupId,
+ CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
- groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs());
-
- // Share group configs
- clampToRange(props, groupId, SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()
+ );
+
+ // Share group configs.
+ clampToRange(
+ props,
+ groupId,
+ SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
- groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs());
- clampToRange(props, groupId, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()
+ );
+ clampToRange(
+ props,
+ groupId,
+ SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
- groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs());
- clampToRange(props, groupId, SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()
+ );
+ clampToRange(
+ props,
+ groupId,
+ SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupConfig.shareGroupMinRecordLockDurationMs(),
- shareGroupConfig.shareGroupMaxRecordLockDurationMs());
- clampToRange(props, groupId, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
+ shareGroupConfig.shareGroupMaxRecordLockDurationMs()
+ );
+ clampToRange(
+ props,
+ groupId,
+ SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupConfig.shareGroupMinDeliveryCountLimit(),
- shareGroupConfig.shareGroupMaxDeliveryCountLimit());
- clampToRange(props, groupId, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
+ shareGroupConfig.shareGroupMaxDeliveryCountLimit()
+ );
+ clampToRange(
+ props,
+ groupId,
+ SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
- shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks());
- clampToRange(props, groupId, SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()
+ );
+ clampToRange(
+ props,
+ groupId,
+ SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
- groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs());
-
- // Streams group configs
- clampToRange(props, groupId, STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()
+ );
+
+ // Streams group configs.
+ clampToRange(
+ props,
+ groupId,
+ STREAMS_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
- groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs());
- clampToRange(props, groupId, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()
+ );
+ clampToRange(
+ props,
+ groupId,
+ STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
- groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
- clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
- groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
- clampToRange(props, groupId, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()
+ );
+ clampToMax(
+ props,
+ groupId,
+ STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()
+ );
+ clampToRange(
+ props,
+ groupId,
+ STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
- groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs());
- clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
+ groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()
+ );
+ clampToMin(
+ props,
+ groupId,
+ STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
+ );
// Verify that clamping did not break the session > heartbeat
invariant.
- checkSessionExceedsHeartbeat(props, groupId,
- CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
- CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs());
- checkSessionExceedsHeartbeat(props, groupId,
- SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
- SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupHeartbeatIntervalMs());
- checkSessionExceedsHeartbeat(props, groupId,
- STREAMS_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
- STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs());
+ checkSessionExceedsHeartbeat(
+ props,
+ groupId,
+ CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupSessionTimeoutMs(),
+ CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.consumerGroupHeartbeatIntervalMs()
+ );
+ checkSessionExceedsHeartbeat(
+ props,
+ groupId,
+ SHARE_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupSessionTimeoutMs(),
+ SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.shareGroupHeartbeatIntervalMs()
+ );
+ checkSessionExceedsHeartbeat(
+ props,
+ groupId,
+ STREAMS_SESSION_TIMEOUT_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupSessionTimeoutMs(),
+ STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupHeartbeatIntervalMs()
+ );
}
/**
@@ -662,7 +791,7 @@ public final class GroupConfig extends AbstractConfig {
int session = rawSession != null ?
Integer.parseInt(rawSession.toString()) : defaultSession;
int heartbeat = rawHeartbeat != null ?
Integer.parseInt(rawHeartbeat.toString()) : defaultHeartbeat;
if (session <= heartbeat) {
- log.warn("The effective {} ({}) for group '{}' is not greater than
{} ({}). "
+ LOG.warn("The effective {} ({}) for group '{}' is not greater than
{} ({}). "
+ "Check that the broker-level min/max bounds for session
timeout "
+ "and heartbeat interval do not overlap.",
sessionKey, session, groupId, heartbeatKey, heartbeat);
@@ -691,12 +820,12 @@ public final class GroupConfig extends AbstractConfig {
int value = Integer.parseInt(rawValue.toString());
if (value < min) {
- log.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
+ LOG.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
"allowed minimum {}. The effective value will be capped to
{}.",
key, groupId, value, min, min);
props.put(key, min);
} else if (value > max) {
- log.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
+ LOG.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
"allowed maximum {}. The effective value will be capped to
{}.",
key, groupId, value, max, max);
props.put(key, max);
@@ -723,7 +852,7 @@ public final class GroupConfig extends AbstractConfig {
int value = Integer.parseInt(rawValue.toString());
if (value > max) {
- log.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
+ LOG.warn("The group config '{}' for group '{}' has value {} which
exceeds the broker's " +
"allowed maximum {}. The effective value will be capped to
{}.",
key, groupId, value, max, max);
props.put(key, max);
@@ -750,7 +879,7 @@ public final class GroupConfig extends AbstractConfig {
int value = Integer.parseInt(rawValue.toString());
if (value < min) {
- log.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
+ LOG.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
"allowed minimum {}. The effective value will be capped to
{}.",
key, groupId, value, min, min);
props.put(key, min);
@@ -760,7 +889,10 @@ public final class GroupConfig extends AbstractConfig {
/**
* Create a group config instance using the given properties and defaults.
*/
- public static GroupConfig fromProps(Map<?, ?> defaults, Properties
overrides) {
+ public static GroupConfig fromProps(
+ Map<?, ?> defaults,
+ Properties overrides
+ ) {
Properties props = new Properties();
props.putAll(defaults);
props.putAll(overrides);
@@ -784,14 +916,14 @@ public final class GroupConfig extends AbstractConfig {
/**
* The consumer group session timeout in milliseconds.
*/
- public int consumerSessionTimeoutMs() {
+ public Optional<Integer> consumerSessionTimeoutMs() {
return consumerSessionTimeoutMs;
}
/**
* The consumer group heartbeat interval in milliseconds.
*/
- public int consumerHeartbeatIntervalMs() {
+ public Optional<Integer> consumerHeartbeatIntervalMs() {
return consumerHeartbeatIntervalMs;
}
@@ -812,43 +944,43 @@ public final class GroupConfig extends AbstractConfig {
/**
* The share group session timeout in milliseconds.
*/
- public int shareSessionTimeoutMs() {
+ public Optional<Integer> shareSessionTimeoutMs() {
return shareSessionTimeoutMs;
}
/**
* The share group heartbeat interval in milliseconds.
*/
- public int shareHeartbeatIntervalMs() {
+ public Optional<Integer> shareHeartbeatIntervalMs() {
return shareHeartbeatIntervalMs;
}
/**
* The share group delivery count limit.
*/
- public int shareDeliveryCountLimit() {
+ public Optional<Integer> shareDeliveryCountLimit() {
return shareDeliveryCountLimit;
}
/**
* The share group partition max record locks.
*/
- public int sharePartitionMaxRecordLocks() {
+ public Optional<Integer> sharePartitionMaxRecordLocks() {
return sharePartitionMaxRecordLocks;
}
/**
* The share group record lock duration milliseconds.
*/
- public int shareRecordLockDurationMs() {
+ public Optional<Integer> shareRecordLockDurationMs() {
return shareRecordLockDurationMs;
}
/**
* The share group auto offset reset strategy.
*/
- public ShareGroupAutoOffsetResetStrategy shareAutoOffsetReset() {
- return
ShareGroupAutoOffsetResetStrategy.fromString(shareAutoOffsetReset);
+ public Optional<ShareGroupAutoOffsetResetStrategy> shareAutoOffsetReset() {
+ return shareAutoOffsetReset;
}
/**
@@ -868,28 +1000,28 @@ public final class GroupConfig extends AbstractConfig {
/**
* The streams group session timeout in milliseconds.
*/
- public int streamsSessionTimeoutMs() {
+ public Optional<Integer> streamsSessionTimeoutMs() {
return streamsSessionTimeoutMs;
}
/**
* The streams group heartbeat interval in milliseconds.
*/
- public int streamsHeartbeatIntervalMs() {
+ public Optional<Integer> streamsHeartbeatIntervalMs() {
return streamsHeartbeatIntervalMs;
}
/**
* The number of streams standby replicas for each task.
*/
- public int streamsNumStandbyReplicas() {
+ public Optional<Integer> streamsNumStandbyReplicas() {
return streamsNumStandbyReplicas;
}
/**
* The initial rebalance delay for streams groups.
*/
- public int streamsInitialRebalanceDelayMs() {
+ public Optional<Integer> streamsInitialRebalanceDelayMs() {
return streamsInitialRebalanceDelayMs;
}
@@ -910,28 +1042,21 @@ public final class GroupConfig extends AbstractConfig {
/**
* The task offset reporting interval.
*/
- public int streamsTaskOffsetIntervalMs() {
+ public Optional<Integer> streamsTaskOffsetIntervalMs() {
return streamsTaskOffsetIntervalMs;
}
/**
* The share group isolation level.
*/
- public IsolationLevel shareIsolationLevel() {
- if (shareIsolationLevel == null) {
- throw new IllegalArgumentException("Share isolation level is
null");
- }
- try {
- return
IsolationLevel.valueOf(shareIsolationLevel.toUpperCase(Locale.ROOT));
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Unknown Share isolation level:
" + shareIsolationLevel);
- }
+ public Optional<IsolationLevel> shareIsolationLevel() {
+ return shareIsolationLevel;
}
/**
* The share group renew acknowledge enable.
*/
- public boolean shareRenewAcknowledgeEnable() {
+ public Optional<Boolean> shareRenewAcknowledgeEnable() {
return shareRenewAcknowledgeEnable;
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 9374d768107..310d079205c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -32,8 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class GroupConfigManager implements AutoCloseable {
- private final GroupConfig defaultConfig;
-
private final Map<String, GroupConfig> configMap;
private final GroupCoordinatorConfig groupCoordinatorConfig;
@@ -41,12 +39,10 @@ public class GroupConfigManager implements AutoCloseable {
private final ShareGroupConfig shareGroupConfig;
public GroupConfigManager(
- Map<?, ?> defaultConfig,
GroupCoordinatorConfig groupCoordinatorConfig,
ShareGroupConfig shareGroupConfig
) {
this.configMap = new ConcurrentHashMap<>();
- this.defaultConfig = new GroupConfig(defaultConfig);
this.groupCoordinatorConfig =
Objects.requireNonNull(groupCoordinatorConfig);
this.shareGroupConfig = Objects.requireNonNull(shareGroupConfig);
}
@@ -75,10 +71,7 @@ public class GroupConfigManager implements AutoCloseable {
Properties evaluatedProps = GroupConfig.evaluate(
newGroupConfig, groupId, groupCoordinatorConfig, shareGroupConfig);
- final GroupConfig newConfig = GroupConfig.fromProps(
- defaultConfig.originals(),
- evaluatedProps
- );
+ final GroupConfig newConfig = new GroupConfig(evaluatedProps);
configMap.put(groupId, newConfig);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1da3ca21412..b5303be53b1 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -28,14 +28,11 @@ import
org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssign
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
-import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -853,54 +850,6 @@ public class GroupCoordinatorConfig {
return assignors;
}
- /**
- * Copy the subset of properties that are relevant to consumer group,
share group and streams group.
- */
- public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig
shareGroupConfig) {
- Map<String, Integer> defaultConfigs = new HashMap<>();
- defaultConfigs.putAll(extractConsumerGroupConfigMap());
- defaultConfigs.putAll(extractShareGroupConfigMap(shareGroupConfig));
- defaultConfigs.putAll(extractStreamsGroupConfigMap());
- return Collections.unmodifiableMap(defaultConfigs);
- }
-
- /**
- * Copy the subset of properties that are relevant to consumer group.
- */
- public Map<String, Integer> extractConsumerGroupConfigMap() {
- return Map.of(
- GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
consumerGroupSessionTimeoutMs(),
- GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
consumerGroupHeartbeatIntervalMs());
- }
-
- /**
- * Copy the subset of properties that are relevant to share group. These
configs include those which can be set
- * statically (for all groups) or dynamically (for a specific group). In
those cases, the default value for the
- * group specific dynamic config (Ex. share.session.timeout.ms) should be
the value set for the static config
- * (Ex. group.share.session.timeout.ms).
- */
- public Map<String, Integer> extractShareGroupConfigMap(ShareGroupConfig
shareGroupConfig) {
- return Map.of(
- GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG,
this.shareGroupSessionTimeoutMs(),
- GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
this.shareGroupHeartbeatIntervalMs(),
- GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupConfig.shareGroupRecordLockDurationMs(),
- GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupConfig.shareGroupDeliveryCountLimit(),
- GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupConfig.shareGroupPartitionMaxRecordLocks()
- );
- }
-
- /**
- * Copy the subset of properties that are relevant to streams group.
- */
- public Map<String, Integer> extractStreamsGroupConfigMap() {
- return Map.of(
- GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
streamsGroupSessionTimeoutMs(),
- GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
streamsGroupHeartbeatIntervalMs(),
- GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
streamsGroupNumStandbyReplicas(),
- GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
streamsGroupInitialRebalanceDelayMs(),
- GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
streamsGroupTaskOffsetIntervalMs());
- }
-
/**
* The number of threads or event loops running.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 837caddcb03..81a6e1b0c4a 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -8758,7 +8758,7 @@ public class GroupMetadataManager {
*/
private int consumerGroupSessionTimeoutMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::consumerSessionTimeoutMs)
+ return groupConfig.flatMap(GroupConfig::consumerSessionTimeoutMs)
.orElse(config.consumerGroupSessionTimeoutMs());
}
@@ -8767,7 +8767,7 @@ public class GroupMetadataManager {
*/
private int consumerGroupHeartbeatIntervalMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::consumerHeartbeatIntervalMs)
+ return groupConfig.flatMap(GroupConfig::consumerHeartbeatIntervalMs)
.orElse(config.consumerGroupHeartbeatIntervalMs());
}
@@ -8796,7 +8796,7 @@ public class GroupMetadataManager {
*/
private int shareGroupSessionTimeoutMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::shareSessionTimeoutMs)
+ return groupConfig.flatMap(GroupConfig::shareSessionTimeoutMs)
.orElse(config.shareGroupSessionTimeoutMs());
}
@@ -8805,7 +8805,7 @@ public class GroupMetadataManager {
*/
private int shareGroupHeartbeatIntervalMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::shareHeartbeatIntervalMs)
+ return groupConfig.flatMap(GroupConfig::shareHeartbeatIntervalMs)
.orElse(config.shareGroupHeartbeatIntervalMs());
}
@@ -8834,7 +8834,7 @@ public class GroupMetadataManager {
*/
private int streamsGroupSessionTimeoutMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::streamsSessionTimeoutMs)
+ return groupConfig.flatMap(GroupConfig::streamsSessionTimeoutMs)
.orElse(config.streamsGroupSessionTimeoutMs());
}
@@ -8843,7 +8843,7 @@ public class GroupMetadataManager {
*/
private int streamsGroupHeartbeatIntervalMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::streamsHeartbeatIntervalMs)
+ return groupConfig.flatMap(GroupConfig::streamsHeartbeatIntervalMs)
.orElse(config.streamsGroupHeartbeatIntervalMs());
}
@@ -8872,7 +8872,7 @@ public class GroupMetadataManager {
*/
private int streamsGroupTaskOffsetIntervalMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::streamsTaskOffsetIntervalMs)
+ return groupConfig.flatMap(GroupConfig::streamsTaskOffsetIntervalMs)
.orElse(config.streamsGroupTaskOffsetIntervalMs());
}
@@ -8881,7 +8881,7 @@ public class GroupMetadataManager {
*/
private int streamsGroupInitialRebalanceDelayMs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs)
+ return groupConfig.flatMap(GroupConfig::streamsInitialRebalanceDelayMs)
.orElse(config.streamsGroupInitialRebalanceDelayMs());
}
@@ -8897,7 +8897,7 @@ public class GroupMetadataManager {
*/
private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- final Integer numStandbyReplicas =
groupConfig.map(GroupConfig::streamsNumStandbyReplicas)
+ final Integer numStandbyReplicas =
groupConfig.flatMap(GroupConfig::streamsNumStandbyReplicas)
.orElse(config.streamsGroupNumStandbyReplicas());
return new TreeMap<>(Map.of(
"num.standby.replicas", numStandbyReplicas.toString()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
index 447750fb611..868236f9c63 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -41,7 +41,7 @@ public class ShareGroupConfigProvider {
*/
public int recordLockDurationMsOrDefault(String groupId, int defaultValue)
{
return manager.groupConfig(groupId)
- .map(GroupConfig::shareRecordLockDurationMs)
+ .flatMap(GroupConfig::shareRecordLockDurationMs)
.orElse(defaultValue);
}
@@ -55,7 +55,7 @@ public class ShareGroupConfigProvider {
*/
public int deliveryCountLimitOrDefault(String groupId, int defaultValue) {
return manager.groupConfig(groupId)
- .map(GroupConfig::shareDeliveryCountLimit)
+ .flatMap(GroupConfig::shareDeliveryCountLimit)
.orElse(defaultValue);
}
@@ -69,7 +69,7 @@ public class ShareGroupConfigProvider {
*/
public int partitionMaxRecordLocksOrDefault(String groupId, int
defaultValue) {
return manager.groupConfig(groupId)
- .map(GroupConfig::sharePartitionMaxRecordLocks)
+ .flatMap(GroupConfig::sharePartitionMaxRecordLocks)
.orElse(defaultValue);
}
@@ -82,7 +82,7 @@ public class ShareGroupConfigProvider {
*/
public boolean isRenewAcknowledgeEnabled(String groupId) {
return manager.groupConfig(groupId)
- .map(GroupConfig::shareRenewAcknowledgeEnable)
+ .flatMap(GroupConfig::shareRenewAcknowledgeEnable)
.orElse(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT);
}
@@ -95,7 +95,7 @@ public class ShareGroupConfigProvider {
*/
public ShareGroupAutoOffsetResetStrategy autoOffsetReset(String groupId) {
return manager.groupConfig(groupId)
- .map(GroupConfig::shareAutoOffsetReset)
+ .flatMap(GroupConfig::shareAutoOffsetReset)
.orElseGet(GroupConfig::defaultShareAutoOffsetReset);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index 901ca0c3c1a..062b7f861b8 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -138,8 +138,6 @@ public class GroupConfigManagerTest {
GroupCoordinatorConfig groupCoordinatorConfig =
GroupCoordinatorConfig.fromProps(overrides);
ShareGroupConfig shareGroupConfig =
ShareGroupConfig.fromProps(overrides);
- Map<String, Integer> defaultConfig = new
HashMap<>(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
-
- return new GroupConfigManager(defaultConfig, groupCoordinatorConfig,
shareGroupConfig);
+ return new GroupConfigManager(groupCoordinatorConfig,
shareGroupConfig);
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index d1468b1eba1..96df728aa2c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
@@ -394,18 +395,97 @@ public class GroupConfigTest {
}
@Test
- public void testAssignmentIntervalMsAbsentWhenNotConfigured() {
- // When the assignment interval config is absent, the group-level
value is empty.
- Properties props = new Properties();
- GroupConfig config = GroupConfig.fromProps(Map.of(), props);
+ public void testAllFieldsAbsentWhenNotConfigured() {
+ // When no config is provided, all group-level values are empty.
+ GroupConfig config = new GroupConfig(Map.of());
+
+ // Consumer group configs
+ assertEquals(Optional.empty(), config.consumerSessionTimeoutMs());
+ assertEquals(Optional.empty(), config.consumerHeartbeatIntervalMs());
assertEquals(Optional.empty(), config.consumerAssignmentIntervalMs());
+ assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable());
+
+ // Share group configs
+ assertEquals(Optional.empty(), config.shareSessionTimeoutMs());
+ assertEquals(Optional.empty(), config.shareHeartbeatIntervalMs());
+ assertEquals(Optional.empty(), config.shareRecordLockDurationMs());
+ assertEquals(Optional.empty(), config.shareDeliveryCountLimit());
+ assertEquals(Optional.empty(), config.sharePartitionMaxRecordLocks());
+ assertEquals(Optional.empty(), config.shareAutoOffsetReset());
assertEquals(Optional.empty(), config.shareAssignmentIntervalMs());
+ assertEquals(Optional.empty(), config.shareAssignorOffloadEnable());
+ assertEquals(Optional.empty(), config.shareIsolationLevel());
+ assertEquals(Optional.empty(), config.shareRenewAcknowledgeEnable());
+
+ // Streams group configs
+ assertEquals(Optional.empty(), config.streamsSessionTimeoutMs());
+ assertEquals(Optional.empty(), config.streamsHeartbeatIntervalMs());
+ assertEquals(Optional.empty(), config.streamsNumStandbyReplicas());
+ assertEquals(Optional.empty(),
config.streamsInitialRebalanceDelayMs());
assertEquals(Optional.empty(), config.streamsAssignmentIntervalMs());
+ assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable());
+ assertEquals(Optional.empty(), config.streamsTaskOffsetIntervalMs());
}
@Test
- public void testAssignmentIntervalMsNotValidatedWhenNotConfigured() {
- // When the assignment interval config is absent, validation should
not use the default assignment interval.
+ public void testAllFieldsPresentWhenConfigured() {
+ // When all configs are provided, all group-level values are present.
+ Map<String, String> props = new HashMap<>();
+ props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000");
+ props.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+ props.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "5000");
+ props.put(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true");
+ props.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "50000");
+ props.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+ props.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, "20000");
+ props.put(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "5");
+ props.put(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, "1000");
+ props.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "2500");
+ props.put(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true");
+ props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_committed");
+ props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "false");
+ props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000");
+ props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
+ props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
+ props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"3000");
+ props.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1250");
+ props.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
+ props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "30000");
+
+ GroupConfig config = new GroupConfig(props);
+
+ // Consumer group configs
+ assertEquals(Optional.of(50000), config.consumerSessionTimeoutMs());
+ assertEquals(Optional.of(6000), config.consumerHeartbeatIntervalMs());
+ assertEquals(Optional.of(5000), config.consumerAssignmentIntervalMs());
+ assertEquals(Optional.of(true),
config.consumerAssignorOffloadEnable());
+
+ // Share group configs
+ assertEquals(Optional.of(50000), config.shareSessionTimeoutMs());
+ assertEquals(Optional.of(6000), config.shareHeartbeatIntervalMs());
+ assertEquals(Optional.of(20000), config.shareRecordLockDurationMs());
+ assertEquals(Optional.of(5), config.shareDeliveryCountLimit());
+ assertEquals(Optional.of(1000), config.sharePartitionMaxRecordLocks());
+ assertEquals(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST),
config.shareAutoOffsetReset());
+ assertEquals(Optional.of(2500), config.shareAssignmentIntervalMs());
+ assertEquals(Optional.of(true), config.shareAssignorOffloadEnable());
+ assertEquals(Optional.of(IsolationLevel.READ_COMMITTED),
config.shareIsolationLevel());
+ assertEquals(Optional.of(false), config.shareRenewAcknowledgeEnable());
+
+ // Streams group configs
+ assertEquals(Optional.of(50000), config.streamsSessionTimeoutMs());
+ assertEquals(Optional.of(6000), config.streamsHeartbeatIntervalMs());
+ assertEquals(Optional.of(2), config.streamsNumStandbyReplicas());
+ assertEquals(Optional.of(3000),
config.streamsInitialRebalanceDelayMs());
+ assertEquals(Optional.of(1250), config.streamsAssignmentIntervalMs());
+ assertEquals(Optional.of(false),
config.streamsAssignorOffloadEnable());
+ assertEquals(Optional.of(30000), config.streamsTaskOffsetIntervalMs());
+ }
+
+ @Test
+ public void testNotValidatedWhenNotConfigured() {
+ // When configs are absent, validation should not use their default
values.
GroupCoordinatorConfig groupCoordinatorConfig =
GroupCoordinatorConfig.fromProps(Map.of(
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 2000,
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
2000,
@@ -417,16 +497,6 @@ public class GroupConfigTest {
assertDoesNotThrow(() -> GroupConfig.validate(Map.of(),
groupCoordinatorConfig, createShareGroupConfig()));
}
- @Test
- public void testAssignorOffloadEnableAbsentWhenNotConfigured() {
- // When the offload enable config is absent, the group-level value is
empty.
- Properties props = new Properties();
- GroupConfig config = GroupConfig.fromProps(Map.of(), props);
- assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable());
- assertEquals(Optional.empty(), config.shareAssignorOffloadEnable());
- assertEquals(Optional.empty(), config.streamsAssignorOffloadEnable());
- }
-
@Test
public void testInvalidConfigName() {
Map<String, String> props = new HashMap<>();
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ab1807a3cbc..a4b8141ed5c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21252,7 +21252,6 @@ public class GroupMetadataManagerTest {
GroupCoordinatorConfig groupCoordinatorConfig = new
GroupCoordinatorConfig(kafkaConfig);
ShareGroupConfig shareGroupConfig = new ShareGroupConfig(kafkaConfig);
GroupConfigManager groupConfigManager = new GroupConfigManager(
- groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig),
groupCoordinatorConfig,
shareGroupConfig
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
index 1eec4abc0f3..a426b51554b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -37,7 +37,7 @@ public class ShareGroupConfigProviderTest {
void testRecordLockDurationMsOrDefaultWithGroupConfig() {
GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
GroupConfig groupConfig = mock(GroupConfig.class);
- when(groupConfig.shareRecordLockDurationMs()).thenReturn(1000);
+
when(groupConfig.shareRecordLockDurationMs()).thenReturn(Optional.of(1000));
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
provider = new ShareGroupConfigProvider(groupConfigManager);
@@ -57,7 +57,7 @@ public class ShareGroupConfigProviderTest {
void testDeliveryCountLimitOrDefaultWithGroupConfig() {
GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
GroupConfig groupConfig = mock(GroupConfig.class);
- when(groupConfig.shareDeliveryCountLimit()).thenReturn(8);
+ when(groupConfig.shareDeliveryCountLimit()).thenReturn(Optional.of(8));
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
provider = new ShareGroupConfigProvider(groupConfigManager);
@@ -77,7 +77,7 @@ public class ShareGroupConfigProviderTest {
void testPartitionMaxRecordLocksOrDefaultWithGroupConfig() {
GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
GroupConfig groupConfig = mock(GroupConfig.class);
- when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(5000);
+
when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(Optional.of(5000));
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
provider = new ShareGroupConfigProvider(groupConfigManager);
@@ -97,7 +97,7 @@ public class ShareGroupConfigProviderTest {
void testIsRenewAcknowledgeDisabledWithGroupConfig() {
GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
GroupConfig groupConfig = mock(GroupConfig.class);
- when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
+
when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(Optional.of(false));
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
provider = new ShareGroupConfigProvider(groupConfigManager);
@@ -117,7 +117,7 @@ public class ShareGroupConfigProviderTest {
void testAutoOffsetResetWithGroupConfig() {
GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
GroupConfig groupConfig = mock(GroupConfig.class);
-
when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+
when(groupConfig.shareAutoOffsetReset()).thenReturn(Optional.of(ShareGroupAutoOffsetResetStrategy.EARLIEST));
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
provider = new ShareGroupConfigProvider(groupConfigManager);
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
index f2f3b835c50..31798621f95 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/coordinator/GroupCoordinatorShardLoadingBenchmark.java
@@ -290,7 +290,7 @@ public class GroupCoordinatorShardLoadingBenchmark {
@Setup(Level.Invocation)
public void setupInvocation() {
- GroupConfigManager configManager = new GroupConfigManager(new
HashMap<>(), config, shareGroupConfig);
+ GroupConfigManager configManager = new GroupConfigManager(config,
shareGroupConfig);
LogContext logContext = new LogContext();
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);