This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 07e0caefef7 KAFKA-18652: Add `num.warmup.replicas` config (#21801)
07e0caefef7 is described below
commit 07e0caefef7fbb433939e75c7fd37deb9bd1385a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Apr 28 11:23:36 2026 -0700
KAFKA-18652: Add `num.warmup.replicas` config (#21801)
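KIP-1071 adds new broker-side configs:
- group.streams.num.warmup.replicas
- group.streams.max.warmup.replicas
This PR adds both configs to the broker. It also adds the group-level
config streams.num.warmup.replicas, which defaults to the broker-level
value and may not exceed group.streams.max.warmup.replicas.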
Reviewers: Lucas Brutschy <[email protected]>, Lan Ding
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../kafka/coordinator/group/GroupConfig.java | 31 +++++++++++
.../coordinator/group/GroupCoordinatorConfig.java | 33 +++++++++++-
.../kafka/coordinator/group/GroupConfigTest.java | 15 ++++++
.../group/GroupCoordinatorConfigTest.java | 63 ++++++++++++++++++++++
.../kafka/tools/ConfigCommandIntegrationTest.java | 34 ++++++++++++
7 files changed, 177 insertions(+), 5 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 53dbac88113..c61fc9c48f5 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -269,7 +269,7 @@
<!-- group coordinator -->
<suppress checks="CyclomaticComplexity"
-
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext).java"/>
+
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext|GroupConfigTest).java"/>
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GroupCoordinatorShard).java"/>
<suppress checks="ClassFanOutComplexity"
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 49d7fe11139..9331650e1a9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerd
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -379,9 +379,9 @@ class KafkaApisTest extends Logging {
cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
+ cgConfigs.put(STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT.toString)
cgConfigs.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "")
cgConfigs.put(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "false")
-
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
val describeConfigsRequest = new DescribeConfigsRequest.Builder(new
DescribeConfigsRequestData()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 0404b299bf9..5ee40332369 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -105,6 +105,8 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG =
"streams.task.offset.interval.ms";
+ public static final String STREAMS_NUM_WARMUP_REPLICAS_CONFIG =
"streams.num.warmup.replicas";
+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG =
"errors.deadletterqueue.topic.name";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The
name of the topic to be used as the dead-letter queue (DLQ) topic for this
share group. If blank (the default), the group does not have a DLQ topic.";
@@ -151,6 +153,8 @@ public final class GroupConfig extends AbstractConfig {
private final Optional<Integer> streamsTaskOffsetIntervalMs;
+ private final Optional<Integer> streamsNumWarmupReplicas;
+
private final Optional<IsolationLevel> shareIsolationLevel;
private final Optional<Boolean> shareRenewAcknowledgeEnable;
@@ -282,6 +286,12 @@ public final class GroupConfig extends AbstractConfig {
atLeast(1),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
+ .define(STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+ INT,
+ GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT,
+ atLeast(0),
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
// DLQ configurations (KIP-1191)
.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
@@ -326,6 +336,7 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)),
+ Map.entry(STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG)),
// DLQ configs
Map.entry(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, Optional.empty()),
@@ -362,6 +373,7 @@ public final class GroupConfig extends AbstractConfig {
this.streamsAssignmentIntervalMs =
optionalInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.streamsAssignorOffloadEnable =
optionalBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
this.streamsTaskOffsetIntervalMs =
optionalInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
+ this.streamsNumWarmupReplicas =
optionalInt(STREAMS_NUM_WARMUP_REPLICAS_CONFIG);
this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
.map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
this.shareRenewAcknowledgeEnable =
optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
@@ -542,6 +554,12 @@ public final class GroupConfig extends AbstractConfig {
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(),
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG
);
+ validateIntMax(
+ parsed,
+ STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxWarmupReplicas(),
+ GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG
+ );
// Cross-field validations: session timeout must be greater than
heartbeat interval.
validateSessionExceedsHeartbeat(
@@ -782,6 +800,12 @@ public final class GroupConfig extends AbstractConfig {
STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
);
+ clampToMax(
+ props,
+ groupId,
+ STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMaxWarmupReplicas()
+ );
// Verify that clamping did not break the session > heartbeat
invariant.
checkSessionExceedsHeartbeat(
@@ -1084,6 +1108,13 @@ public final class GroupConfig extends AbstractConfig {
return streamsTaskOffsetIntervalMs;
}
+ /**
+ * The number of warmup replicas for each task.
+ */
+ public Optional<Integer> streamsNumWarmupReplicas() {
+ return streamsNumWarmupReplicas;
+ }
+
/**
* The share group isolation level.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 30e79e4631c..a6cc89d8a75 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -392,6 +392,14 @@ public class GroupCoordinatorConfig {
public static final int STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT
= 15000;
public static final String STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC =
"The minimum allowed value for the group-level configuration of " +
GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
+ public static final String STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG =
"group.streams.num.warmup.replicas";
+ public static final int STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT = 2;
+ public static final String STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC = "The
maximum number of warmup task replicas.";
+
+ public static final String STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG =
"group.streams.max.warmup.replicas";
+ public static final int STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT = 20;
+ public static final String STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC = "The
maximum allowed value for the group-level configuration of " +
GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG;
+
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
CACHED_BUFFER_MAX_BYTES_CONFIG,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -482,7 +490,9 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
+ .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, INT,
STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
+ .define(STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG, INT,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC);
/**
@@ -548,6 +558,8 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxAssignmentIntervalMs;
private final int streamsGroupTaskOffsetIntervalMs;
private final int streamsGroupMinTaskOffsetIntervalMs;
+ private final int streamsGroupNumWarmupReplicas;
+ private final int streamsGroupMaxWarmupReplicas;
private final AbstractConfig config;
@@ -615,6 +627,8 @@ public class GroupCoordinatorConfig {
this.streamsGroupMaxAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.streamsGroupTaskOffsetIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.streamsGroupMinTaskOffsetIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
+ this.streamsGroupNumWarmupReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG);
+ this.streamsGroupMaxWarmupReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG);
this.config = config;
// New group coordinator configs validation.
@@ -714,9 +728,10 @@ public class GroupCoordinatorConfig {
require(streamsGroupNumStandbyReplicas <=
streamsGroupMaxStandbyReplicas,
String.format("%s must be less than or equal to %s",
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
-
require(streamsGroupTaskOffsetIntervalMs >=
streamsGroupMinTaskOffsetIntervalMs,
String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
+ require(streamsGroupNumWarmupReplicas <= streamsGroupMaxWarmupReplicas,
+ String.format("%s must be less than or equal to %s",
STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG));
}
@@ -1334,4 +1349,18 @@ public class GroupCoordinatorConfig {
public int streamsGroupMinTaskOffsetIntervalMs() {
return streamsGroupMinTaskOffsetIntervalMs;
}
+
+ /**
+ * The maximum number of warmup replicas for streams groups.
+ */
+ public int streamsGroupNumWarmupReplicas() {
+ return streamsGroupNumWarmupReplicas;
+ }
+
+ /**
+ * The maximum allowed number of warmup replicas to be configured for
streams groups
+ */
+ public int streamsGroupMaxWarmupReplicas() {
+ return streamsGroupMaxWarmupReplicas;
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index a79ec3ac1fc..83ef07ac496 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -50,6 +50,7 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT;
@@ -127,6 +128,8 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
+ } else if
(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_boolean");
} else if
(!GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG.equals(name)) {
@@ -322,6 +325,11 @@ public class GroupConfigTest {
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
+ // Check for invalid streamsNumWarmupReplicas, > MAX
+ props.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "50");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
// Check for invalid shareIsolationLevel.
props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
doTestInvalidProps(props, ConfigException.class);
@@ -368,6 +376,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
"1250");
defaultValue.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false");
defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
"30000");
+ defaultValue.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "5");
defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
"true");
Properties props = new Properties();
@@ -394,6 +403,7 @@ public class GroupConfigTest {
assertEquals(1250,
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
assertEquals(false,
config.getBoolean(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
assertEquals(30000,
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
+ assertEquals(5,
config.getInt(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG));
assertEquals(true,
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
}
@@ -640,6 +650,10 @@ public class GroupConfigTest {
Arguments.of(
GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
5, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT
+ ),
+ Arguments.of(
+ GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+ 25, STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT
)
);
}
@@ -766,6 +780,7 @@ public class GroupConfigTest {
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"3000");
props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "45000");
+ props.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "3");
props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
return props;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index f9dc304e16d..e0327148e4a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -569,6 +569,35 @@ public class GroupCoordinatorConfigTest {
configs.clear();
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT);
createConfig(configs);
+
+
+ // group.streams.num.warmup.replicas
+
+ // cannot be negative
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG,
-1);
+ assertEquals("Invalid value -1 for configuration
group.streams.num.warmup.replicas: Value must be at least 0",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT);
+ createConfig(configs);
+
+ // cannot be larger than MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT + 1);
+ assertEquals("group.streams.num.warmup.replicas must be less than or
equal to group.streams.max.warmup.replicas",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.max.warmup.replicas
+
+ // cannot be negative
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG,
-1);
+ assertEquals("Invalid value -1 for configuration
group.streams.max.warmup.replicas: Value must be at least 0",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
}
@Test
@@ -724,6 +753,40 @@ public class GroupCoordinatorConfigTest {
assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
}
+ @Test
+ public void testStreamsGroupNumWarmupReplicasDefaultValue() {
+ Map<String, Object> configs = new HashMap<>();
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(2, config.streamsGroupNumWarmupReplicas());
+
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT,
+ config.streamsGroupNumWarmupReplicas());
+ }
+
+ @Test
+ public void testStreamsGroupNumWarmupReplicasCustomValue() {
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, 5);
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(5, config.streamsGroupNumWarmupReplicas());
+ }
+
+ @Test
+ public void testStreamsGroupMaxWarmupReplicasDefaultValue() {
+ Map<String, Object> configs = new HashMap<>();
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(20, config.streamsGroupMaxWarmupReplicas());
+
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT,
+ config.streamsGroupMaxWarmupReplicas());
+ }
+
+ @Test
+ public void testStreamsGroupMaxWarmupReplicasCustomValue() {
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG,
30);
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(30, config.streamsGroupMaxWarmupReplicas());
+ }
+
@Test
public void testDLQAutoCreateTopicsEnableDefaultValue() {
Map<String, Object> configs = new HashMap<>();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index d2c49088f8d..c4368076690 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -436,6 +436,40 @@ public class ConfigCommandIntegrationTest {
}
+ @ClusterTest
+ public void testAlterStreamsGroupNumWarmupReplicas() {
+ // Verify the initial config
+ Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--describe", "--all"));
+ String message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.num.warmup.replicas=2"));
+
+ // Alter num warmup replicas
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.num.warmup.replicas=5"));
+ message = captureStandardOut(run(command));
+ assertEquals("Completed updating config for group group.", message);
+
+ // Verify the updated config
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--describe"));
+ message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.num.warmup.replicas=5"));
+
+ // Should fail to set above max
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.num.warmup.replicas=25"));
+ message = captureStandardErr(run(command));
+ assertTrue(message.contains("streams.num.warmup.replicas must be less
than or equal to group.streams.max.warmup.replicas"));
+ }
+
private void verifyGroupConfigUpdate(List<String> alterOpts) throws
Exception {
try (Admin client = cluster.admin()) {
// Add config