This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b43d70885db KAFKA-18652: Add `task.offset.interval.ms` config (#21737)
b43d70885db is described below
commit b43d70885dbf293f0ce79db20372ce470c9f69cb
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Mar 17 15:47:35 2026 -0700
KAFKA-18652: Add `task.offset.interval.ms` config (#21737)
KIP-1071 adds new broker-side configs:
- task.offset.interval.ms
- min.task.offset.interval.ms
This PR adds both configs to the broker, and updates the
StreamsGroupHeartbeatResponse to provide `task.offset.interval.ms` to the
Kafka Streams client.
Reviewers: Andrew Schofield <[email protected]>, Lucas Brutschy
<[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 1 +
.../consumer/internals/StreamsRebalanceData.java | 12 +
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 3 +
.../server/StreamsGroupHeartbeatRequestTest.scala | 25 +-
.../kafka/coordinator/group/GroupConfig.java | 54 +++-
.../coordinator/group/GroupCoordinatorConfig.java | 46 +++-
.../coordinator/group/GroupMetadataManager.java | 17 +-
.../coordinator/group/streams/StreamsGroup.java | 3 +-
.../kafka/coordinator/group/GroupConfigTest.java | 38 +++
.../group/GroupCoordinatorConfigTest.java | 254 ++++++++++++++++-
.../group/GroupMetadataManagerTest.java | 302 +++++++++++++++------
.../streams/test/MockProcessorContextAPITest.java | 1 +
.../kafka/tools/ConfigCommandIntegrationTest.java | 113 ++++++++
14 files changed, 778 insertions(+), 94 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 6907e113f65..d4f1b3dace8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -531,6 +531,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
+
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
if (data.partitionsByUserEndpoint() != null) {
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index a08472b6752..383900355dd 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -345,6 +345,8 @@ public class StreamsRebalanceData {
private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);
+ private final AtomicInteger taskOffsetIntervalMs = new AtomicInteger(-1);
+
public StreamsRebalanceData(final UUID processId,
final Optional<HostInfo> endpoint,
final Optional<String> rackId,
@@ -427,4 +429,14 @@ public class StreamsRebalanceData {
return heartbeatIntervalMs.get();
}
+ /** Updated whenever a heartbeat response is received from the broker. */
+ public void setTaskOffsetIntervalMs(final int taskOffsetIntervalMs) {
+ this.taskOffsetIntervalMs.set(taskOffsetIntervalMs);
+ }
+
+ /** Returns the task offset interval in milliseconds, or -1 if not yet
set. */
+ public int taskOffsetIntervalMs() {
+ return taskOffsetIntervalMs.get();
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e5bbc337ac9..069fe8645a7 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFI [...]
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFI [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -368,6 +368,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
+ cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b7465daf65c..bb493c89811 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1086,6 +1086,9 @@ class KafkaConfigTest {
case
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
/** Share coordinator configs */
case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index bb4f2d220d8..3f6b340f48c 100644
---
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -35,7 +35,7 @@ import scala.jdk.CollectionConverters._
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
"group.streams.initial.rebalance.delay.ms", value = "0")
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value =
"0")
)
)
class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
@@ -686,6 +686,10 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(),
"Member 1 should have no standby tasks initially")
assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(),
"Member 2 should have no standby tasks initially")
+ // Verify both members picked up `task.offset.interval.ms`
+ assertEquals(60_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
+ assertEquals(60_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
+
// Both members continue to send heartbeats with their assigned tasks
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -736,6 +740,10 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
}
assertEquals(0, member2StandbyTasksSize, "Member 2 should have no
standby tasks in this configuration")
+ // Verify both members picked up `task.offset.interval.ms`
+ assertEquals(60_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
+ assertEquals(60_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
+
// Change streams.num.standby.replicas = 1
val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP,
groupId)
val alterConfigOp = new AlterConfigOp(
@@ -746,6 +754,16 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
admin.incrementalAlterConfigs(configChanges, options).all().get()
+ // Change streams.task.offset.interval.ms = 45000
+ val groupConfigResource2 = new ConfigResource(ConfigResource.Type.GROUP,
groupId)
+ val alterConfigOp2 = new AlterConfigOp(
+ new ConfigEntry("streams.task.offset.interval.ms", "45000"),
+ AlterConfigOp.OpType.SET
+ )
+ val configChanges2 = Map(groupConfigResource2 ->
List(alterConfigOp2).asJavaCollection).asJava
+ val options2 = new org.apache.kafka.clients.admin.AlterConfigsOptions()
+ admin.incrementalAlterConfigs(configChanges2, options2).all().get()
+
// Send heartbeats to trigger rebalance after config change
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -790,6 +808,10 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum
assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task
should have one standby task")
+ // Verify both members picked up change of `task.offset.interval.ms`
+ assertEquals(45_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
+ assertEquals(45_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
+
} finally {
admin.close()
}
@@ -1074,6 +1096,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
.setActiveTasks(expectedActiveTasks)
.setStandbyTasks(List.empty.asJava)
.setWarmupTasks(List.empty.asJava)
+ .setTaskOffsetIntervalMs(60_000)
assertEquals(expectedRejoinResponse, rejoinResponse)
} finally {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index db2cc128284..3d6ca00af45 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -92,6 +92,8 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG =
"streams.initial.rebalance.delay.ms";
+ public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG =
"streams.task.offset.interval.ms";
+
public final int consumerSessionTimeoutMs;
public final int consumerHeartbeatIntervalMs;
@@ -116,6 +118,8 @@ public final class GroupConfig extends AbstractConfig {
public final int streamsInitialRebalanceDelayMs;
+ public final int streamsTaskOffsetIntervalMs;
+
public final String shareIsolationLevel;
public final boolean shareRenewAcknowledgeEnable;
@@ -203,7 +207,13 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
atLeast(0),
MEDIUM,
-
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);
+
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+ .define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+ INT,
+
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
+ atLeast(1),
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
@@ -219,6 +229,7 @@ public final class GroupConfig extends AbstractConfig {
this.streamsHeartbeatIntervalMs =
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas =
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsInitialRebalanceDelayMs =
getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
+ this.streamsTaskOffsetIntervalMs =
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
this.shareRenewAcknowledgeEnable =
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
}
@@ -262,6 +273,7 @@ public final class GroupConfig extends AbstractConfig {
int streamsSessionTimeoutMs = (Integer)
valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
int streamsHeartbeatIntervalMs = (Integer)
valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
int streamsNumStandbyReplicas = (Integer)
valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
+ int streamsTaskOffsetIntervalMs = (Integer)
valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
if (consumerHeartbeatInterval <
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
throw new
InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be
greater than or equal to " +
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -338,6 +350,10 @@ public final class GroupConfig extends AbstractConfig {
throw new
InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be
less than or equal to " +
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
}
+ if (streamsTaskOffsetIntervalMs <
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
+ throw new
InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must
be greater than or equal to " +
+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
+ }
if (consumerSessionTimeout <= consumerHeartbeatInterval) {
throw new
InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be
greater than " +
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -429,6 +445,8 @@ public final class GroupConfig extends AbstractConfig {
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
+ clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+ groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
// Verify that clamping did not break the session > heartbeat
invariant.
checkSessionExceedsHeartbeat(props, groupId,
@@ -529,6 +547,33 @@ public final class GroupConfig extends AbstractConfig {
}
}
+ /**
+ * Clamp a config value to at least min. A WARN log is emitted on
adjustment.
+ * No-op when the key is absent from props.
+ *
+ * @param props The properties to modify in place.
+ * @param groupId The group id.
+ * @param key The config key.
+ * @param min The minimum allowed value (inclusive).
+ */
+ private static void clampToMin(
+ Properties props,
+ String groupId,
+ String key,
+ int min
+ ) {
+ Object rawValue = props.get(key);
+ if (rawValue == null) return;
+
+ int value = Integer.parseInt(rawValue.toString());
+ if (value < min) {
+ log.warn("The group config '{}' for group '{}' has value {} which
is below the broker's " +
+ "allowed minimum {}. The effective value will be capped to
{}.",
+ key, groupId, value, min, min);
+ props.put(key, min);
+ }
+ }
+
/**
* Create a group config instance using the given properties and defaults.
*/
@@ -637,6 +682,13 @@ public final class GroupConfig extends AbstractConfig {
return streamsInitialRebalanceDelayMs;
}
+ /**
+ * The task offset reporting interval.
+ */
+ public int streamsTaskOffsetIntervalMs() {
+ return streamsTaskOffsetIntervalMs;
+ }
+
/**
* The share group isolation level.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index ff80a5c7ee6..1da3ca21412 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -375,6 +375,14 @@ public class GroupCoordinatorConfig {
public static final String STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC =
"Whether to offload streams group assignment to a group coordinator background
thread.";
public static final boolean STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT
= true;
+ public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG =
"group.streams.task.offset.interval.ms";
+ public static final int STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT =
60000;
+ public static final String STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC =
"The interval in which the warmup task changelog offsets on a client are
updated on the broker. The offsets are sent with the next heartbeat after this
time has passed.";
+
+ public static final String
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG =
"group.streams.min.task.offset.interval.ms";
+ public static final int STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT
= 15000;
+ public static final String STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC =
"The minimum allowed value for the group-level configuration of " +
GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
+
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
CACHED_BUFFER_MAX_BYTES_CONFIG,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -459,7 +467,9 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC);
+ .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
+ .define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
/**
@@ -520,6 +530,8 @@ public class GroupCoordinatorConfig {
private final int streamsGroupInitialRebalanceDelayMs;
private final int streamsGroupMinAssignmentIntervalMs;
private final int streamsGroupMaxAssignmentIntervalMs;
+ private final int streamsGroupTaskOffsetIntervalMs;
+ private final int streamsGroupMinTaskOffsetIntervalMs;
private final AbstractConfig config;
@@ -582,6 +594,8 @@ public class GroupCoordinatorConfig {
this.streamsGroupInitialRebalanceDelayMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.streamsGroupMinAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG);
this.streamsGroupMaxAssignmentIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG);
+ this.streamsGroupTaskOffsetIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG);
+ this.streamsGroupMinTaskOffsetIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.config = config;
// New group coordinator configs validation.
@@ -608,6 +622,7 @@ public class GroupCoordinatorConfig {
require(consumerGroupAssignmentIntervalMs() <=
consumerGroupMaxAssignmentIntervalMs,
String.format("%s must be less than or equal to %s",
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
// Share group configs validation.
require(shareGroupMaxHeartbeatIntervalMs >=
shareGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
@@ -645,6 +660,7 @@ public class GroupCoordinatorConfig {
String.format("%s must be less than or equal to %s",
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
// Streams group configs validation.
require(streamsGroupMaxHeartbeatIntervalMs >=
streamsGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
@@ -655,14 +671,14 @@ public class GroupCoordinatorConfig {
require(streamsGroupHeartbeatIntervalMs <=
streamsGroupMaxHeartbeatIntervalMs,
String.format("%s must be less than or equal to %s",
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
+
require(streamsGroupMaxSessionTimeoutMs >=
streamsGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupSessionTimeoutMs >=
streamsGroupMinSessionTimeoutMs,
String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupSessionTimeoutMs <=
streamsGroupMaxSessionTimeoutMs,
String.format("%s must be less than or equal to %s",
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
- require(streamsGroupNumStandbyReplicas <=
streamsGroupMaxStandbyReplicas,
- String.format("%s must be less than or equal to %s",
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
+
require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
String.format("%s must be less than %s",
STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
@@ -676,6 +692,13 @@ public class GroupCoordinatorConfig {
require(streamsGroupAssignmentIntervalMs() <=
streamsGroupMaxAssignmentIntervalMs,
String.format("%s must be less than or equal to %s",
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG));
+
+ require(streamsGroupNumStandbyReplicas <=
streamsGroupMaxStandbyReplicas,
+ String.format("%s must be less than or equal to %s",
STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
+
+ require(streamsGroupTaskOffsetIntervalMs >=
streamsGroupMinTaskOffsetIntervalMs,
+ String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
+
}
/**
@@ -874,7 +897,8 @@ public class GroupCoordinatorConfig {
GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
streamsGroupSessionTimeoutMs(),
GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
streamsGroupHeartbeatIntervalMs(),
GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
streamsGroupNumStandbyReplicas(),
- GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
streamsGroupInitialRebalanceDelayMs());
+ GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
streamsGroupInitialRebalanceDelayMs(),
+ GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
streamsGroupTaskOffsetIntervalMs());
}
/**
@@ -1311,4 +1335,18 @@ public class GroupCoordinatorConfig {
public boolean streamsGroupAssignorOffloadEnable() {
return
config.getBoolean(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
}
+
+ /**
+ * The task offset interval for streams groups.
+ */
+ public int streamsGroupTaskOffsetIntervalMs() {
+ return streamsGroupTaskOffsetIntervalMs;
+ }
+
+ /**
+ * The minimum task offset interval for streams groups.
+ */
+ public int streamsGroupMinTaskOffsetIntervalMs() {
+ return streamsGroupMinTaskOffsetIntervalMs;
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4141e441aee..364890baae9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -189,6 +189,7 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.SortedMap;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
@@ -2139,7 +2140,8 @@ public class GroupMetadataManager {
StreamsGroupHeartbeatResponseData response = new
StreamsGroupHeartbeatResponseData()
.setMemberId(updatedMember.memberId())
.setMemberEpoch(updatedMember.memberEpoch())
- .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId));
+ .setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
+
.setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
// The assignment is only provided in the following cases:
// 1. The member is joining.
// 2. The member's assignment has been updated.
@@ -8781,6 +8783,15 @@ public class GroupMetadataManager {
return config.streamsGroupAssignorOffloadEnable();
}
+ /**
+ * Get the task offset interval of the provided streams group.
+ */
+ private int streamsGroupTaskOffsetIntervalMs(String groupId) {
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.map(GroupConfig::streamsTaskOffsetIntervalMs)
+ .orElse(config.streamsGroupTaskOffsetIntervalMs());
+ }
+
/**
* Get the initial rebalance delay of the provided streams group.
*/
@@ -8804,7 +8815,9 @@ public class GroupMetadataManager {
Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
final Integer numStandbyReplicas =
groupConfig.map(GroupConfig::streamsNumStandbyReplicas)
.orElse(config.streamsGroupNumStandbyReplicas());
- return Map.of("num.standby.replicas", numStandbyReplicas.toString());
+ return new TreeMap<>(Map.of(
+ "num.standby.replicas", numStandbyReplicas.toString()
+ ));
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index 5b6610fe4a0..b1a5b026654 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -56,6 +56,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.ASSIGNING;
import static
org.apache.kafka.coordinator.group.streams.StreamsGroup.StreamsGroupState.DEAD;
@@ -1231,7 +1232,7 @@ public class StreamsGroup implements Group {
* @return The assignment configurations for this streams group.
*/
public Map<String, String> lastAssignmentConfigs() {
- return Collections.unmodifiableMap(lastAssignmentConfigs);
+ return Collections.unmodifiableMap(new
TreeMap<>(lastAssignmentConfigs));
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 9b28b591574..8c0772abb9f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -45,6 +45,7 @@ import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT;
import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT;
import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_DEFAULT;
import static
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_DEFAULT;
@@ -103,6 +104,8 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
+ } else if
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
+ assertPropertyInvalid(name, "not_a_number", "1.0");
} else {
assertPropertyInvalid(name, "not_a_number", "-0.1");
}
@@ -261,6 +264,11 @@ public class GroupConfigTest {
doTestInvalidProps(props, InvalidConfigurationException.class);
props = createValidGroupConfig();
+ // Check for invalid streamsTaskOffsetIntervalMs, < MIN
+ props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "1000");
+ doTestInvalidProps(props, InvalidConfigurationException.class);
+ props = createValidGroupConfig();
+
// Check for invalid shareIsolationLevel.
props.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, "read_commit");
doTestInvalidProps(props, ConfigException.class);
@@ -300,6 +308,7 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
"2000");
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
+ defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
"30000");
defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
"true");
Properties props = new Properties();
@@ -319,6 +328,7 @@ public class GroupConfigTest {
assertEquals(2000,
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(1,
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
assertEquals(3000,
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
+ assertEquals(30000,
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
assertEquals(true,
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
}
@@ -441,6 +451,19 @@ public class GroupConfigTest {
);
}
+ /**
+ * Data source for configs with min-only evaluation (no max bound enforced
by evaluate).
+ * Each entry: (configKey, tooLow, expectedMin).
+ */
+ private static Stream<Arguments> minBoundedConfigs() {
+ return Stream.of(
+ Arguments.of(
+ GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
+ 1000, STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT
+ )
+ );
+ }
+
@ParameterizedTest(name = "testEvaluateValueAboveMaxIsCapped[{0}]")
@MethodSource("rangeBoundedConfigs")
public void testEvaluateValueAboveMaxIsCapped(
@@ -487,6 +510,20 @@ public class GroupConfigTest {
assertEquals(expectedMax, result.get(key));
}
+ @ParameterizedTest(name =
"testEvaluateMinBoundedValueBelowMinIsCapped[{0}]")
+ @MethodSource("minBoundedConfigs")
+ public void testEvaluateMinBoundedValueBelowMinIsCapped(
+ String key,
+ int tooLow,
+ int expectedMin
+ ) {
+ Properties props = new Properties();
+ props.put(key, tooLow);
+ Properties result = GroupConfig.evaluate(props, "test-group",
+ GroupCoordinatorConfig.fromProps(new HashMap<>()),
ShareGroupConfig.fromProps(new HashMap<>()));
+ assertEquals(expectedMin, result.get(key));
+ }
+
private Map<String, String> createValidGroupConfig() {
Map<String, String> props = new HashMap<>();
props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "45000");
@@ -502,6 +539,7 @@ public class GroupConfigTest {
props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000");
props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"3000");
+ props.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, "45000");
props.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, "true");
return props;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index 7319fcebdf4..418b57706fc 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -261,7 +261,7 @@ public class GroupCoordinatorConfigTest {
}
@Test
- public void testInvalidConfigs() {
+ public void testInvalidConsumerConfigs() {
Map<String, Object> configs = new HashMap<>();
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
10);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
20);
@@ -346,19 +346,229 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG,
5000);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG,
1000);
assertEquals(5000,
createConfig(configs).shareGroupInitializeRetryIntervalMs());
+ }
+
+ @Test
+ public void testInvalidStreamsConfigs() {
+ testStreamsSessionTimeoutMs();
+ testStreamsHeartbeatIntervalMs();
+ testStreamsOtherConfigs();
+ }
+
+ private void testStreamsSessionTimeoutMs() {
+ Map<String, Object> configs = new HashMap<>();
+
+ // group.streams.session.timeout.ms
+
+ // must be positive
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 0);
+ assertEquals("Invalid value 0 for configuration
group.streams.session.timeout.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // cannot be smaller than MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT - 1);
+ assertEquals("group.streams.session.timeout.ms must be greater than or
equal to group.streams.min.session.timeout.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT);
+ createConfig(configs);
+
+ // can be MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
+ createConfig(configs);
+
+ // cannot be larger than MAX
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1);
+ assertEquals("group.streams.session.timeout.ms must be less than or
equal to group.streams.max.session.timeout.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.min.session.timeout.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.min.session.timeout.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MAX (implies `MAX can be MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT); // when
+ createConfig(configs);
+
+ // cannot be larger than MAX (implies `MAX cannot be smaller than MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT + 1); //
when
+ assertEquals("group.streams.max.session.timeout.ms must be greater
than or equal to group.streams.min.session.timeout.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // other cases for `group.streams.min.session.timeout.ms` are covered
in section `session.timeout.ms` above
+
+
+ // group.streams.max.session.timeout.ms
+ // must be positive
configs.clear();
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
45000);
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
60000);
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
50000);
-
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG,
50000);
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.max.session.timeout.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // other cases for `group.streams.max.session.timeout.ms` are covered
in sections `session.timeout.ms` and `group.streams.min.session.timeout.ms`
above
+ }
+
+ private void testStreamsHeartbeatIntervalMs() {
+ Map<String, Object> configs = new HashMap<>();
+
+ // group.streams.heartbeat.interval.ms
+
+ // must be positive
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.heartbeat.interval.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // cannot be smaller than MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT - 1);
+ assertEquals("group.streams.heartbeat.interval.ms must be greater than
or equal to group.streams.min.heartbeat.interval.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT);
+ createConfig(configs);
+
+ // can be MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT);
+ createConfig(configs);
+
+ // cannot be larger than MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1);
+ assertEquals("group.streams.heartbeat.interval.ms must be less than or
equal to group.streams.max.heartbeat.interval.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be smaller than session timeout
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT - 1); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT - 1);
+ createConfig(configs);
+
+ // cannot be the same as session timeout
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT); // required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
assertEquals("group.streams.heartbeat.interval.ms must be less than
group.streams.session.timeout.ms",
assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // group.streams.min.heartbeat.interval.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.min.heartbeat.interval.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MAX (implies `MAX can be MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT); //
when
+ createConfig(configs);
+
+ // cannot be larger than MAX (implies `MAX cannot be smaller than MIN`)
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1); //
required
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT + 1);
// when
+ assertEquals("group.streams.max.heartbeat.interval.ms must be greater
than or equal to group.streams.min.heartbeat.interval.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // other cases for `group.streams.min.heartbeat.interval.ms` are covered in
`heartbeat.interval.ms` section above
+
+
+ // group.streams.max.heartbeat.interval.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.max.heartbeat.interval.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // other cases for `group.streams.max.heartbeat.interval.ms` are covered in
`heartbeat.interval.ms` and `group.streams.min.heartbeat.interval.ms` sections above
+ }
+
+ private void testStreamsOtherConfigs() {
+ Map<String, Object> configs = new HashMap<>();
+
+ // group.streams.max.size
+
+ // must be positive
+ configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG, 0);
+ assertEquals("Invalid value 0 for configuration
group.streams.max.size: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.num.standby.replicas
+
+ // cannot be negative
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
-1);
+ assertEquals("Invalid value -1 for configuration
group.streams.num.standby.replicas: Value must be at least 0",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT);
+ createConfig(configs);
+
+ // cannot be larger than MAX
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT + 1);
+ assertEquals("group.streams.num.standby.replicas must be less than or
equal to group.streams.max.standby.replicas",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.max.standby.replicas
+
+ // cannot be negative
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG,
-1);
+ assertEquals("Invalid value -1 for configuration
group.streams.max.standby.replicas: Value must be at least 0",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+
+ // group.streams.initial.rebalance.delay.ms
+
+ // cannot be negative
configs.clear();
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
-1);
assertEquals("Invalid value -1 for configuration
group.streams.initial.rebalance.delay.ms: Value must be at least 0",
assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // group.streams.task.offset.interval.ms
+
+ // must be positive
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
0);
+ assertEquals("Invalid value 0 for configuration
group.streams.task.offset.interval.ms: Value must be at least 1",
+ assertThrows(ConfigException.class, () ->
createConfig(configs)).getMessage());
+
+ // cannot be smaller than MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT - 1);
+ assertEquals("group.streams.task.offset.interval.ms must be greater
than or equal to group.streams.min.task.offset.interval.ms",
+ assertThrows(IllegalArgumentException.class, () ->
createConfig(configs)).getMessage());
+
+ // can be MIN
+ configs.clear();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT);
+ createConfig(configs);
}
@Test
@@ -464,6 +674,40 @@ public class GroupCoordinatorConfigTest {
assertEquals(7000, config.streamsGroupInitialRebalanceDelayMs());
}
+ @Test
+ public void testStreamsGroupTaskOffsetIntervalDefaultValue() {
+ Map<String, Object> configs = new HashMap<>();
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(60000, config.streamsGroupTaskOffsetIntervalMs());
+
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
+ config.streamsGroupTaskOffsetIntervalMs());
+ }
+
+ @Test
+ public void testStreamsGroupTaskOffsetIntervalCustomValue() {
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
45000);
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(45000, config.streamsGroupTaskOffsetIntervalMs());
+ }
+
+ @Test
+ public void testStreamsGroupMinTaskOffsetIntervalDefaultValue() {
+ Map<String, Object> configs = new HashMap<>();
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(15000, config.streamsGroupMinTaskOffsetIntervalMs());
+
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT,
+ config.streamsGroupMinTaskOffsetIntervalMs());
+ }
+
+ @Test
+ public void testStreamsGroupMinTaskOffsetIntervalCustomValue() {
+ Map<String, Object> configs = new HashMap<>();
+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG,
20000);
+ GroupCoordinatorConfig config = createConfig(configs);
+ assertEquals(20000, config.streamsGroupMinTaskOffsetIntervalMs());
+ }
+
public static GroupCoordinatorConfig createConfig(Map<String, Object>
configs) {
return new GroupCoordinatorConfig(new AbstractConfig(
GroupCoordinatorConfig.CONFIG_DEF,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index a7a67f4e415..fdb47f417f2 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -174,6 +174,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
@@ -200,6 +201,7 @@ import static
org.apache.kafka.coordinator.group.GroupConfig.SHARE_SESSION_TIMEO
import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.EMPTY_RESULT;
import static
org.apache.kafka.coordinator.group.GroupMetadataManager.appendGroupMetadataErrorToResponseError;
@@ -1201,7 +1203,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
}
@@ -17461,7 +17464,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -17481,7 +17485,15 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 2,
+ groupMetadataHash,
+ 0,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -17547,7 +17559,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -17640,7 +17653,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
- .setStatusDetail("Source topics bar are missing."))),
+ .setStatusDetail("Source topics bar are missing.")))
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -17656,9 +17670,15 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
computeGroupHash(Map.of(
- fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), -1, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 2,
+ computeGroupHash(Map.of(fooTopicName,
computeTopicHash(fooTopicName, metadataImage))),
+ -1,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -17726,7 +17746,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
- .setStatusDetail("Internal topics are missing: bar"))),
+ .setStatusDetail("Internal topics are missing: bar")))
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -17742,9 +17763,15 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
computeGroupHash(Map.of(
- fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), -1, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 2,
+ computeGroupHash(Map.of(fooTopicName,
computeTopicHash(fooTopicName, metadataImage))),
+ -1,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -17809,7 +17836,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
- .setStatusDetail("Following topics do not have the same
number of partitions: [{bar=3, foo=6}]"))),
+ .setStatusDetail("Following topics do not have the same
number of partitions: [{bar=3, foo=6}]")))
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -17825,10 +17853,18 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
computeGroupHash(Map.of(
- fooTopicName, computeTopicHash(fooTopicName, metadataImage),
- barTopicName, computeTopicHash(barTopicName, metadataImage)
- )), -1, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 2,
+ computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ )),
+ -1,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -17907,7 +17943,8 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of())
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.STALE_TOPOLOGY.code())
- .setStatusDetail("The member's topology epoch 0 is behind
the group's topology epoch 1."))),
+ .setStatusDetail("The member's topology epoch 0 is behind
the group's topology epoch 1.")))
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -17922,10 +17959,18 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
- fooTopicName, computeTopicHash(fooTopicName, metadataImage),
- barTopicName, computeTopicHash(barTopicName, metadataImage)
- )), 1, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 11,
+ computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ )),
+ 1,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(groupId,
11, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
@@ -18001,7 +18046,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- )),
+ ))
+ .setTaskOffsetIntervalMs(60_000),
result1.response().data()
);
assertRecordsEquals(List.of(), result1.records());
@@ -18022,7 +18068,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- )),
+ ))
+ .setTaskOffsetIntervalMs(60_000),
result2.response().data()
);
@@ -18115,7 +18162,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
- )),
+ ))
+ .setTaskOffsetIntervalMs(60_000),
result2.response().data()
);
}
@@ -18195,7 +18243,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -18224,7 +18273,15 @@ public class GroupMetadataManagerTest {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 11,
+ groupMetadataHash,
+ 0,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -18316,7 +18373,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -18344,10 +18402,18 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
- fooTopicName, computeTopicHash(fooTopicName, newMetadataImage),
- barTopicName, computeTopicHash(barTopicName, newMetadataImage)
- )), 0, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 11,
+ computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
newMetadataImage),
+ barTopicName, computeTopicHash(barTopicName,
newMetadataImage)
+ )),
+ 0,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
@@ -18456,7 +18522,8 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -18584,7 +18651,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -18600,7 +18668,8 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId)
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
}
@@ -18653,7 +18722,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -18669,7 +18739,8 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId)
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
}
@@ -18721,7 +18792,8 @@ public class GroupMetadataManagerTest {
new StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the
configured initial rebalance delay.")
- )),
+ ))
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -18750,7 +18822,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
}
@@ -18917,7 +18990,8 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -18959,7 +19033,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19005,7 +19080,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19039,7 +19115,8 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId3)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19079,7 +19156,8 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId1)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19109,7 +19187,8 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId2)
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19136,7 +19215,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19167,7 +19247,8 @@ public class GroupMetadataManagerTest {
.setMemberId(memberId3)
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19210,7 +19291,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19257,7 +19339,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19468,7 +19551,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19489,9 +19573,15 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
- fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), 0, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 11,
+ computeGroupHash(Map.of(fooTopicName,
computeTopicHash(fooTopicName, metadataImage))),
+ 0,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -19598,7 +19688,8 @@ public class GroupMetadataManagerTest {
))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19611,9 +19702,15 @@ public class GroupMetadataManagerTest {
.build();
List<CoordinatorRecord> expectedRecords = List.of(
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 11,
computeGroupHash(Map.of(
- fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), 0, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 11,
+ computeGroupHash(Map.of(fooTopicName,
computeTopicHash(fooTopicName, metadataImage))),
+ 0,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)
@@ -19764,7 +19861,15 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 3,
+ groupMetadataHash,
+ 0,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ )
)
)
)),
@@ -19825,7 +19930,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19864,7 +19970,8 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19893,7 +20000,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19925,7 +20033,8 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(3)
.setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(0)
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -19990,7 +20099,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -20029,7 +20139,8 @@ public class GroupMetadataManagerTest {
.setActiveTasks(List.of())
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -20057,7 +20168,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -20073,7 +20185,15 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId,
memberId1),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId,
memberId1),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 4,
groupMetadataHash, 0, Map.of("num.standby.replicas", "0"))
+
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ groupId,
+ 4,
+ groupMetadataHash,
+ 0,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ )
)
)
)),
@@ -20240,7 +20360,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -20417,7 +20538,8 @@ public class GroupMetadataManagerTest {
.setPartitions(List.of(0, 1, 2))))
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
- .setStatus(List.of()),
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000),
result.response().data()
);
@@ -20538,7 +20660,15 @@ public class GroupMetadataManagerTest {
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId,
expectedMember),
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId,
topology),
-
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId,
2, 0, -1, Map.of("num.standby.replicas", "0")),
+ StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(
+ classicGroupId,
+ 2,
+ 0,
+ -1,
+ new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))
+ ),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
memberId, TasksTuple.EMPTY),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentMetadataRecord(classicGroupId,
2, context.time.milliseconds()),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
expectedMember)
@@ -20858,7 +20988,10 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of()));
assertEquals(2, result.response().data().memberEpoch());
- assertEquals(Map.of("num.standby.replicas", "0"),
assignor.lastPassedAssignmentConfigs());
+ assertEquals(
+ Map.of("num.standby.replicas", "0"),
+ assignor.lastPassedAssignmentConfigs()
+ );
// Verify heartbeat interval
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
result.response().data().heartbeatIntervalMs());
@@ -20894,7 +21027,10 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId, 50000);
// Verify that the new number of standby replicas is used
- assertEquals(Map.of("num.standby.replicas", "2"),
assignor.lastPassedAssignmentConfigs());
+ assertEquals(
+ Map.of("num.standby.replicas", "2"),
+ assignor.lastPassedAssignmentConfigs()
+ );
// Advance time.
assertEquals(
@@ -21116,15 +21252,16 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of()));
assertEquals(2, result.response().data().memberEpoch());
- // Verify default heartbeat interval, session timeout, and
num.standby.replicas before config update.
+ // Verify default heartbeat interval, session timeout,
num.standby.replicas, task.offset.interval before config update.
assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT,
result.response().data().heartbeatIntervalMs());
context.assertSessionTimeout(groupId, memberId,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT);
- assertEquals(Map.of("num.standby.replicas",
-
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)),
+ assertEquals(
+ Map.of(
+ "num.standby.replicas",
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)
+ ),
assignor.lastPassedAssignmentConfigs());
-
// Advance time.
assertEquals(
List.of(),
@@ -21134,10 +21271,12 @@ public class GroupMetadataManagerTest {
// Dynamic update group config with out-of-range values.
// Session timeout 70000 exceeds max 60000; heartbeat interval 1 is
below min 5000;
// num standby replicas 100 exceeds max 2.
+ // task offset interval 100 is below min 15000.
Properties newGroupConfig = new Properties();
newGroupConfig.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 70000);
newGroupConfig.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 1);
newGroupConfig.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 100);
+ newGroupConfig.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 100);
context.updateGroupConfig(groupId, newGroupConfig);
// Session timer is rescheduled on second heartbeat, new assignment
with evaluated parameter is calculated.
@@ -21156,9 +21295,12 @@ public class GroupMetadataManagerTest {
context.assertSessionTimeout(groupId, memberId,
GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT);
- // Verify that the number of standby replicas is evaluated to max.
- assertEquals(Map.of("num.standby.replicas",
-
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)),
+ // Verify that the number of standby replicas is evaluated to max,
+ // and task offset interval is evaluated to min
+ assertEquals(
+ Map.of(
+ "num.standby.replicas",
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT)
+ ),
assignor.lastPassedAssignmentConfigs());
}
@@ -26650,6 +26792,8 @@ public class GroupMetadataManagerTest {
*/
private Map<String, String> getDefaultAssignmentConfigs() {
// Use the same default value as
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT
- return Map.of("num.standby.replicas",
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT));
+ return new TreeMap<>(Map.of(
+ "num.standby.replicas",
String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)
+ ));
}
}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
index e863fe8509d..1fb6ca93123 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockProcessorContextAPITest.java
@@ -208,6 +208,7 @@ public class MockProcessorContextAPITest {
assertThat(context.committed(), is(false));
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldStoreAndReturnStateStores() {
final Processor<String, Long, Void, Void> processor = new
Processor<>() {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index c221ba05ab1..ccc356820d1 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -59,6 +59,7 @@ import static
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_
import static
org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.server.config.ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG;
import static
org.apache.kafka.server.config.ServerConfigs.MESSAGE_MAX_BYTES_CONFIG;
import static
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
@@ -295,6 +296,75 @@ public class ConfigCommandIntegrationTest {
assertTrue(message.contains("streams.heartbeat.interval.ms=5000
sensitive=false synonyms={DEFAULT_CONFIG:streams.heartbeat.interval.ms=5000}"));
assertTrue(message.contains("streams.num.standby.replicas=0
sensitive=false synonyms={DEFAULT_CONFIG:streams.num.standby.replicas=0}"));
assertTrue(message.contains("streams.session.timeout.ms=45000
sensitive=false synonyms={DEFAULT_CONFIG:streams.session.timeout.ms=45000}"));
+ assertTrue(message.contains("streams.task.offset.interval.ms=60000
sensitive=false
synonyms={DEFAULT_CONFIG:streams.task.offset.interval.ms=60000}"));
+ }
+
+ @ClusterTest
+ public void testAlterStreamsGroupSessionTimeout() {
+ // verify session.timeout.ms
+
+ // Verify the initial config
+ Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--describe", "--all"));
+ String message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.session.timeout.ms=45000"));
+
+ // Should fail to set below min
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.session.timeout.ms=1"));
+ message = captureStandardErr(run(command));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.session.timeout.ms must be greater than or equal to
group.streams.min.session.timeout.ms"));
+
+ // Should fail to set above max
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.session.timeout.ms=100000"));
+ message = captureStandardErr(run(command));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.session.timeout.ms must be less than or equal to
group.streams.max.session.timeout.ms"));
+ }
+
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, value = "55000"),
+ })
+ public void testAlterStreamsGroupHeartbeatInterval() {
+ // verify heartbeat.interval.ms
+
+ // Verify the initial config
+ Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--describe", "--all"));
+ String message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.heartbeat.interval.ms=5000"));
+
+ // Should fail to set below min
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.heartbeat.interval.ms=1"));
+ message = captureStandardErr(run(command));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.heartbeat.interval.ms must be greater than or equal to
group.streams.min.heartbeat.interval.ms"));
+
+ // Should fail to set above max
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config",
"streams.heartbeat.interval.ms=100000"));
+ message = captureStandardErr(run(command));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.heartbeat.interval.ms must be less than or equal to
group.streams.max.heartbeat.interval.ms"));
+
+ // Should fail to set above session timeout
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.heartbeat.interval.ms=50000"));
+ message = captureStandardErr(run(command));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.session.timeout.ms must be greater than
streams.heartbeat.interval.ms"));
}
@ClusterTest
@@ -322,6 +392,49 @@ public class ConfigCommandIntegrationTest {
"--describe"));
message = captureStandardOut(run(command));
assertTrue(message.contains("streams.num.standby.replicas=1"));
+
+ // Should fail to set above max standby replicas
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.num.standby.replicas=3"));
+ message = captureStandardErr(run(command));
+ assertTrue(message.contains("streams.num.standby.replicas must be less
than or equal to group.streams.max.standby.replicas"));
+ }
+
+ @ClusterTest
+ public void testAlterStreamsGroupTaskOffsetInterval() {
+ // Verify the initial config
+ Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--describe", "--all"));
+ String message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.task.offset.interval.ms=60000"));
+
+ // Alter task offset interval
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config",
"streams.task.offset.interval.ms=45000"));
+ message = captureStandardOut(run(command));
+ assertEquals("Completed updating config for group group.", message);
+
+ // Verify the updated config
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--describe"));
+ message = captureStandardOut(run(command));
+ assertTrue(message.contains("streams.task.offset.interval.ms=45000"));
+
+ // Should fail to set below min interval
+ command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "groups",
+ "--entity-name", "group",
+ "--alter", "--add-config", "streams.task.offset.interval.ms=1"));
+ message = captureStandardErr(run(command));
+ assertTrue(message.contains("streams.task.offset.interval.ms must be
greater than or equal to group.streams.min.task.offset.interval.ms"));
+
}
private void verifyGroupConfigUpdate(List<String> alterOpts) throws
Exception {