This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b43d70885db KAFKA-18652: Add `task.offset.interval.ms` config (#21737)
b43d70885db is described below

commit b43d70885dbf293f0ce79db20372ce470c9f69cb
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Mar 17 15:47:35 2026 -0700

    KAFKA-18652: Add `task.offset.interval.ms` config (#21737)
    
    KIP-1071 adds new broker-side configs:
    - task.offset.interval.ms
    - min.task.offset.interval.ms
    
    This PR adds both configs to the broker, and updates the
    StreamsGroupHeartbeatResponse to provide `task.offset.interval.ms` to the
    Kafka Streams client.
    
    Reviewers: Andrew Schofield <[email protected]>, Lucas Brutschy
    <[email protected]>
---
 .../StreamsGroupHeartbeatRequestManager.java       |   1 +
 .../consumer/internals/StreamsRebalanceData.java   |  12 +
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   3 +
 .../server/StreamsGroupHeartbeatRequestTest.scala  |  25 +-
 .../kafka/coordinator/group/GroupConfig.java       |  54 +++-
 .../coordinator/group/GroupCoordinatorConfig.java  |  46 +++-
 .../coordinator/group/GroupMetadataManager.java    |  17 +-
 .../coordinator/group/streams/StreamsGroup.java    |   3 +-
 .../kafka/coordinator/group/GroupConfigTest.java   |  38 +++
 .../group/GroupCoordinatorConfigTest.java          | 254 ++++++++++++++++-
 .../group/GroupMetadataManagerTest.java            | 302 +++++++++++++++------
 .../streams/test/MockProcessorContextAPITest.java  |   1 +
 .../kafka/tools/ConfigCommandIntegrationTest.java  | 113 ++++++++
 14 files changed, 778 insertions(+), 94 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 6907e113f65..d4f1b3dace8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -531,6 +531,7 @@ public class StreamsGroupHeartbeatRequestManager implements RequestManager {
         heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
         
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
+        
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
 
         if (data.partitionsByUserEndpoint() != null) {
             streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index a08472b6752..383900355dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -345,6 +345,8 @@ public class StreamsRebalanceData {
 
     private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);
 
+    private final AtomicInteger taskOffsetIntervalMs = new AtomicInteger(-1);
+
     public StreamsRebalanceData(final UUID processId,
                                 final Optional<HostInfo> endpoint,
                                 final Optional<String> rackId,
@@ -427,4 +429,14 @@ public class StreamsRebalanceData {
         return heartbeatIntervalMs.get();
     }
 
+    /** Updated whenever a heartbeat response is received from the broker. */
+    public void setTaskOffsetIntervalMs(final int taskOffsetIntervalMs) {
+        this.taskOffsetIntervalMs.set(taskOffsetIntervalMs);
+    }
+
+    /** Returns the task offset interval in milliseconds, or -1 if not yet 
set. */
+    public int taskOffsetIntervalMs() {
+        return taskOffsetIntervalMs.get();
+    }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e5bbc337ac9..069fe8645a7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFI [...]
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFI [...]
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -368,6 +368,7 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
     cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
+    cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
 
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b7465daf65c..bb493c89811 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1086,6 +1086,9 @@ class KafkaConfigTest {
         case 
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+        case 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
 
         /** Share coordinator configs */
         case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
diff --git a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index bb4f2d220d8..3f6b340f48c 100644
--- a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -35,7 +35,7 @@ import scala.jdk.CollectionConverters._
   serverProperties = Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
"group.streams.initial.rebalance.delay.ms", value = "0")
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = 
"0")
   )
 )
 class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
@@ -686,6 +686,10 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")
       assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(), 
"Member 2 should have no standby tasks initially")
 
+      // Verify both members picked up `task.offset.interval.ms`
+      assertEquals(60_000, 
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup 
task.offset.interval.ms initially")
+      assertEquals(60_000, 
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup 
task.offset.interval.ms initially")
+
       // Both members continue to send heartbeats with their assigned tasks
       TestUtils.waitUntilTrue(() => {
         streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -736,6 +740,10 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       }
       assertEquals(0, member2StandbyTasksSize, "Member 2 should have no 
standby tasks in this configuration")
 
+      // Verify both members picked up `task.offset.interval.ms`
+      assertEquals(60_000, 
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup 
task.offset.interval.ms initially")
+      assertEquals(60_000, 
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup 
task.offset.interval.ms initially")
+
       // Change streams.num.standby.replicas = 1
       val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
       val alterConfigOp = new AlterConfigOp(
@@ -746,6 +754,16 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
       admin.incrementalAlterConfigs(configChanges, options).all().get()
 
+      // Change streams.task.offset.interval.ms = 45000
+      val groupConfigResource2 = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
+      val alterConfigOp2 = new AlterConfigOp(
+        new ConfigEntry("streams.task.offset.interval.ms", "45000"),
+        AlterConfigOp.OpType.SET
+      )
+      val configChanges2 = Map(groupConfigResource2 -> 
List(alterConfigOp2).asJavaCollection).asJava
+      val options2 = new org.apache.kafka.clients.admin.AlterConfigsOptions()
+      admin.incrementalAlterConfigs(configChanges2, options2).all().get()
+
       // Send heartbeats to trigger rebalance after config change
       TestUtils.waitUntilTrue(() => {
         streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -790,6 +808,10 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum
       assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task 
should have one standby task")
 
+      // Verify both members picked up change of `task.offset.interval.ms`
+      assertEquals(45_000, 
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup 
task.offset.interval.ms initially")
+      assertEquals(45_000, 
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup 
task.offset.interval.ms initially")
+
     } finally {
       admin.close()
     }
@@ -1074,6 +1096,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
         .setActiveTasks(expectedActiveTasks)
         .setStandbyTasks(List.empty.asJava)
         .setWarmupTasks(List.empty.asJava)
+        .setTaskOffsetIntervalMs(60_000)
 
       assertEquals(expectedRejoinResponse, rejoinResponse)
     } finally {
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index db2cc128284..3d6ca00af45 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -92,6 +92,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = 
"streams.initial.rebalance.delay.ms";
 
+    public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"streams.task.offset.interval.ms";
+
     public final int consumerSessionTimeoutMs;
 
     public final int consumerHeartbeatIntervalMs;
@@ -116,6 +118,8 @@ public final class GroupConfig extends AbstractConfig {
 
     public final int streamsInitialRebalanceDelayMs;
 
+    public final int streamsTaskOffsetIntervalMs;
+
     public final String shareIsolationLevel;
 
     public final boolean shareRenewAcknowledgeEnable;
@@ -203,7 +207,13 @@ public final class GroupConfig extends AbstractConfig {
             
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
             atLeast(0),
             MEDIUM,
-            
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);
+            
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+        .define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+            INT,
+            
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
+            atLeast(1),
+            MEDIUM,
+            GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
 
     public GroupConfig(Map<?, ?> props) {
         super(CONFIG, props, false);
@@ -219,6 +229,7 @@ public final class GroupConfig extends AbstractConfig {
         this.streamsHeartbeatIntervalMs = 
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.streamsNumStandbyReplicas = 
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
         this.streamsInitialRebalanceDelayMs = 
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+        this.streamsTaskOffsetIntervalMs = 
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
         this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
         this.shareRenewAcknowledgeEnable = 
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
     }
@@ -262,6 +273,7 @@ public final class GroupConfig extends AbstractConfig {
         int streamsSessionTimeoutMs = (Integer) 
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
         int streamsHeartbeatIntervalMs = (Integer) 
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         int streamsNumStandbyReplicas = (Integer) 
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+        int streamsTaskOffsetIntervalMs = (Integer) 
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
         if (consumerHeartbeatInterval < 
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
             throw new 
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
                 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -338,6 +350,10 @@ public final class GroupConfig extends AbstractConfig {
             throw new 
InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be 
less than or equal to " +
                 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
         }
+        if (streamsTaskOffsetIntervalMs < 
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
+            throw new 
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must 
be greater than or equal to " +
+                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
+        }
         if (consumerSessionTimeout <= consumerHeartbeatInterval) {
             throw new 
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
                 CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -429,6 +445,8 @@ public final class GroupConfig extends AbstractConfig {
             groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
         clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
             groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
+        clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+            groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
 
         // Verify that clamping did not break the session > heartbeat 
invariant.
         checkSessionExceedsHeartbeat(props, groupId,
@@ -529,6 +547,33 @@ public final class GroupConfig extends AbstractConfig {
         }
     }
 
+    /**
+     * Clamp a config value to at least min. A WARN log is emitted on 
adjustment.
+     * No-op when the key is absent from props.
+     *
+     * @param props   The properties to modify in place.
+     * @param groupId The group id.
+     * @param key     The config key.
+     * @param min     The minimum allowed value (inclusive).
+     */
+    private static void clampToMin(
+        Properties props,
+        String groupId,
+        String key,
+        int min
+    ) {
+        Object rawValue = props.get(key);
+        if (rawValue == null) return;
+
+        int value = Integer.parseInt(rawValue.toString());
+        if (value < min) {
+            log.warn("The group config '{}' for group '{}' has value {} which 
is below the broker's " +
+                    "allowed minimum {}. The effective value will be capped to 
{}.",
+                key, groupId, value, min, min);
+            props.put(key, min);
+        }
+    }
+
     /**
      * Create a group config instance using the given properties and defaults.
      */
@@ -637,6 +682,13 @@ public final class GroupConfig extends AbstractConfig {
         return streamsInitialRebalanceDelayMs;
     }
 
+    /**
+     * The task offset reporting interval.
+     */
+    public int streamsTaskOffsetIntervalMs() {
+        return streamsTaskOffsetIntervalMs;
+    }
+
     /**
      * The share group isolation level.
      */
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index ff80a5c7ee6..1da3ca21412 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -375,6 +375,14 @@ public class GroupCoordinatorConfig {
     public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC = 
"Whether to offload streams group assignment to a group coordinator background 
thread.";
     public static final boolean STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT 
= true;
 
+    public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"group.streams.task.offset.interval.ms";
+    public static final int STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT = 
60000;
+    public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC = 
"The interval in which the warmup task changelog offsets on a client are 
updated on the broker. The offsets are sent with the next heartbeat after this 
time has passed.";
+
+    public static final String 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"group.streams.min.task.offset.interval.ms";
+    public static final int STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT 
= 15000;
+    public static final String STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC = 
"The minimum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
+
     public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
         CACHED_BUFFER_MAX_BYTES_CONFIG,
         CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -459,7 +467,9 @@ public class GroupCoordinatorConfig {
         .define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC);
+        .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
+        .define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
+        .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
 
 
     /**
@@ -520,6 +530,8 @@ public class GroupCoordinatorConfig {
     private final int streamsGroupInitialRebalanceDelayMs;
     private final int streamsGroupMinAssignmentIntervalMs;
     private final int streamsGroupMaxAssignmentIntervalMs;
+    private final int streamsGroupTaskOffsetIntervalMs;
+    private final int streamsGroupMinTaskOffsetIntervalMs;
 
     private final AbstractConfig config;
 
@@ -582,6 +594,8 @@ public class GroupCoordinatorConfig {
         this.streamsGroupInitialRebalanceDelayMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
         this.streamsGroupMinAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.streamsGroupMaxAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+        this.streamsGroupTaskOffsetIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG);
+        this.streamsGroupMinTaskOffsetIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
         this.config = config;
 
         // New group coordinator configs validation.
@@ -608,6 +622,7 @@ public class GroupCoordinatorConfig {
         require(consumerGroupAssignmentIntervalMs() <= 
consumerGroupMaxAssignmentIntervalMs,
                 String.format("%s must be less than or equal to %s", 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
 
+
         // Share group configs validation.
         require(shareGroupMaxHeartbeatIntervalMs >= 
shareGroupMinHeartbeatIntervalMs,
             String.format("%s must be greater than or equal to %s",
@@ -645,6 +660,7 @@ public class GroupCoordinatorConfig {
             String.format("%s must be less than or equal to %s",
                 SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
 
+
         // Streams group configs validation.
         require(streamsGroupMaxHeartbeatIntervalMs >= 
streamsGroupMinHeartbeatIntervalMs,
             String.format("%s must be greater than or equal to %s",
@@ -655,14 +671,14 @@ public class GroupCoordinatorConfig {
         require(streamsGroupHeartbeatIntervalMs <= 
streamsGroupMaxHeartbeatIntervalMs,
             String.format("%s must be less than or equal to %s",
                 STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
+
         require(streamsGroupMaxSessionTimeoutMs >= 
streamsGroupMinSessionTimeoutMs,
             String.format("%s must be greater than or equal to %s", 
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
         require(streamsGroupSessionTimeoutMs >= 
streamsGroupMinSessionTimeoutMs,
             String.format("%s must be greater than or equal to %s", 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
         require(streamsGroupSessionTimeoutMs <= 
streamsGroupMaxSessionTimeoutMs,
             String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
-        require(streamsGroupNumStandbyReplicas <= 
streamsGroupMaxStandbyReplicas,
-            String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
+
         require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
             String.format("%s must be less than %s",
                 STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
@@ -676,6 +692,13 @@ public class GroupCoordinatorConfig {
         require(streamsGroupAssignmentIntervalMs() <= 
streamsGroupMaxAssignmentIntervalMs,
             String.format("%s must be less than or equal to %s",
                 STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
+        require(streamsGroupNumStandbyReplicas <= 
streamsGroupMaxStandbyReplicas,
+            String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
+
+        require(streamsGroupTaskOffsetIntervalMs >= 
streamsGroupMinTaskOffsetIntervalMs,
+            String.format("%s must be greater than or equal to %s", 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
+
     }
 
     /**
@@ -874,7 +897,8 @@ public class GroupCoordinatorConfig {
             GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
streamsGroupSessionTimeoutMs(),
             GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
streamsGroupHeartbeatIntervalMs(),
             GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
streamsGroupNumStandbyReplicas(),
-            GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
streamsGroupInitialRebalanceDelayMs());
+            GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
streamsGroupInitialRebalanceDelayMs(),
+            GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
streamsGroupTaskOffsetIntervalMs());
     }
 
     /**
@@ -1311,4 +1335,18 @@ public class GroupCoordinatorConfig {
     public boolean streamsGroupAssignorOffloadEnable() {
         return 
config.getBoolean(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
     }
+
+    /**
+     * The task offset interval for streams groups.
+     */
+    public int streamsGroupTaskOffsetIntervalMs() {
+        return streamsGroupTaskOffsetIntervalMs;
+    }
+
+    /**
+     * The minimum task offset interval for streams groups.
+     */
+    public int streamsGroupMinTaskOffsetIntervalMs() {
+        return streamsGroupMinTaskOffsetIntervalMs;
+    }
 }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4141e441aee..364890baae9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -189,6 +189,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
@@ -2139,7 +2140,8 @@ public class GroupMetadataManager {
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
+            
.setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
         // The assignment is only provided in the following cases:
         // 1. The member is joining.
         // 2. The member's assignment has been updated.
@@ -8781,6 +8783,15 @@ public class GroupMetadataManager {
         return config.streamsGroupAssignorOffloadEnable();
     }
 
+    /**
+     * Get the task offset interval of the provided streams group.
+     */
+    private int streamsGroupTaskOffsetIntervalMs(String groupId) {
+        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
+        return groupConfig.map(GroupConfig::streamsTaskOffsetIntervalMs)
+            .orElse(config.streamsGroupTaskOffsetIntervalMs());
+    }
+
     /**
      * Get the initial rebalance delay of the provided streams group.
      */
@@ -8804,7 +8815,9 @@ public class GroupMetadataManager {
         Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
         final Integer numStandbyReplicas = 
groupConfig.map(GroupConfig::streamsNumStandbyReplicas)
             .orElse(config.streamsGroupNumStandbyReplicas());
-        return Map.of("num.standby.replicas", numStandbyReplicas.toString());
+        return new TreeMap<>(Map.of(
+            "num.standby.replicas", numStandbyReplicas.toString()
+        ));
     }
 
     /**
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 5b6610fe4a0..b1a5b026654 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -56,6 +56,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.TreeMap;
 
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
@@ -1231,7 +1232,7 @@ public class StreamsGroup implements Group {
      * @return The assignment configurations for this streams group.
      */
     public Map<String, String> lastAssignmentConfigs() {
-        return Collections.unmodifiableMap(lastAssignmentConfigs);
+        return Collections.unmodifiableMap(new 
TreeMap<>(lastAssignmentConfigs));
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 9b28b591574..8c0772abb9f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -45,6 +45,7 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT;
@@ -103,6 +104,8 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if 
(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
+            } else if 
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
+                assertPropertyInvalid(name, "not_a_number", "1.0");
             } else {
                 assertPropertyInvalid(name, "not_a_number", "-0.1");
             }
@@ -261,6 +264,11 @@ public class GroupConfigTest {
         doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
+        // Check for invalid streamsTaskOffsetIntervalMs, < MIN
+        props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "1000");
+        doTestInvalidProps(props, InvalidConfigurationException.class);
+        props = createValidGroupConfig();
+
         // Check for invalid shareIsolationLevel.
         props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
         doTestInvalidProps(props, ConfigException.class);
@@ -300,6 +308,7 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
"2000");
         defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
+        defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
"30000");
         defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
"true");
 
         Properties props = new Properties();
@@ -319,6 +328,7 @@ public class GroupConfigTest {
         assertEquals(2000, 
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(1, 
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
         assertEquals(3000, 
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
+        assertEquals(30000, 
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
         assertEquals(true, 
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
     }
 
@@ -441,6 +451,19 @@ public class GroupConfigTest {
         );
     }
 
+    /**
+     * Data source for configs with min-only evaluation (no max bound enforced 
by evaluate).
+     * Each entry: (configKey, tooLow, expectedMin).
+     */
+    private static Stream<Arguments> minBoundedConfigs() {
+        return Stream.of(
+            Arguments.of(
+                GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+                1000, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT
+            )
+        );
+    }
+
     @ParameterizedTest(name = "testEvaluateValueAboveMaxIsCapped[{0}]")
     @MethodSource("rangeBoundedConfigs")
     public void testEvaluateValueAboveMaxIsCapped(
@@ -487,6 +510,20 @@ public class GroupConfigTest {
         assertEquals(expectedMax, result.get(key));
     }
 
+    @ParameterizedTest(name = 
"testEvaluateMinBoundedValueBelowMinIsCapped[{0}]")
+    @MethodSource("minBoundedConfigs")
+    public void testEvaluateMinBoundedValueBelowMinIsCapped(
+        String key,
+        int tooLow,
+        int expectedMin
+    ) {
+        Properties props = new Properties();
+        props.put(key, tooLow);
+        Properties result = GroupConfig.evaluate(props, "test-group",
+            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
+        assertEquals(expectedMin, result.get(key));
+    }
+
     private Map<String, String> createValidGroupConfig() {
         Map<String, String> props = new HashMap<>();
         props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
@@ -502,6 +539,7 @@ public class GroupConfigTest {
         props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
         props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"3000");
+        props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "45000");
         props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
         return props;
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 7319fcebdf4..418b57706fc 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -261,7 +261,7 @@ public class GroupCoordinatorConfigTest {
     }
 
     @Test
-    public void testInvalidConfigs() {
+    public void testInvalidConsumerConfigs() {
         Map<String, Object> configs = new HashMap<>();
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 10);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 20);
@@ -346,19 +346,229 @@ public class GroupCoordinatorConfigTest {
         configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG, 
5000);
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG,
 1000);
         assertEquals(5000, 
createConfig(configs).shareGroupInitializeRetryIntervalMs());
+    }
+
+    @Test
+    public void testInvalidStreamsConfigs() {
+        testStreamsSessionTimeoutMs();
+        testStreamsHeartbeatIntervalMs();
+        testStreamsOtherConfigs();
+    }
+
+    private void testStreamsSessionTimeoutMs() {
+        Map<String, Object> configs = new HashMap<>();
+
+        // group.streams.session.timeout.ms
+
+        // must be positive
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.session.timeout.ms: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // cannot be smaller than MIN
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT - 1);
+        assertEquals("group.streams.session.timeout.ms must be greater than or 
equal to group.streams.min.session.timeout.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be MIN
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT);
+        createConfig(configs);
+
+        // can be MAX
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+        createConfig(configs);
+
+        // cannot be larger than MAX
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1);
+        assertEquals("group.streams.session.timeout.ms must be less than or 
equal to group.streams.max.session.timeout.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+
+        // group.streams.min.session.timeout.ms
+
+        // must be positive
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.min.session.timeout.ms: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be MAX (implies `MAX can be MIN`)
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT); // 
required
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT); // when
+        createConfig(configs);
+
+        // cannot be larger than MAX (implies `MAX cannot be smaller than MIN`)
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1); // 
required
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1); // 
when
+        assertEquals("group.streams.max.session.timeout.ms must be greater 
than or equal to group.streams.min.session.timeout.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        // other cases for `group.streams.min.session.timeout.ms` are covered 
in section `session.timeout.ms` above
+
+
+        // group.streams.max.session.timeout.ms
 
+        // must be positive
         configs.clear();
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 45000);
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 60000);
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
50000);
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 
50000);
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, 
0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.max.session.timeout.ms: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // other cases for `group.streams.max.session.timeout.ms` are covered 
in sections `session.timeout.ms` and `group.streams.min.session.timeout.ms` 
above
+    }
+
+    private void testStreamsHeartbeatIntervalMs() {
+        Map<String, Object> configs = new HashMap<>();
+
+        // group.streams.heartbeat.interval.ms
+
+        // must be positive
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.heartbeat.interval.ms: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // cannot be smaller than MIN
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT - 1);
+        assertEquals("group.streams.heartbeat.interval.ms must be greater than 
or equal to group.streams.min.heartbeat.interval.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be MIN
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT);
+        createConfig(configs);
+
+        // can be MAX
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT);
+        createConfig(configs);
+
+        // cannot be larger than MAX
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1);
+        assertEquals("group.streams.heartbeat.interval.ms must be less than or 
equal to group.streams.max.heartbeat.interval.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be smaller than session timeout
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT - 1); // 
required
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT - 1);
+        createConfig(configs);
+
+        // cannot be the same as session timeout
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT); // required
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
         assertEquals("group.streams.heartbeat.interval.ms must be less than 
group.streams.session.timeout.ms",
             assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
 
+
+        // group.streams.min.heartbeat.interval.ms
+
+        // must be positive
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.min.heartbeat.interval.ms: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be MAX (implies `MAX can be MIN`)
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT); // 
required
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT); // 
when
+        createConfig(configs);
+
+        // cannot be larger than MAX (implies `MAX cannot be smaller than MIN`)
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1); // 
required
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1); 
// when
+        assertEquals("group.streams.max.heartbeat.interval.ms must be greater 
than or equal to group.streams.min.heartbeat.interval.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        // other cases for `group.streams.min.heartbeat.interval.ms` are covered in 
the `heartbeat.interval.ms` section above
+
+
+        // group.streams.max.heartbeat.interval.ms
+
+        // must be positive
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
 0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.max.heartbeat.interval.ms: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // other cases for `group.streams.max.heartbeat.interval.ms` are covered in 
the `heartbeat.interval.ms` and `group.streams.min.heartbeat.interval.ms` sections above
+    }
+
+    private void testStreamsOtherConfigs() {
+        Map<String, Object> configs = new HashMap<>();
+
+        // group.streams.max.size
+
+        // must be positive
+        configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.max.size: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+
+        // group.streams.num.standby.replicas
+
+        // cannot be negative
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
-1);
+        assertEquals("Invalid value -1 for configuration 
group.streams.num.standby.replicas: Value must be at least 0",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be MAX
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT);
+        createConfig(configs);
+
+        // cannot be larger than MAX
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT + 1);
+        assertEquals("group.streams.num.standby.replicas must be less than or 
equal to group.streams.max.standby.replicas",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+
+        // group.streams.max.standby.replicas
+
+        // cannot be negative
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, 
-1);
+        assertEquals("Invalid value -1 for configuration 
group.streams.max.standby.replicas: Value must be at least 0",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+
+        // group.streams.initial.rebalance.delay.ms
+
+        // cannot be negative
         configs.clear();
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 -1);
         assertEquals("Invalid value -1 for configuration 
group.streams.initial.rebalance.delay.ms: Value must be at least 0",
             assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // group.streams.task.offset.interval.ms
+
+        // must be positive
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 0);
+        assertEquals("Invalid value 0 for configuration 
group.streams.task.offset.interval.ms: Value must be at least 1",
+            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
+
+        // cannot be smaller than MIN
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT - 1);
+        assertEquals("group.streams.task.offset.interval.ms must be greater 
than or equal to group.streams.min.task.offset.interval.ms",
+            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
+
+        // can be MIN
+        configs.clear();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT);
+        createConfig(configs);
     }
 
     @Test
@@ -464,6 +674,40 @@ public class GroupCoordinatorConfigTest {
         assertEquals(7000, config.streamsGroupInitialRebalanceDelayMs());
     }
 
+    @Test
+    public void testStreamsGroupTaskOffsetIntervalDefaultValue() {
+        Map<String, Object> configs = new HashMap<>();
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(60000, config.streamsGroupTaskOffsetIntervalMs());
+        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
+            config.streamsGroupTaskOffsetIntervalMs());
+    }
+
+    @Test
+    public void testStreamsGroupTaskOffsetIntervalCustomValue() {
+        Map<String, Object> configs = new HashMap<>();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 45000);
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(45000, config.streamsGroupTaskOffsetIntervalMs());
+    }
+
+    @Test
+    public void testStreamsGroupMinTaskOffsetIntervalDefaultValue() {
+        Map<String, Object> configs = new HashMap<>();
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(15000, config.streamsGroupMinTaskOffsetIntervalMs());
+        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT,
+            config.streamsGroupMinTaskOffsetIntervalMs());
+    }
+
+    @Test
+    public void testStreamsGroupMinTaskOffsetIntervalCustomValue() {
+        Map<String, Object> configs = new HashMap<>();
+        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG,
 20000);
+        GroupCoordinatorConfig config = createConfig(configs);
+        assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
+    }
+
     public static GroupCoordinatorConfig createConfig(Map<String, Object> 
configs) {
         return new GroupCoordinatorConfig(new AbstractConfig(
             GroupCoordinatorConfig.CONFIG_DEF,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index a7a67f4e415..fdb47f417f2 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -174,6 +174,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
@@ -200,6 +201,7 @@ import static 
org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEO
 import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
@@ -1201,7 +1203,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
     }
@@ -17461,7 +17464,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -17481,7 +17485,15 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                2,
+                groupMetadataHash,
+                0,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -17547,7 +17559,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -17640,7 +17653,8 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
-                    .setStatusDetail("Source topics bar are missing."))),
+                    .setStatusDetail("Source topics bar are missing.")))
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -17656,9 +17670,15 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, 
computeGroupHash(Map.of(
-                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), -1, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                2,
+                computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage))),
+                -1,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -17726,7 +17746,8 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
-                    .setStatusDetail("Internal topics are missing: bar"))),
+                    .setStatusDetail("Internal topics are missing: bar")))
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -17742,9 +17763,15 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, 
computeGroupHash(Map.of(
-                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), -1, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                2,
+                computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage))),
+                -1,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -17809,7 +17836,8 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
-                    .setStatusDetail("Following topics do not have the same 
number of partitions: [{bar=3, foo=6}]"))),
+                    .setStatusDetail("Following topics do not have the same 
number of partitions: [{bar=3, foo=6}]")))
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -17825,10 +17853,18 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, 
computeGroupHash(Map.of(
-                fooTopicName, computeTopicHash(fooTopicName, metadataImage),
-                barTopicName, computeTopicHash(barTopicName, metadataImage)
-            )), -1, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                2,
+                computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage),
+                    barTopicName, computeTopicHash(barTopicName, metadataImage)
+                )),
+                -1,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -17907,7 +17943,8 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.STALE_TOPOLOGY.code())
-                    .setStatusDetail("The member's topology epoch 0 is behind 
the group's topology epoch 1."))),
+                    .setStatusDetail("The member's topology epoch 0 is behind 
the group's topology epoch 1.")))
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -17922,10 +17959,18 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
-                fooTopicName, computeTopicHash(fooTopicName, metadataImage),
-                barTopicName, computeTopicHash(barTopicName, metadataImage)
-            )), 1, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                11,
+                computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage),
+                    barTopicName, computeTopicHash(barTopicName, metadataImage)
+                )),
+                1,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
 11, context.time.milliseconds()),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
@@ -18001,7 +18046,8 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                )),
+                ))
+                .setTaskOffsetIntervalMs(60_000),
             result1.response().data()
         );
         assertRecordsEquals(List.of(), result1.records());
@@ -18022,7 +18068,8 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                )),
+                ))
+                .setTaskOffsetIntervalMs(60_000),
             result2.response().data()
         );
 
@@ -18115,7 +18162,8 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                )),
+                ))
+                .setTaskOffsetIntervalMs(60_000),
             result2.response().data()
         );
     }
@@ -18195,7 +18243,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -18224,7 +18273,15 @@ public class GroupMetadataManagerTest {
 
         List<CoordinatorRecord> expectedRecords = List.of(
             
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                11,
+                groupMetadataHash,
+                0,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -18316,7 +18373,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -18344,10 +18402,18 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
-                fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
-                barTopicName, computeTopicHash(barTopicName, newMetadataImage)
-            )), 0,  Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                11,
+                computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
newMetadataImage),
+                    barTopicName, computeTopicHash(barTopicName, 
newMetadataImage)
+                )),
+                0,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5),
@@ -18456,7 +18522,8 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
 
             result.response().data()
         );
@@ -18584,7 +18651,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -18600,7 +18668,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId)
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
     }
@@ -18653,7 +18722,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -18669,7 +18739,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId)
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
     }
@@ -18721,7 +18792,8 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.ASSIGNMENT_DELAYED.code())
                         .setStatusDetail("Assignment delayed due to the 
configured initial rebalance delay.")
-                )),
+                ))
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -18750,7 +18822,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
     }
@@ -18917,7 +18990,8 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -18959,7 +19033,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19005,7 +19080,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19039,7 +19115,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19079,7 +19156,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId1)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19109,7 +19187,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId2)
                 .setMemberEpoch(10)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19136,7 +19215,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19167,7 +19247,8 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19210,7 +19291,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19257,7 +19339,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19468,7 +19551,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19489,9 +19573,15 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
-                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), 0, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                11,
+                computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage))),
+                0,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -19598,7 +19688,8 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19611,9 +19702,15 @@ public class GroupMetadataManagerTest {
             .build();
 
         List<CoordinatorRecord> expectedRecords = List.of(
-            
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11, 
computeGroupHash(Map.of(
-                fooTopicName, computeTopicHash(fooTopicName, metadataImage)
-            )), 0, Map.of("num.standby.replicas", "0")),
+            StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                groupId,
+                11,
+                computeGroupHash(Map.of(fooTopicName, 
computeTopicHash(fooTopicName, metadataImage))),
+                0,
+                new TreeMap<>(Map.of(
+                    "num.standby.replicas", "0"
+                ))
+            ),
             
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId,
                 TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
                     TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 
4, 5)
@@ -19764,7 +19861,15 @@ public class GroupMetadataManagerTest {
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId),
-                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
+                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                            groupId,
+                            3,
+                            groupMetadataHash,
+                            0,
+                            new TreeMap<>(Map.of(
+                                "num.standby.replicas", "0"
+                            ))
+                        )
                     )
                 )
             )),
@@ -19825,7 +19930,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19864,7 +19970,8 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19893,7 +20000,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19925,7 +20033,8 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
                 .setEndpointInformationEpoch(0)
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -19990,7 +20099,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -20029,7 +20139,8 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -20057,7 +20168,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -20073,7 +20185,15 @@ public class GroupMetadataManagerTest {
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId1),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
 memberId1),
                         
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, 
memberId1),
-                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 4, 
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
+                        
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                            groupId,
+                            4,
+                            groupMetadataHash,
+                            0,
+                            new TreeMap<>(Map.of(
+                                "num.standby.replicas", "0"
+                            ))
+                        )
                     )
                 )
             )),
@@ -20240,7 +20360,8 @@ public class GroupMetadataManagerTest {
                         .setStandbyTasks(List.of())
                         .setWarmupTasks(List.of())
                         
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
-                        .setStatus(List.of()),
+                        .setStatus(List.of())
+                        .setTaskOffsetIntervalMs(60_000),
                 result.response().data()
         );
 
@@ -20417,7 +20538,8 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of()),
+                .setStatus(List.of())
+                .setTaskOffsetIntervalMs(60_000),
             result.response().data()
         );
 
@@ -20538,7 +20660,15 @@ public class GroupMetadataManagerTest {
                 
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, 
expectedMember),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, 
topology),
-                
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 
2, 0, -1, Map.of("num.standby.replicas", "0")),
+                StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+                    classicGroupId,
+                    2,
+                    0,
+                    -1,
+                    new TreeMap<>(Map.of(
+                        "num.standby.replicas", "0"
+                    ))
+                ),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
 memberId, TasksTuple.EMPTY),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(classicGroupId,
 2, context.time.milliseconds()),
                 
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
 expectedMember)
@@ -20858,7 +20988,10 @@ public class GroupMetadataManagerTest {
                     .setStandbyTasks(List.of())
                     .setWarmupTasks(List.of()));
         assertEquals(2, result.response().data().memberEpoch());
-        assertEquals(Map.of("num.standby.replicas", "0"), 
assignor.lastPassedAssignmentConfigs());
+        assertEquals(
+            Map.of("num.standby.replicas", "0"),
+            assignor.lastPassedAssignmentConfigs()
+        );
 
         // Verify heartbeat interval
         
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
 result.response().data().heartbeatIntervalMs());
@@ -20894,7 +21027,10 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId, 50000);
 
         // Verify that the new number of standby replicas is used
-        assertEquals(Map.of("num.standby.replicas", "2"), 
assignor.lastPassedAssignmentConfigs());
+        assertEquals(
+            Map.of("num.standby.replicas", "2"),
+            assignor.lastPassedAssignmentConfigs()
+        );
 
         // Advance time.
         assertEquals(
@@ -21116,15 +21252,16 @@ public class GroupMetadataManagerTest {
                     .setWarmupTasks(List.of()));
         assertEquals(2, result.response().data().memberEpoch());
 
-        // Verify default heartbeat interval, session timeout, and 
num.standby.replicas before config update.
+        // Verify default heartbeat interval, session timeout, 
num.standby.replicas, task.offset.interval before config update.
         
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
             result.response().data().heartbeatIntervalMs());
         context.assertSessionTimeout(groupId, memberId,
             GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
-        assertEquals(Map.of("num.standby.replicas",
-                
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)),
+        assertEquals(
+            Map.of(
+                "num.standby.replicas", 
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)
+            ),
             assignor.lastPassedAssignmentConfigs());
-
         // Advance time.
         assertEquals(
             List.of(),
@@ -21134,10 +21271,12 @@ public class GroupMetadataManagerTest {
         // Dynamic update group config with out-of-range values.
         // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is 
below min 5000;
         // num standby replicas 100 exceeds max 2.
+        // task offset interval 100 is below min 15000.
         Properties newGroupConfig = new Properties();
         newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 70000);
         newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
         newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 100);
+        newGroupConfig.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 100);
         context.updateGroupConfig(groupId, newGroupConfig);
 
         // Session timer is rescheduled on second heartbeat, new assignment 
with evaluated parameter is calculated.
@@ -21156,9 +21295,12 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId,
             
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
 
-        // Verify that the number of standby replicas is evaluated to max.
-        assertEquals(Map.of("num.standby.replicas",
-                
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)),
+        // Verify that the number of standby replicas is evaluated to max,
+        // and task offset interval is evaluated to min
+        assertEquals(
+            Map.of(
+                "num.standby.replicas", 
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)
+            ),
             assignor.lastPassedAssignmentConfigs());
     }
 
@@ -26650,6 +26792,8 @@ public class GroupMetadataManagerTest {
      */
     private Map<String, String> getDefaultAssignmentConfigs() {
         // Use the same default value as 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT
-        return Map.of("num.standby.replicas", 
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT));
+        return new TreeMap<>(Map.of(
+            "num.standby.replicas", 
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)
+        ));
     }
 }
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
index e863fe8509d..1fb6ca93123 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
@@ -208,6 +208,7 @@ public class MockProcessorContextAPITest {
         assertThat(context.committed(), is(false));
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldStoreAndReturnStateStores() {
         final Processor<String, Long, Void, Void> processor = new 
Processor<>() {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index c221ba05ab1..ccc356820d1 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -59,6 +59,7 @@ import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_
 import static 
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
 import static 
org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
 import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
@@ -295,6 +296,75 @@ public class ConfigCommandIntegrationTest {
         assertTrue(message.contains("streams.heartbeat.interval.ms=5000 
sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}"));
         assertTrue(message.contains("streams.num.standby.replicas=0 
sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}"));
         assertTrue(message.contains("streams.session.timeout.ms=45000 
sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}"));
+        assertTrue(message.contains("streams.task.offset.interval.ms=60000 
sensitive=false 
synonyms={DEFAULT_CONFIG:streams.task.offset.interval.ms=60000}"));
+    }
+
+    @ClusterTest
+    public void testAlterStreamsGroupSessionTimeout() {
+        // verify session.timeout.ms
+
+        // Verify the initial config
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--describe", "--all"));
+        String message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.session.timeout.ms=45000"));
+
+        // Should fail to set below min
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.session.timeout.ms=1"));
+        message = captureStandardErr(run(command));
+        
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
 streams.session.timeout.ms must be greater than or equal to 
group.streams.min.session.timeout.ms"));
+
+        // Should fail to set above max
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.session.timeout.ms=100000"));
+        message = captureStandardErr(run(command));
+        
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
 streams.session.timeout.ms must be less than or equal to 
group.streams.max.session.timeout.ms"));
+    }
+
+    @ClusterTest(serverProperties = {
+        @ClusterConfigProperty(key = 
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, value = "55000"),
+    })
+    public void testAlterStreamsGroupHeartbeatInterval() {
+        // verify heartbeat.interval.ms
+
+        // Verify the initial config
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--describe", "--all"));
+        String message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.heartbeat.interval.ms=5000"));
+
+        // Should fail to set below min
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.heartbeat.interval.ms=1"));
+        message = captureStandardErr(run(command));
+        
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
 streams.heartbeat.interval.ms must be greater than or equal to 
group.streams.min.heartbeat.interval.ms"));
+
+        // Should fail to set above max
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", 
"streams.heartbeat.interval.ms=100000"));
+        message = captureStandardErr(run(command));
+        
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
 streams.heartbeat.interval.ms must be less than or equal to 
group.streams.max.heartbeat.interval.ms"));
+
+        // Should fail to set above session timeout
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.heartbeat.interval.ms=50000"));
+        message = captureStandardErr(run(command));
+        
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
 streams.session.timeout.ms must be greater than 
streams.heartbeat.interval.ms"));
     }
 
     @ClusterTest
@@ -322,6 +392,49 @@ public class ConfigCommandIntegrationTest {
             "--describe"));
         message = captureStandardOut(run(command));
         assertTrue(message.contains("streams.num.standby.replicas=1"));
+
+        // Should fail to set above max standby replicas
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.num.standby.replicas=3"));
+        message = captureStandardErr(run(command));
+        assertTrue(message.contains("streams.num.standby.replicas must be less 
than or equal to group.streams.max.standby.replicas"));
+    }
+
+    @ClusterTest
+    public void testAlterStreamsGroupTaskOffsetInterval() {
+        // Verify the initial config before any group-level override is applied
+        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--describe", "--all"));
+        String message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.task.offset.interval.ms=60000"));
+
+        // Alter task offset interval
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.task.offset.interval.ms=45000"));
+        message = captureStandardOut(run(command));
+        assertEquals("Completed updating config for group group.", message);
+
+        // Verify the updated config
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--describe"));
+        message = captureStandardOut(run(command));
+        assertTrue(message.contains("streams.task.offset.interval.ms=45000"));
+
+        // Should fail to set below min interval
+        command = Stream.concat(quorumArgs(), Stream.of(
+            "--entity-type", "groups",
+            "--entity-name", "group",
+            "--alter", "--add-config", "streams.task.offset.interval.ms=1"));
+        message = captureStandardErr(run(command));
+        assertTrue(message.contains("streams.task.offset.interval.ms must be greater than or equal to group.streams.min.task.offset.interval.ms"));
+
     }
 
     private void verifyGroupConfigUpdate(List<String> alterOpts) throws 
Exception {

Reply via email to