This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 052e0889294 Revert "KAFKA-18652: Add `task.offset.interval.ms` config 
(#21737)" (#22020)
052e0889294 is described below

commit 052e088929458aef5c0bdaf8ba11032c74f8eb80
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Apr 10 16:06:02 2026 -0700

    Revert "KAFKA-18652: Add `task.offset.interval.ms` config (#21737)" (#22020)
    
    This reverts commit b43d70885dbf293f0ce79db20372ce470c9f69cb.
    
    Only reverts the relevant parts to remove `task.offset.interval.ms`
    config.
    
    Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy 
<[email protected]>
---
 .../StreamsGroupHeartbeatRequestManager.java       |   1 -
 .../consumer/internals/StreamsRebalanceData.java   |  12 --
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   3 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   2 -
 .../server/StreamsGroupHeartbeatRequestTest.scala  |  23 ----
 .../kafka/coordinator/group/GroupConfig.java       |  30 +----
 .../coordinator/group/GroupCoordinatorConfig.java  |  37 +-----
 .../coordinator/group/GroupMetadataManager.java    |  12 +-
 .../kafka/coordinator/group/GroupConfigTest.java   |  38 ------
 .../group/GroupCoordinatorConfigTest.java          |  53 --------
 .../group/GroupMetadataManagerTest.java            | 134 +++++++--------------
 .../kafka/tools/ConfigCommandIntegrationTest.java  |  36 ------
 12 files changed, 51 insertions(+), 330 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index d4f1b3dace8..6907e113f65 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -531,7 +531,6 @@ public class StreamsGroupHeartbeatRequestManager implements 
RequestManager {
         heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
         
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
         
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
-        
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
 
         if (data.partitionsByUserEndpoint() != null) {
             streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index 383900355dd..a08472b6752 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -345,8 +345,6 @@ public class StreamsRebalanceData {
 
     private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);
 
-    private final AtomicInteger taskOffsetIntervalMs = new AtomicInteger(-1);
-
     public StreamsRebalanceData(final UUID processId,
                                 final Optional<HostInfo> endpoint,
                                 final Optional<String> rackId,
@@ -429,14 +427,4 @@ public class StreamsRebalanceData {
         return heartbeatIntervalMs.get();
     }
 
-    /** Updated whenever a heartbeat response is received from the broker. */
-    public void setTaskOffsetIntervalMs(final int taskOffsetIntervalMs) {
-        this.taskOffsetIntervalMs.set(taskOffsetIntervalMs);
-    }
-
-    /** Returns the task offset interval in milliseconds, or -1 if not yet 
set. */
-    public int taskOffsetIntervalMs() {
-        return taskOffsetIntervalMs.get();
-    }
-
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d7cbcc52a2d..4f051f54d52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, S [...]
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, S [...]
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -373,7 +373,6 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
     cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
-    cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
 
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2a035f77333..cf9063a3c2b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1086,8 +1086,6 @@ class KafkaConfigTest {
         case 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case 
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
         case 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
-        case 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
-        case 
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
 
         /** Share coordinator configs */
         case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
diff --git 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 85b04e3e3ed..80d6ab0ec68 100644
--- 
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -703,10 +703,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")
       assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(), 
"Member 2 should have no standby tasks initially")
 
-      // Verify both members picked up `task.offset.interval.ms`
-      assertEquals(60_000, 
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup 
task.offset.interval.ms initially")
-      assertEquals(60_000, 
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup 
task.offset.interval.ms initially")
-
       // Both members continue to send heartbeats with their assigned tasks
       TestUtils.waitUntilTrue(() => {
         streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -757,10 +753,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       }
       assertEquals(0, member2StandbyTasksSize, "Member 2 should have no 
standby tasks in this configuration")
 
-      // Verify both members picked up `task.offset.interval.ms`
-      assertEquals(60_000, 
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup 
task.offset.interval.ms initially")
-      assertEquals(60_000, 
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup 
task.offset.interval.ms initially")
-
       // Change streams.num.standby.replicas = 1
       val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
       val alterConfigOp = new AlterConfigOp(
@@ -771,16 +763,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
       admin.incrementalAlterConfigs(configChanges, options).all().get()
 
-      // Change streams.task.offset.interval.ms = 45000
-      val groupConfigResource2 = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
-      val alterConfigOp2 = new AlterConfigOp(
-        new ConfigEntry("streams.task.offset.interval.ms", "45000"),
-        AlterConfigOp.OpType.SET
-      )
-      val configChanges2 = Map(groupConfigResource2 -> 
List(alterConfigOp2).asJavaCollection).asJava
-      val options2 = new org.apache.kafka.clients.admin.AlterConfigsOptions()
-      admin.incrementalAlterConfigs(configChanges2, options2).all().get()
-
       // Send heartbeats to trigger rebalance after config change
       TestUtils.waitUntilTrue(() => {
         streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -825,10 +807,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
       val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum
       assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task 
should have one standby task")
 
-      // Verify both members picked up change of `task.offset.interval.ms`
-      assertEquals(45_000, 
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup 
task.offset.interval.ms initially")
-      assertEquals(45_000, 
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup 
task.offset.interval.ms initially")
-
     } finally {
       admin.close()
     }
@@ -1113,7 +1091,6 @@ class StreamsGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupCo
         .setActiveTasks(expectedActiveTasks)
         .setStandbyTasks(List.empty.asJava)
         .setWarmupTasks(List.empty.asJava)
-        .setTaskOffsetIntervalMs(60_000)
 
       assertEquals(expectedRejoinResponse, rejoinResponse)
     } finally {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 396ddcaba39..295fbe2a9e5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -104,8 +104,6 @@ public final class GroupConfig extends AbstractConfig {
 
     public static final String STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG = 
"streams.assignor.offload.enable";
 
-    public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"streams.task.offset.interval.ms";
-
     public final int consumerSessionTimeoutMs;
 
     public final int consumerHeartbeatIntervalMs;
@@ -151,8 +149,6 @@ public final class GroupConfig extends AbstractConfig {
 
     public final Optional<Boolean> streamsAssignorOffloadEnable;
 
-    public final int streamsTaskOffsetIntervalMs;
-
     public final String shareIsolationLevel;
 
     public final boolean shareRenewAcknowledgeEnable;
@@ -258,13 +254,7 @@ public final class GroupConfig extends AbstractConfig {
             
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
             atLeast(0),
             MEDIUM,
-            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
-            INT,
-            
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
-            atLeast(1),
-            MEDIUM,
-            GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
+            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC);
 
     /**
      * Mapping from GroupConfig name to its broker-level synonym config name.
@@ -292,8 +282,7 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
         Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
         Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
-        Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
-        Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
+        Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
     );
 
     /**
@@ -339,7 +328,6 @@ public final class GroupConfig extends AbstractConfig {
             Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
             Optional.empty();
         this.streamsAssignorOffloadEnable = Optional.empty();
-        this.streamsTaskOffsetIntervalMs = 
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
         this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
         this.shareRenewAcknowledgeEnable = 
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
     }
@@ -383,7 +371,6 @@ public final class GroupConfig extends AbstractConfig {
         int streamsHeartbeatIntervalMs = (Integer) 
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         int streamsNumStandbyReplicas = (Integer) 
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
         int streamsAssignmentIntervalMs = (Integer) 
valueMaps.get(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
-        int streamsTaskOffsetIntervalMs = (Integer) 
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
         if (consumerHeartbeatInterval < 
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
             throw new 
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be 
greater than or equal to " +
                 
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -499,10 +486,6 @@ public final class GroupConfig extends AbstractConfig {
                     
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
             }
         }
-        if (streamsTaskOffsetIntervalMs < 
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
-            throw new 
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must 
be greater than or equal to " +
-                
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
-        }
         if (consumerSessionTimeout <= consumerHeartbeatInterval) {
             throw new 
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be 
greater than " +
                 CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -604,8 +587,6 @@ public final class GroupConfig extends AbstractConfig {
         clampToRange(props, groupId, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
             groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
             groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs());
-        clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
-            groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
 
         // Verify that clamping did not break the session > heartbeat 
invariant.
         checkSessionExceedsHeartbeat(props, groupId,
@@ -883,13 +864,6 @@ public final class GroupConfig extends AbstractConfig {
         return streamsAssignorOffloadEnable;
     }
 
-    /**
-     * The task offset reporting interval.
-     */
-    public int streamsTaskOffsetIntervalMs() {
-        return streamsTaskOffsetIntervalMs;
-    }
-
     /**
      * The share group isolation level.
      */
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 5000678d925..592b36b9130 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -374,14 +374,6 @@ public class GroupCoordinatorConfig {
     public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC = 
"Whether to offload streams group assignment to a group coordinator background 
thread.";
     public static final boolean STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT 
= true;
 
-    public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"group.streams.task.offset.interval.ms";
-    public static final int STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT = 
60000;
-    public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC = 
"The interval in which the warmup task changelog offsets on a client are 
updated on the broker. The offsets are sent with the next heartbeat after this 
time has passed.";
-
-    public static final String 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG = 
"group.streams.min.task.offset.interval.ms";
-    public static final int STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT 
= 15000;
-    public static final String STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC = 
"The minimum allowed value for the group-level configuration of " + 
GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
-
     public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
         CACHED_BUFFER_MAX_BYTES_CONFIG,
         CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -460,9 +452,7 @@ public class GroupCoordinatorConfig {
         .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, 
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
         .define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
-        .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
+        .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC);
 
 
     /**
@@ -523,8 +513,6 @@ public class GroupCoordinatorConfig {
     private final int streamsGroupInitialRebalanceDelayMs;
     private final int streamsGroupMinAssignmentIntervalMs;
     private final int streamsGroupMaxAssignmentIntervalMs;
-    private final int streamsGroupTaskOffsetIntervalMs;
-    private final int streamsGroupMinTaskOffsetIntervalMs;
 
     private final AbstractConfig config;
 
@@ -587,8 +575,6 @@ public class GroupCoordinatorConfig {
         this.streamsGroupInitialRebalanceDelayMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
         this.streamsGroupMinAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
         this.streamsGroupMaxAssignmentIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
-        this.streamsGroupTaskOffsetIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG);
-        this.streamsGroupMinTaskOffsetIntervalMs = 
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
         this.config = config;
 
         // New group coordinator configs validation.
@@ -688,10 +674,6 @@ public class GroupCoordinatorConfig {
 
         require(streamsGroupNumStandbyReplicas <= 
streamsGroupMaxStandbyReplicas,
             String.format("%s must be less than or equal to %s", 
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, 
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
-
-        require(streamsGroupTaskOffsetIntervalMs >= 
streamsGroupMinTaskOffsetIntervalMs,
-            String.format("%s must be greater than or equal to %s", 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
-
     }
 
     /**
@@ -890,8 +872,7 @@ public class GroupCoordinatorConfig {
             GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
streamsGroupSessionTimeoutMs(),
             GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
streamsGroupHeartbeatIntervalMs(),
             GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
streamsGroupNumStandbyReplicas(),
-            GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
streamsGroupInitialRebalanceDelayMs(),
-            GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
streamsGroupTaskOffsetIntervalMs());
+            GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
streamsGroupInitialRebalanceDelayMs());
     }
 
     /**
@@ -1328,18 +1309,4 @@ public class GroupCoordinatorConfig {
     public boolean streamsGroupAssignorOffloadEnable() {
         return STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
     }
-
-    /**
-     * The task offset interval for streams groups.
-     */
-    public int streamsGroupTaskOffsetIntervalMs() {
-        return streamsGroupTaskOffsetIntervalMs;
-    }
-
-    /**
-     * The minimum task offset interval for streams groups.
-     */
-    public int streamsGroupMinTaskOffsetIntervalMs() {
-        return streamsGroupMinTaskOffsetIntervalMs;
-    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 09714ab28b8..f9230452d54 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2140,8 +2140,7 @@ public class GroupMetadataManager {
         StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
             .setMemberId(updatedMember.memberId())
             .setMemberEpoch(updatedMember.memberEpoch())
-            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-            
.setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
+            .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
         // The assignment is only provided in the following cases:
         // 1. The member is joining.
         // 2. The member's assignment has been updated.
@@ -8867,15 +8866,6 @@ public class GroupMetadataManager {
             .orElse(config.streamsGroupAssignorOffloadEnable());
     }
 
-    /**
-     * Get the task offset interval of the provided streams group.
-     */
-    private int streamsGroupTaskOffsetIntervalMs(String groupId) {
-        Optional<GroupConfig> groupConfig = 
groupConfigManager.groupConfig(groupId);
-        return groupConfig.map(GroupConfig::streamsTaskOffsetIntervalMs)
-            .orElse(config.streamsGroupTaskOffsetIntervalMs());
-    }
-
     /**
      * Get the initial rebalance delay of the provided streams group.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index d7208fb8e5c..73195ab825a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -51,7 +51,6 @@ import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
 import static 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT;
@@ -117,8 +116,6 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
             } else if 
(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
-            } else if 
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
-                assertPropertyInvalid(name, "not_a_number", "1.0");
             } else {
                 assertPropertyInvalid(name, "not_a_number", "-0.1");
             }
@@ -307,11 +304,6 @@ public class GroupConfigTest {
         doTestInvalidProps(props, InvalidConfigurationException.class);
         props = createValidGroupConfig();
 
-        // Check for invalid streamsTaskOffsetIntervalMs, < MIN
-        props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "1000");
-        doTestInvalidProps(props, InvalidConfigurationException.class);
-        props = createValidGroupConfig();
-
         // Check for invalid shareIsolationLevel.
         props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
         doTestInvalidProps(props, ConfigException.class);
@@ -354,7 +346,6 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
         defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"1250");
-        defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
"30000");
         defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
"true");
 
         Properties props = new Properties();
@@ -377,7 +368,6 @@ public class GroupConfigTest {
         assertEquals(1, 
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
         assertEquals(3000, 
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
         assertEquals(1250, 
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
-        assertEquals(30000, 
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
         assertEquals(true, 
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
     }
 
@@ -549,19 +539,6 @@ public class GroupConfigTest {
         );
     }
 
-    /**
-     * Data source for configs with min-only evaluation (no max bound enforced 
by evaluate).
-     * Each entry: (configKey, tooLow, expectedMax).
-     */
-    private static Stream<Arguments> minBoundedConfigs() {
-        return Stream.of(
-            Arguments.of(
-                GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
-                1000, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT
-            )
-        );
-    }
-
     @ParameterizedTest(name = "testEvaluateValueAboveMaxIsCapped[{0}]")
     @MethodSource("rangeBoundedConfigs")
     public void testEvaluateValueAboveMaxIsCapped(
@@ -613,20 +590,6 @@ public class GroupConfigTest {
         assertEquals(expectedMax, result.get(key));
     }
 
-    @ParameterizedTest(name = 
"testEvaluateMinBoundedValueBelowMinIsCapped[{0}]")
-    @MethodSource("minBoundedConfigs")
-    public void testEvaluateMinBoundedValueBelowMinIsCapped(
-        String key,
-        int tooLow,
-        int expectedMin
-    ) {
-        Properties props = new Properties();
-        props.put(key, tooLow);
-        Properties result = GroupConfig.evaluate(props, "test-group",
-            GroupCoordinatorConfig.fromProps(new HashMap<>()), 
ShareGroupConfig.fromProps(new HashMap<>()));
-        assertEquals(expectedMin, result.get(key));
-    }
-
     @Test
     public void testAllGroupConfigSynonyms() {
         // Every GroupConfig entry should have an entry in 
ALL_GROUP_CONFIG_SYNONYMS.
@@ -670,7 +633,6 @@ public class GroupConfigTest {
         props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
         props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
"3000");
-        props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "45000");
         props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
         return props;
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index dd56d1ed994..3444e7cd7c6 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -544,25 +544,6 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 -1);
         assertEquals("Invalid value -1 for configuration 
group.streams.initial.rebalance.delay.ms: Value must be at least 0",
             assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
-
-        // group.streams.task.offset.interval.ms
-
-        // must be positive
-        configs.clear();
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 0);
-        assertEquals("Invalid value 0 for configuration 
group.streams.task.offset.interval.ms: Value must be at least 1",
-            assertThrows(ConfigException.class, () -> 
createConfig(configs)).getMessage());
-
-        // cannot be smaller than MIN
-        configs.clear();
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT - 1);
-        assertEquals("group.streams.task.offset.interval.ms must be greater 
than or equal to group.streams.min.task.offset.interval.ms",
-            assertThrows(IllegalArgumentException.class, () -> 
createConfig(configs)).getMessage());
-
-        // can be MIN
-        configs.clear();
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT);
-        createConfig(configs);
     }
 
     @Test
@@ -684,40 +665,6 @@ public class GroupCoordinatorConfigTest {
         assertEquals(7000, config.streamsGroupInitialRebalanceDelayMs());
     }
 
-    @Test
-    public void testStreamsGroupTaskOffsetIntervalDefaultValue() {
-        Map<String, Object> configs = new HashMap<>();
-        GroupCoordinatorConfig config = createConfig(configs);
-        assertEquals(60000, config.streamsGroupTaskOffsetIntervalMs());
-        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
-            config.streamsGroupTaskOffsetIntervalMs());
-    }
-
-    @Test
-    public void testStreamsGroupTaskOffsetIntervalCustomValue() {
-        Map<String, Object> configs = new HashMap<>();
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
 45000);
-        GroupCoordinatorConfig config = createConfig(configs);
-        assertEquals(45000, config.streamsGroupTaskOffsetIntervalMs());
-    }
-
-    @Test
-    public void testStreamsGroupMinTaskOffsetIntervalDefaultValue() {
-        Map<String, Object> configs = new HashMap<>();
-        GroupCoordinatorConfig config = createConfig(configs);
-        assertEquals(15000, config.streamsGroupMinTaskOffsetIntervalMs());
-        
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT,
-            config.streamsGroupMinTaskOffsetIntervalMs());
-    }
-
-    @Test
-    public void testStreamsGroupMinTaskOffsetIntervalCustomValue() {
-        Map<String, Object> configs = new HashMap<>();
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG,
 20000);
-        GroupCoordinatorConfig config = createConfig(configs);
-        assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
-    }
-
     public static GroupCoordinatorConfig createConfig(Map<String, Object> 
configs) {
         return new GroupCoordinatorConfig(new AbstractConfig(
             GroupCoordinatorConfig.CONFIG_DEF,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 306128837ff..dd751f607f2 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -206,7 +206,6 @@ import static 
org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEO
 import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
 import static 
org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
@@ -1208,8 +1207,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -17582,8 +17580,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17677,8 +17674,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -17771,8 +17767,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
-                    .setStatusDetail("Source topics bar are missing.")))
-                .setTaskOffsetIntervalMs(60_000),
+                    .setStatusDetail("Source topics bar are missing."))),
             result.response().data()
         );
 
@@ -17864,8 +17859,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
-                    .setStatusDetail("Internal topics are missing: bar")))
-                .setTaskOffsetIntervalMs(60_000),
+                    .setStatusDetail("Internal topics are missing: bar"))),
             result.response().data()
         );
 
@@ -17954,8 +17948,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
-                    .setStatusDetail("Following topics do not have the same 
number of partitions: [{bar=3, foo=6}]")))
-                .setTaskOffsetIntervalMs(60_000),
+                    .setStatusDetail("Following topics do not have the same 
number of partitions: [{bar=3, foo=6}]"))),
             result.response().data()
         );
 
@@ -18061,8 +18054,7 @@ public class GroupMetadataManagerTest {
                 .setWarmupTasks(List.of())
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.STALE_TOPOLOGY.code())
-                    .setStatusDetail("The member's topology epoch 0 is behind 
the group's topology epoch 1.")))
-                .setTaskOffsetIntervalMs(60_000),
+                    .setStatusDetail("The member's topology epoch 0 is behind 
the group's topology epoch 1."))),
             result.response().data()
         );
 
@@ -18164,8 +18156,7 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                ))
-                .setTaskOffsetIntervalMs(60_000),
+                )),
             result1.response().data()
         );
         assertRecordsEquals(List.of(), result1.records());
@@ -18186,8 +18177,7 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                ))
-                .setTaskOffsetIntervalMs(60_000),
+                )),
             result2.response().data()
         );
 
@@ -18280,8 +18270,7 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.SHUTDOWN_APPLICATION.code())
                         .setStatusDetail(statusDetail)
-                ))
-                .setTaskOffsetIntervalMs(60_000),
+                )),
             result2.response().data()
         );
     }
@@ -18361,8 +18350,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18491,8 +18479,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18640,8 +18627,7 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
 
             result.response().data()
         );
@@ -18769,8 +18755,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18786,8 +18771,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId)
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -18840,8 +18824,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -18857,8 +18840,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId)
                 .setMemberEpoch(2)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -18910,8 +18892,7 @@ public class GroupMetadataManagerTest {
                     new StreamsGroupHeartbeatResponseData.Status()
                         .setStatusCode(Status.ASSIGNMENT_DELAYED.code())
                         .setStatusDetail("Assignment delayed due to the 
configured initial rebalance delay.")
-                ))
-                .setTaskOffsetIntervalMs(60_000),
+                )),
             result.response().data()
         );
 
@@ -18940,8 +18921,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
     }
@@ -19108,8 +19088,7 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19151,8 +19130,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19198,8 +19176,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19233,8 +19210,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19274,8 +19250,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId1)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19305,8 +19280,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId2)
                 .setMemberEpoch(10)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19333,8 +19307,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19365,8 +19338,7 @@ public class GroupMetadataManagerTest {
                 .setMemberId(memberId3)
                 .setMemberEpoch(11)
                 .setHeartbeatIntervalMs(5000)
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19409,8 +19381,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19457,8 +19428,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19669,8 +19639,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -19806,8 +19775,7 @@ public class GroupMetadataManagerTest {
                 ))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20048,8 +20016,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20088,8 +20055,7 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20118,8 +20084,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20151,8 +20116,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(3)
                 .setHeartbeatIntervalMs(5000)
                 .setEndpointInformationEpoch(0)
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20217,8 +20181,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20257,8 +20220,7 @@ public class GroupMetadataManagerTest {
                 .setActiveTasks(List.of())
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20286,8 +20248,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -20482,8 +20443,7 @@ public class GroupMetadataManagerTest {
                         .setStandbyTasks(List.of())
                         .setWarmupTasks(List.of())
                         
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
-                        .setStatus(List.of())
-                        .setTaskOffsetIntervalMs(60_000),
+                        .setStatus(List.of()),
                 result.response().data()
         );
 
@@ -20661,8 +20621,7 @@ public class GroupMetadataManagerTest {
                         .setPartitions(List.of(0, 1, 2))))
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
-                .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setStatus(List.of()),
             result.response().data()
         );
 
@@ -21476,7 +21435,7 @@ public class GroupMetadataManagerTest {
                     .setWarmupTasks(List.of()));
         assertEquals(2, result.response().data().memberEpoch());
 
-        // Verify default heartbeat interval, session timeout, 
num.standby.replicas, task.offset.interval before config update.
+        // Verify default heartbeat interval, session timeout, and 
num.standby.replicas before config update.
         
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
             result.response().data().heartbeatIntervalMs());
         context.assertSessionTimeout(groupId, memberId,
@@ -21495,12 +21454,10 @@ public class GroupMetadataManagerTest {
         // Dynamic update group config with out-of-range values.
         // Session timeout 70000 exceeds max 60000; heartbeat interval 1 is 
below min 5000;
         // num standby replicas 100 exceeds max 2.
-        // task offset interval 100 exceeds min 15000.
         Properties newGroupConfig = new Properties();
         newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 70000);
         newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
         newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 100);
-        newGroupConfig.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 100);
         context.updateGroupConfig(groupId, newGroupConfig);
 
         // Session timer is rescheduled on second heartbeat, new assignment 
with evaluated parameter is calculated.
@@ -21519,8 +21476,7 @@ public class GroupMetadataManagerTest {
         context.assertSessionTimeout(groupId, memberId,
             
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
 
-        // Verify that the number of standby replicas is evaluated to max,
-        // and task offset interval is evaluated to min
+        // Verify that the number of standby replicas is evaluated to max.
         assertEquals(
             Map.of(
                 "num.standby.replicas", 
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)
@@ -27416,7 +27372,7 @@ public class GroupMetadataManagerTest {
                 .setStandbyTasks(List.of())
                 .setWarmupTasks(List.of())
                 .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setTaskOffsetIntervalMs(0),
             result1.response().data()
         );
 
@@ -27481,7 +27437,7 @@ public class GroupMetadataManagerTest {
                 .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
                     .setStatusCode(Status.ASSIGNMENT_DELAYED.code())
                     .setStatusDetail("Assignment delayed due to the configured 
assignment interval.")))
-                .setTaskOffsetIntervalMs(60_000),
+                .setTaskOffsetIntervalMs(0),
             result2.response().data()
         );
 
@@ -27519,7 +27475,7 @@ public class GroupMetadataManagerTest {
                 .setHeartbeatIntervalMs(5000)
                 .setEndpointInformationEpoch(0)
                 .setStatus(List.of())
-                .setTaskOffsetIntervalMs(60_000),
+                .setTaskOffsetIntervalMs(0),
             result3.response().data()
         );
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 9cb8ef7f3b1..a331081ba55 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -296,7 +296,6 @@ public class ConfigCommandIntegrationTest {
         assertTrue(message.contains("streams.heartbeat.interval.ms=5000 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}"));
         assertTrue(message.contains("streams.num.standby.replicas=0 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}"));
         assertTrue(message.contains("streams.session.timeout.ms=45000 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}"));
-        assertTrue(message.contains("streams.task.offset.interval.ms=60000 
sensitive=false 
synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}"));
     }
 
     @ClusterTest
@@ -402,41 +401,6 @@ public class ConfigCommandIntegrationTest {
         assertTrue(message.contains("streams.num.standby.replicas must be less 
than or equal to group.streams.max.standby.replicas"));
     }
 
-    @ClusterTest
-    public void testAlterStreamsGroupTaskOffsetInterval() {
-        // Verify the initial config
-        Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "groups",
-            "--entity-name", "group",
-            "--describe", "--all"));
-        String message = captureStandardOut(run(command));
-        assertTrue(message.contains("streams.task.offset.interval.ms=60000"));
-
-        // Alter task offset interval
-        command = Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "groups",
-            "--entity-name", "group",
-            "--alter", "--add-config", 
"streams.task.offset.interval.ms=45000"));
-        message = captureStandardOut(run(command));
-        assertEquals("Completed updating config for group group.", message);
-
-        // Verify the updated config
-        command = Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "groups",
-            "--describe"));
-        message = captureStandardOut(run(command));
-        assertTrue(message.contains("streams.task.offset.interval.ms=45000"));
-
-        // Should fail to set below min interval
-        command = Stream.concat(quorumArgs(), Stream.of(
-            "--entity-type", "groups",
-            "--entity-name", "group",
-            "--alter", "--add-config", "streams.task.offset.interval.ms=1"));
-        message = captureStandardErr(run(command));
-        assertTrue(message.contains("streams.task.offset.interval.ms must be 
greater than or equal to group.streams.min.task.offset.interval.ms"));
-
-    }
-
     private void verifyGroupConfigUpdate(List<String> alterOpts) throws 
Exception {
         try (Admin client = cluster.admin()) {
             // Add config

Reply via email to