This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 052e0889294 Revert "KAFKA-18652: Add `task.offset.interval.ms` config
(#21737)" (#22020)
052e0889294 is described below
commit 052e088929458aef5c0bdaf8ba11032c74f8eb80
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Apr 10 16:06:02 2026 -0700
Revert "KAFKA-18652: Add `task.offset.interval.ms` config (#21737)" (#22020)
This reverts commit b43d70885dbf293f0ce79db20372ce470c9f69cb.
Only reverts the relevant parts to remove `task.offset.interval.ms`
config.
Reviewers: Bill Bejeck <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 1 -
.../consumer/internals/StreamsRebalanceData.java | 12 --
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 2 -
.../server/StreamsGroupHeartbeatRequestTest.scala | 23 ----
.../kafka/coordinator/group/GroupConfig.java | 30 +----
.../coordinator/group/GroupCoordinatorConfig.java | 37 +-----
.../coordinator/group/GroupMetadataManager.java | 12 +-
.../kafka/coordinator/group/GroupConfigTest.java | 38 ------
.../group/GroupCoordinatorConfigTest.java | 53 --------
.../group/GroupMetadataManagerTest.java | 134 +++++++--------------
.../kafka/tools/ConfigCommandIntegrationTest.java | 36 ------
12 files changed, 51 insertions(+), 330 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index d4f1b3dace8..6907e113f65 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -531,7 +531,6 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
-
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
if (data.partitionsByUserEndpoint() != null) {
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index 383900355dd..a08472b6752 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -345,8 +345,6 @@ public class StreamsRebalanceData {
private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);
- private final AtomicInteger taskOffsetIntervalMs = new AtomicInteger(-1);
-
public StreamsRebalanceData(final UUID processId,
final Optional<HostInfo> endpoint,
final Optional<String> rackId,
@@ -429,14 +427,4 @@ public class StreamsRebalanceData {
return heartbeatIntervalMs.get();
}
- /** Updated whenever a heartbeat response is received from the broker. */
- public void setTaskOffsetIntervalMs(final int taskOffsetIntervalMs) {
- this.taskOffsetIntervalMs.set(taskOffsetIntervalMs);
- }
-
- /** Returns the task offset interval in milliseconds, or -1 if not yet
set. */
- public int taskOffsetIntervalMs() {
- return taskOffsetIntervalMs.get();
- }
-
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d7cbcc52a2d..4f051f54d52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, S [...]
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, S [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -373,7 +373,6 @@ class KafkaApisTest extends Logging {
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
- cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2a035f77333..cf9063a3c2b 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1086,8 +1086,6 @@ class KafkaConfigTest {
case
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
- case
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
- case
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
/** Share coordinator configs */
case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 85b04e3e3ed..80d6ab0ec68 100644
---
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -703,10 +703,6 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(),
"Member 1 should have no standby tasks initially")
assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(),
"Member 2 should have no standby tasks initially")
- // Verify both members picked up `task.offset.interval.ms`
- assertEquals(60_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
- assertEquals(60_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
-
// Both members continue to send heartbeats with their assigned tasks
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -757,10 +753,6 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
}
assertEquals(0, member2StandbyTasksSize, "Member 2 should have no
standby tasks in this configuration")
- // Verify both members picked up `task.offset.interval.ms`
- assertEquals(60_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
- assertEquals(60_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
-
// Change streams.num.standby.replicas = 1
val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP,
groupId)
val alterConfigOp = new AlterConfigOp(
@@ -771,16 +763,6 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
admin.incrementalAlterConfigs(configChanges, options).all().get()
- // Change streams.task.offset.interval.ms = 45000
- val groupConfigResource2 = new ConfigResource(ConfigResource.Type.GROUP,
groupId)
- val alterConfigOp2 = new AlterConfigOp(
- new ConfigEntry("streams.task.offset.interval.ms", "45000"),
- AlterConfigOp.OpType.SET
- )
- val configChanges2 = Map(groupConfigResource2 ->
List(alterConfigOp2).asJavaCollection).asJava
- val options2 = new org.apache.kafka.clients.admin.AlterConfigsOptions()
- admin.incrementalAlterConfigs(configChanges2, options2).all().get()
-
// Send heartbeats to trigger rebalance after config change
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -825,10 +807,6 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum
assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task
should have one standby task")
- // Verify both members picked up change of `task.offset.interval.ms`
- assertEquals(45_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
- assertEquals(45_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
-
} finally {
admin.close()
}
@@ -1113,7 +1091,6 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
.setActiveTasks(expectedActiveTasks)
.setStandbyTasks(List.empty.asJava)
.setWarmupTasks(List.empty.asJava)
- .setTaskOffsetIntervalMs(60_000)
assertEquals(expectedRejoinResponse, rejoinResponse)
} finally {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 396ddcaba39..295fbe2a9e5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -104,8 +104,6 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG =
"streams.assignor.offload.enable";
- public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG =
"streams.task.offset.interval.ms";
-
public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs;
@@ -151,8 +149,6 @@ public final class GroupConfig extends AbstractConfig {
public final Optional<Boolean> streamsAssignorOffloadEnable;
- public final int streamsTaskOffsetIntervalMs;
-
public final String shareIsolationLevel;
public final boolean shareRenewAcknowledgeEnable;
@@ -258,13 +254,7 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT,
atLeast(0),
MEDIUM,
- GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
- INT,
-
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
- atLeast(1),
- MEDIUM,
- GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
+ GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC);
/**
* Mapping from GroupConfig name to its broker-level synonym config name.
@@ -292,8 +282,7 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
- Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
- Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
+ Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
);
/**
@@ -339,7 +328,6 @@ public final class GroupConfig extends AbstractConfig {
Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
Optional.empty();
this.streamsAssignorOffloadEnable = Optional.empty();
- this.streamsTaskOffsetIntervalMs =
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
this.shareRenewAcknowledgeEnable =
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
}
@@ -383,7 +371,6 @@ public final class GroupConfig extends AbstractConfig {
int streamsHeartbeatIntervalMs = (Integer)
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
int streamsNumStandbyReplicas = (Integer)
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
int streamsAssignmentIntervalMs = (Integer)
valueMaps.get(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG);
- int streamsTaskOffsetIntervalMs = (Integer)
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
if (consumerHeartbeatInterval <
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
throw new
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -499,10 +486,6 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
}
}
- if (streamsTaskOffsetIntervalMs <
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
- throw new
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must
be greater than or equal to " +
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
- }
if (consumerSessionTimeout <= consumerHeartbeatInterval) {
throw new
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than " +
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -604,8 +587,6 @@ public final class GroupConfig extends AbstractConfig {
clampToRange(props, groupId, STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs());
- clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
// Verify that clamping did not break the session > heartbeat
invariant.
checkSessionExceedsHeartbeat(props, groupId,
@@ -883,13 +864,6 @@ public final class GroupConfig extends AbstractConfig {
return streamsAssignorOffloadEnable;
}
- /**
- * The task offset reporting interval.
- */
- public int streamsTaskOffsetIntervalMs() {
- return streamsTaskOffsetIntervalMs;
- }
-
/**
* The share group isolation level.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 5000678d925..592b36b9130 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -374,14 +374,6 @@ public class GroupCoordinatorConfig {
public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC =
"Whether to offload streams group assignment to a group coordinator background
thread.";
public static final boolean STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT
= true;
- public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG =
"group.streams.task.offset.interval.ms";
- public static final int STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT =
60000;
- public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC =
"The interval in which the warmup task changelog offsets on a client are
updated on the broker. The offsets are sent with the next heartbeat after this
time has passed.";
-
- public static final String
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG =
"group.streams.min.task.offset.interval.ms";
- public static final int STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT
= 15000;
- public static final String STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC =
"The minimum allowed value for the group-level configuration of " +
GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
-
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
CACHED_BUFFER_MAX_BYTES_CONFIG,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -460,9 +452,7 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
.define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
+ .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC);
/**
@@ -523,8 +513,6 @@ public class GroupCoordinatorConfig {
private final int streamsGroupInitialRebalanceDelayMs;
private final int streamsGroupMinAssignmentIntervalMs;
private final int streamsGroupMaxAssignmentIntervalMs;
- private final int streamsGroupTaskOffsetIntervalMs;
- private final int streamsGroupMinTaskOffsetIntervalMs;
private final AbstractConfig config;
@@ -587,8 +575,6 @@ public class GroupCoordinatorConfig {
this.streamsGroupInitialRebalanceDelayMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.streamsGroupMinAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.streamsGroupMaxAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
- this.streamsGroupTaskOffsetIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG);
- this.streamsGroupMinTaskOffsetIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.config = config;
// New group coordinator configs validation.
@@ -688,10 +674,6 @@ public class GroupCoordinatorConfig {
require(streamsGroupNumStandbyReplicas <=
streamsGroupMaxStandbyReplicas,
String.format("%s must be less than or equal to %s",
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
-
- require(streamsGroupTaskOffsetIntervalMs >=
streamsGroupMinTaskOffsetIntervalMs,
- String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
-
}
/**
@@ -890,8 +872,7 @@ public class GroupCoordinatorConfig {
GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
streamsGroupSessionTimeoutMs(),
GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
streamsGroupHeartbeatIntervalMs(),
GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
streamsGroupNumStandbyReplicas(),
- GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
streamsGroupInitialRebalanceDelayMs(),
- GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
streamsGroupTaskOffsetIntervalMs());
+ GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
streamsGroupInitialRebalanceDelayMs());
}
/**
@@ -1328,18 +1309,4 @@ public class GroupCoordinatorConfig {
public boolean streamsGroupAssignorOffloadEnable() {
return STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
}
-
- /**
- * The task offset interval for streams groups.
- */
- public int streamsGroupTaskOffsetIntervalMs() {
- return streamsGroupTaskOffsetIntervalMs;
- }
-
- /**
- * The minimum task offset interval for streams groups.
- */
- public int streamsGroupMinTaskOffsetIntervalMs() {
- return streamsGroupMinTaskOffsetIntervalMs;
- }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 09714ab28b8..f9230452d54 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2140,8 +2140,7 @@ public class GroupMetadataManager {
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
-
.setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
// The assignment is only provided in the following cases:
// 1. The member is joining.
// 2. The member's assignment has been updated.
@@ -8867,15 +8866,6 @@ public class GroupMetadataManager {
.orElse(config.streamsGroupAssignorOffloadEnable());
}
- /**
- * Get the task offset interval of the provided streams group.
- */
- private int streamsGroupTaskOffsetIntervalMs(String groupId) {
- Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
- return groupConfig.map(GroupConfig::streamsTaskOffsetIntervalMs)
- .orElse(config.streamsGroupTaskOffsetIntervalMs());
- }
-
/**
* Get the initial rebalance delay of the provided streams group.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index d7208fb8e5c..73195ab825a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -51,7 +51,6 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT;
import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT;
@@ -117,8 +116,6 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
} else if
(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
- } else if
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
- assertPropertyInvalid(name, "not_a_number", "1.0");
} else {
assertPropertyInvalid(name, "not_a_number", "-0.1");
}
@@ -307,11 +304,6 @@ public class GroupConfigTest {
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
- // Check for invalid streamsTaskOffsetIntervalMs, < MIN
- props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "1000");
- doTestInvalidProps(props, InvalidConfigurationException.class);
- props = createValidGroupConfig();
-
// Check for invalid shareIsolationLevel.
props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
doTestInvalidProps(props, ConfigException.class);
@@ -354,7 +346,6 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
"1250");
- defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
"30000");
defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
"true");
Properties props = new Properties();
@@ -377,7 +368,6 @@ public class GroupConfigTest {
assertEquals(1,
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
assertEquals(3000,
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
assertEquals(1250,
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
- assertEquals(30000,
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
assertEquals(true,
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
}
@@ -549,19 +539,6 @@ public class GroupConfigTest {
);
}
- /**
- * Data source for configs with min-only evaluation (no max bound enforced
by evaluate).
- * Each entry: (configKey, tooLow, expectedMax).
- */
- private static Stream<Arguments> minBoundedConfigs() {
- return Stream.of(
- Arguments.of(
- GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
- 1000, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT
- )
- );
- }
-
@ParameterizedTest(name = "testEvaluateValueAboveMaxIsCapped[{0}]")
@MethodSource("rangeBoundedConfigs")
public void testEvaluateValueAboveMaxIsCapped(
@@ -613,20 +590,6 @@ public class GroupConfigTest {
assertEquals(expectedMax, result.get(key));
}
- @ParameterizedTest(name =
"testEvaluateMinBoundedValueBelowMinIsCapped[{0}]")
- @MethodSource("minBoundedConfigs")
- public void testEvaluateMinBoundedValueBelowMinIsCapped(
- String key,
- int tooLow,
- int expectedMin
- ) {
- Properties props = new Properties();
- props.put(key, tooLow);
- Properties result = GroupConfig.evaluate(props, "test-group",
- GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
- assertEquals(expectedMin, result.get(key));
- }
-
@Test
public void testAllGroupConfigSynonyms() {
// Every GroupConfig entry should have an entry in
ALL_GROUP_CONFIG_SYNONYMS.
@@ -670,7 +633,6 @@ public class GroupConfigTest {
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"3000");
- props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "45000");
props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
return props;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index dd56d1ed994..3444e7cd7c6 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -544,25 +544,6 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
-1);
assertEquals("Invalid value -1 for configuration
group.streams.initial.rebalance.delay.ms: Value must be at least 0",
assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
-
- // group.streams.task.offset.interval.ms
-
- // must be positive
- configs.clear();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
0);
- assertEquals("Invalid value 0 for configuration
group.streams.task.offset.interval.ms: Value must be at least 1",
- assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
-
- // cannot be smaller than MIN
- configs.clear();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT - 1);
- assertEquals("group.streams.task.offset.interval.ms must be greater
than or equal to group.streams.min.task.offset.interval.ms",
- assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
-
- // can be MIN
- configs.clear();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT);
- createConfig(configs);
}
@Test
@@ -684,40 +665,6 @@ public class GroupCoordinatorConfigTest {
assertEquals(7000, config.streamsGroupInitialRebalanceDelayMs());
}
- @Test
- public void testStreamsGroupTaskOffsetIntervalDefaultValue() {
- Map<String, Object> configs = new HashMap<>();
- GroupCoordinatorConfig config = createConfig(configs);
- assertEquals(60000, config.streamsGroupTaskOffsetIntervalMs());
-
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
- config.streamsGroupTaskOffsetIntervalMs());
- }
-
- @Test
- public void testStreamsGroupTaskOffsetIntervalCustomValue() {
- Map<String, Object> configs = new HashMap<>();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
45000);
- GroupCoordinatorConfig config = createConfig(configs);
- assertEquals(45000, config.streamsGroupTaskOffsetIntervalMs());
- }
-
- @Test
- public void testStreamsGroupMinTaskOffsetIntervalDefaultValue() {
- Map<String, Object> configs = new HashMap<>();
- GroupCoordinatorConfig config = createConfig(configs);
- assertEquals(15000, config.streamsGroupMinTaskOffsetIntervalMs());
-
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT,
- config.streamsGroupMinTaskOffsetIntervalMs());
- }
-
- @Test
- public void testStreamsGroupMinTaskOffsetIntervalCustomValue() {
- Map<String, Object> configs = new HashMap<>();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG,
20000);
- GroupCoordinatorConfig config = createConfig(configs);
- assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
- }
-
public static GroupCoordinatorConfig createConfig(Map<String, Object>
configs) {
return new GroupCoordinatorConfig(new AbstractConfig(
GroupCoordinatorConfig.CONFIG_DEF,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 306128837ff..dd751f607f2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -206,7 +206,6 @@ import static
org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEO
import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
@@ -1208,8 +1207,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
}
@@ -17582,8 +17580,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -17677,8 +17674,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -17771,8 +17767,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
- .setStatusDetail("Source topics bar are missing.")))
- .setTaskOffsetIntervalMs(60_000),
+ .setStatusDetail("Source topics bar are missing."))),
result.response().data()
);
@@ -17864,8 +17859,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
- .setStatusDetail("Internal topics are missing: bar")))
- .setTaskOffsetIntervalMs(60_000),
+ .setStatusDetail("Internal topics are missing: bar"))),
result.response().data()
);
@@ -17954,8 +17948,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
- .setStatusDetail("Following topics do not have the same
number of partitions: [{bar=3, foo=6}]")))
- .setTaskOffsetIntervalMs(60_000),
+ .setStatusDetail("Following topics do not have the same
number of partitions: [{bar=3, foo=6}]"))),
result.response().data()
);
@@ -18061,8 +18054,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.STALE_TOPOLOGY.code())
- .setStatusDetail("The member's topology epoch 0 is behind
the group's topology epoch 1.")))
- .setTaskOffsetIntervalMs(60_000),
+ .setStatusDetail("The member's topology epoch 0 is behind
the group's topology epoch 1."))),
result.response().data()
);
@@ -18164,8 +18156,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- ))
- .setTaskOffsetIntervalMs(60_000),
+ )),
result1.response().data()
);
assertRecordsEquals(List.of(), result1.records());
@@ -18186,8 +18177,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- ))
- .setTaskOffsetIntervalMs(60_000),
+ )),
result2.response().data()
);
@@ -18280,8 +18270,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- ))
- .setTaskOffsetIntervalMs(60_000),
+ )),
result2.response().data()
);
}
@@ -18361,8 +18350,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -18491,8 +18479,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -18640,8 +18627,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -18769,8 +18755,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -18786,8 +18771,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId)
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
}
@@ -18840,8 +18824,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -18857,8 +18840,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId)
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
}
@@ -18910,8 +18892,7 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the
configured initial rebalance delay.")
- ))
- .setTaskOffsetIntervalMs(60_000),
+ )),
result.response().data()
);
@@ -18940,8 +18921,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
}
@@ -19108,8 +19088,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19151,8 +19130,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19198,8 +19176,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19233,8 +19210,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId3)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19274,8 +19250,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId1)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19305,8 +19280,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId2)
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19333,8 +19307,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19365,8 +19338,7 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId3)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19409,8 +19381,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19457,8 +19428,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19669,8 +19639,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -19806,8 +19775,7 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20048,8 +20016,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20088,8 +20055,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20118,8 +20084,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20151,8 +20116,7 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(0)
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20217,8 +20181,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20257,8 +20220,7 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20286,8 +20248,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20482,8 +20443,7 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -20661,8 +20621,7 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setStatus(List.of()),
result.response().data()
);
@@ -21476,7 +21435,7 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of()));
assertEquals(2, result.response().data().memberEpoch());
- // Verify default heartbeat interval, session timeout,
num.standby.replicas, task.offset.interval before config update.
+ // Verify default heartbeat interval, session timeout, and
num.standby.replicas before config update.
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
result.response().data().heartbeatIntervalMs());
context.assertSessionTimeout(groupId, memberId,
@@ -21495,12 +21454,10 @@ public class GroupMetadataManagerTest {
// Dynamic update group config with out-of-range values.
// Session timeout 70000 exceeds max 60000; heartbeat interval 1 is
below min 5000;
// num standby replicas 100 exceeds max 2.
- // task offset interval 100 exceeds min 15000.
Properties newGroupConfig = new Properties();
newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 70000);
newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 100);
- newGroupConfig.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 100);
context.updateGroupConfig(groupId, newGroupConfig);
// Session timer is rescheduled on second heartbeat, new assignment
with evaluated parameter is calculated.
@@ -21519,8 +21476,7 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
- // Verify that the number of standby replicas is evaluated to max,
- // and task offset interval is evaluated to min
+ // Verify that the number of standby replicas is evaluated to max.
assertEquals(
Map.of(
"num.standby.replicas",
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)
@@ -27416,7 +27372,7 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(0),
result1.response().data()
);
@@ -27481,7 +27437,7 @@ public class GroupMetadataManagerTest {
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the configured
assignment interval.")))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(0),
result2.response().data()
);
@@ -27519,7 +27475,7 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(0)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(0),
result3.response().data()
);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index 9cb8ef7f3b1..a331081ba55 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -296,7 +296,6 @@ public class ConfigCommandIntegrationTest {
assertTrue(message.contains("streams.heartbeat.interval.ms=5000
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.heartbeat.interval.ms=5000}"));
assertTrue(message.contains("streams.num.standby.replicas=0
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.num.standby.replicas=0}"));
assertTrue(message.contains("streams.session.timeout.ms=45000
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.session.timeout.ms=45000}"));
- assertTrue(message.contains("streams.task.offset.interval.ms=60000
sensitive=false
synonyms={DEFAULT_CONFIG:group.streams.task.offset.interval.ms=60000}"));
}
@ClusterTest
@@ -402,41 +401,6 @@ public class ConfigCommandIntegrationTest {
assertTrue(message.contains("streams.num.standby.replicas must be less
than or equal to group.streams.max.standby.replicas"));
}
- @ClusterTest
- public void testAlterStreamsGroupTaskOffsetInterval() {
- // Verify the initial config
- Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
- "--entity-type", "groups",
- "--entity-name", "group",
- "--describe", "--all"));
- String message = captureStandardOut(run(command));
- assertTrue(message.contains("streams.task.offset.interval.ms=60000"));
-
- // Alter task offset interval
- command = Stream.concat(quorumArgs(), Stream.of(
- "--entity-type", "groups",
- "--entity-name", "group",
- "--alter", "--add-config",
"streams.task.offset.interval.ms=45000"));
- message = captureStandardOut(run(command));
- assertEquals("Completed updating config for group group.", message);
-
- // Verify the updated config
- command = Stream.concat(quorumArgs(), Stream.of(
- "--entity-type", "groups",
- "--describe"));
- message = captureStandardOut(run(command));
- assertTrue(message.contains("streams.task.offset.interval.ms=45000"));
-
- // Should fail to set below min interval
- command = Stream.concat(quorumArgs(), Stream.of(
- "--entity-type", "groups",
- "--entity-name", "group",
- "--alter", "--add-config", "streams.task.offset.interval.ms=1"));
- message = captureStandardErr(run(command));
- assertTrue(message.contains("streams.task.offset.interval.ms must be
greater than or equal to group.streams.min.task.offset.interval.ms"));
-
- }
-
private void verifyGroupConfigUpdate(List<String> alterOpts) throws
Exception {
try (Admin client = cluster.admin()) {
// Add config