This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 07e0caefef7 KAFKA-18652: Add `num.warmup.replicas` config (#21801)
07e0caefef7 is described below

commit 07e0caefef7fbb433939e75c7fd37deb9bd1385a
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Apr 28 11:23:36 2026 -0700

    KAFKA-18652: Add `num.warmup.replicas` config (#21801)
    
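    KIP-1071 adds new broker-side configs:
    
    - num.warmup.replicas (`group.streams.num.warmup.replicas`)
    - max.warmup.replicas (`group.streams.max.warmup.replicas`)
    
    This PR adds both configs to the broker. It also adds the group-level
    `streams.num.warmup.replicas` config, whose value must not exceed
    `group.streams.max.warmup.replicas`.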
    
    Reviewers: Lucas Brutschy <[email protected]>, Lan Ding
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 +-
 .../kafka/coordinator/group/GroupConfig.java       | 31 +++++++++++
 .../coordinator/group/GroupCoordinatorConfig.java  | 33 +++++++++++-
 .../kafka/coordinator/group/GroupConfigTest.java   | 15 ++++++
 .../group/GroupCoordinatorConfigTest.java          | 63 ++++++++++++++++++++++
 .../kafka/tools/ConfigCommandIntegrationTest.java  | 34 ++++++++++++
 7 files changed, 177 insertions(+), 5 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 53dbac88113..c61fc9c48f5 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -269,7 +269,7 @@
 
     <!-- group coordinator -->
     <suppress checks="CyclomaticComplexity"
-              
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext).java"/>
+              
files="(ConsumerGroupMember|GroupMetadataManager|GroupCoordinatorRecordSerde|GroupMetadataManagerTestContext|GroupConfigTest).java"/>
     <suppress checks="(NPathComplexity|MethodLength)"
               
files="(GroupMetadataManager|GroupMetadataManagerTest|GroupMetadataManagerTestContext|GroupCoordinatorShard).java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 49d7fe11139..9331650e1a9 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerd
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
 import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -379,9 +379,9 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
     cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
+    cgConfigs.put(STREAMS_NUM_WARMUP_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT.toString)
     cgConfigs.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "")
     cgConfigs.put(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "false")
-
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
 
     val describeConfigsRequest = new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData()
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 0404b299bf9..5ee40332369 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -105,6 +105,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"streams.task.offset.interval.ms";
 
+    public static final String STREAMS_NUM_WARMUP_REPLICAS_CONFIG = 
"streams.num.warmup.replicas";
+
     public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG = 
"errors.deadletterqueue.topic.name";
     public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
     public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The 
name of the topic to be used as the dead-letter queue (DLQ) topic for this 
share group. If blank (the default), the group does not have a DLQ topic.";
@@ -151,6 +153,8 @@ public final class GroupConfig extends AbstractConfig {
 
     private final Optional<Integer> streamsTaskOffsetIntervalMs;
 
+    private final Optional<Integer> streamsNumWarmupReplicas;
+
     private final Optional<IsolationLevel> shareIsolationLevel;
 
     private final Optional<Boolean> shareRenewAcknowledgeEnable;
@@ -282,6 +286,12 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(1),
             MEDIUM,
             GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
+        .define(STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+            INT,
+            GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT,
+            atLeast(0),
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
 
         // DLQ configurations (KIP-1191)
         .define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
@@ -326,6 +336,7 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
         Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
         Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)),
+        Map.entry(STREAMS_NUM_WARMUP_REPLICAS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG)),
 
         // DLQ configs
         Map.entry(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, Optional.empty()),
@@ -362,6 +373,7 @@ public final class GroupConfig extends AbstractConfig {
         this.streamsAssignmentIntervalMs = 
optionalInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.streamsAssignorOffloadEnable = 
optionalBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
         this.streamsTaskOffsetIntervalMs = 
optionalInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
+        this.streamsNumWarmupReplicas = 
optionalInt(STREAMS_NUM_WARMUP_REPLICAS_CONFIG);
         this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
             .map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
         this.shareRenewAcknowledgeEnable = 
optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
@@ -542,6 +554,12 @@ public final class GroupConfig extends AbstractConfig {
             groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(),
             
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG
         );
+        validateIntMax(
+            parsed,
+            STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxWarmupReplicas(),
+            GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG
+        );
 
         // Cross-field validations: session timeout must be greater than 
heartbeat interval.
         validateSessionExceedsHeartbeat(
@@ -782,6 +800,12 @@ public final class GroupConfig extends AbstractConfig {
             STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
         );
+        clampToMax(
+            props,
+            groupId,
+            STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMaxWarmupReplicas()
+        );
 
         // Verify that clamping did not break the session > heartbeat 
invariant.
         checkSessionExceedsHeartbeat(
@@ -1084,6 +1108,13 @@ public final class GroupConfig extends AbstractConfig {
         return streamsTaskOffsetIntervalMs;
     }
 
+    /**
+     * The number of warmup replicas for each task.
+     */
+    public Optional<Integer> streamsNumWarmupReplicas() {
+        return streamsNumWarmupReplicas;
+    }
+
     /**
      * The share group isolation level.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 30e79e4631c..a6cc89d8a75 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -392,6 +392,14 @@ public class GroupCoordinatorConfig {
     public static final int STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT 
= 15000;
     public static final String STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC = 
"The minimum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
 
+    public static final String STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG = 
"group.streams.num.warmup.replicas";
+    public static final int STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT = 2;
+    public static final String STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC = "The 
maximum number of warmup task replicas.";
+
+    public static final String STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG = 
"group.streams.max.warmup.replicas";
+    public static final int STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT = 20;
+    public static final String STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC = "The 
maximum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG;
+
     public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
         CACHED_BUFFER_MAX_BYTES_CONFIG,
         CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -482,7 +490,9 @@ public class GroupCoordinatorConfig {
         .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
-        .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
+        .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, INT, 
STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
+        .define(STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG, INT, 
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC);
 
 
     /**
@@ -548,6 +558,8 @@ public class GroupCoordinatorConfig {
     private final int streamsGroupMaxAssignmentIntervalMs;
     private final int streamsGroupTaskOffsetIntervalMs;
     private final int streamsGroupMinTaskOffsetIntervalMs;
+    private final int streamsGroupNumWarmupReplicas;
+    private final int streamsGroupMaxWarmupReplicas;
 
     private final AbstractConfig config;
 
@@ -615,6 +627,8 @@ public class GroupCoordinatorConfig {
         this.streamsGroupMaxAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.streamsGroupTaskOffsetIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG);
         this.streamsGroupMinTaskOffsetIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
+        this.streamsGroupNumWarmupReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG);
+        this.streamsGroupMaxWarmupReplicas = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG);
         this.config = config;
 
         // New group coordinator configs validation.
@@ -714,9 +728,10 @@ public class GroupCoordinatorConfig {
 
         require(streamsGroupNumStandbyReplicas <= 
streamsGroupMaxStandbyReplicas,
             String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
-
         require(streamsGroupTaskOffsetIntervalMs >= 
streamsGroupMinTaskOffsetIntervalMs,
             String.format("%s must be greater than or equal to %s", 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
+        require(streamsGroupNumWarmupReplicas <= streamsGroupMaxWarmupReplicas,
+            String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, 
STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG));
 
     }
 
@@ -1334,4 +1349,18 @@ public class GroupCoordinatorConfig {
     public int streamsGroupMinTaskOffsetIntervalMs() {
         return streamsGroupMinTaskOffsetIntervalMs;
     }
+
+    /**
+     * The maximum number of warmup replicas for streams groups.
+     */
+    public int streamsGroupNumWarmupReplicas() {
+        return streamsGroupNumWarmupReplicas;
+    }
+
+    /**
+     * The maximum allowed number of warmup replicas to be configured for 
streams groups
+     */
+    public int streamsGroupMaxWarmupReplicas() {
+        return streamsGroupMaxWarmupReplicas;
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index a79ec3ac1fc..83ef07ac496 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -50,6 +50,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT;
@@ -127,6 +128,8 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
+            } else if 
(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if 
(GroupConfig.ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(!GroupConfig.ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG.equals(name)) {
@@ -322,6 +325,11 @@ public class GroupConfigTest {
         doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
+        // Check for invalid streamsNumWarmupReplicas, > MAX
+        props.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "50");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
         // Check for invalid shareIsolationLevel.
         props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
         doTestInvalidProps(props, ConfigException.class);
@@ -368,6 +376,7 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"1250");
         defaultValue.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"false");
         defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
"30000");
+        defaultValue.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "5");
         defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
"true");
 
         Properties props = new Properties();
@@ -394,6 +403,7 @@ public class GroupConfigTest {
         assertEquals(1250, 
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
         assertEquals(false, 
config.getBoolean(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
         assertEquals(30000, 
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
+        assertEquals(5, 
config.getInt(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG));
         assertEquals(true, 
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
     }
 
@@ -640,6 +650,10 @@ public class GroupConfigTest {
             Arguments.of(
                 GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
                 5, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT
+            ),
+            Arguments.of(
+                GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
+                25, STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT
             )
         );
     }
@@ -766,6 +780,7 @@ public class GroupConfigTest {
         props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"3000");
         props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "45000");
+        props.put(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "3");
         props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
         return props;
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index f9dc304e16d..e0327148e4a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -569,6 +569,35 @@ public class GroupCoordinatorConfigTest {
         configs.clear();
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT);
         createConfig(configs);
+
+
+        // group.streams.num.warmup.replicas
+
+        // cannot be negative
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, 
-1);
+        assertEquals("Invalid value -1 for configuration 
group.streams.num.warmup.replicas: Value must be at least 0",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be MAX
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT);
+        createConfig(configs);
+
+        // cannot be larger than MAX
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT + 1);
+        assertEquals("group.streams.num.warmup.replicas must be less than or 
equal to group.streams.max.warmup.replicas",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+
+        // group.streams.max.warmup.replicas
+
+        // cannot be negative
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG, 
-1);
+        assertEquals("Invalid value -1 for configuration 
group.streams.max.warmup.replicas: Value must be at least 0",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
     }
 
     @Test
@@ -724,6 +753,40 @@ public class GroupCoordinatorConfigTest {
         assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
     }
 
+    @Test
+    public void testStreamsGroupNumWarmupReplicasDefaultValue() {
+        Map<String, Object> configs = new HashMap<>();
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(2, config.streamsGroupNumWarmupReplicas());
+        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT,
+            config.streamsGroupNumWarmupReplicas());
+    }
+
+    @Test
+    public void testStreamsGroupNumWarmupReplicasCustomValue() {
+        Map<String, Object> configs = new HashMap<>();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, 5);
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(5, config.streamsGroupNumWarmupReplicas());
+    }
+
+    @Test
+    public void testStreamsGroupMaxWarmupReplicasDefaultValue() {
+        Map<String, Object> configs = new HashMap<>();
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(20, config.streamsGroupMaxWarmupReplicas());
+        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT,
+            config.streamsGroupMaxWarmupReplicas());
+    }
+
+    @Test
+    public void testStreamsGroupMaxWarmupReplicasCustomValue() {
+        Map<String, Object> configs = new HashMap<>();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG, 
30);
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(30, config.streamsGroupMaxWarmupReplicas());
+    }
+
     @Test
     public void testDLQAutoCreateTopicsEnableDefaultValue() {
         Map<String, Object> configs = new HashMap<>();
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index d2c49088f8d..c4368076690 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -436,6 +436,40 @@ public class ConfigCommandIntegrationTest {
 
     }
 
+    @ClusterTest
+    public void testAlterStreamsGroupNumWarmupReplicas() {
+        // Verify the initial config
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--describe", "--all"));
+        String message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.num.warmup.replicas=2"));
+
+        // Alter num warmup replicas
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.num.warmup.replicas=5"));
+        message = captureStandardOut(run(command));
+        assertEquals("Completed updating config for group group.", message);
+
+        // Verify the updated config
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--describe"));
+        message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.num.warmup.replicas=5"));
+
+        // Should fail to set above max
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.num.warmup.replicas=25"));
+        message = captureStandardErr(run(command));
+        assertTrue(message.contains("streams.num.warmup.replicas must be less 
than or equal to group.streams.max.warmup.replicas"));
+    }
+
     private void verifyGroupConfigUpdate(List<String> alterOpts) throws 
Exception {
         try (Admin client = cluster.admin()) {
             // Add config

Reply via email to