This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eb722e43f0b KAFKA-18652: Add `acceptable.recovery.lag` config (#21799)
eb722e43f0b is described below
commit eb722e43f0bbb62aabbeebb9f3756cd8a0853a14
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Jun 5 13:46:27 2026 -0700
KAFKA-18652: Add `acceptable.recovery.lag` config (#21799)
KIP-1071 adds a new streams group config `acceptable.recovery.lag`.
This PR adds this config to the broker, and updates the
StreamsGroupHeartbeatResponse to provide `acceptable.recovery.lag` to the
Kafka Streams client.
We also update the RPC field from `int32` to `int64`, as specified in
the KIP, via a request version bump, to fix the previously incorrect
implementation. The old (now deprecated) `int32` field is renamed to
`AcceptableRecoveryLagLegacy` on StreamsGroupHeartbeatResponse version 0
(and does not exist on version 1+), and for
StreamsGroupHeartbeatResponse version 1+, the `AcceptableRecoveryLag` field
is now of type `int64`.
Note that both `StreamsGroupHeartbeatRequest` and
`StreamsGroupHeartbeatResponse` already got a version bump in another
PR for the 4.4 release; thus, the applied changes are safe without an
additional version bump in this PR.
Reviewers: Lucas Brutschy <[email protected]>, David Jacot
<[email protected]>
---
.../StreamsGroupHeartbeatRequestManager.java | 1 +
.../consumer/internals/StreamsRebalanceData.java | 26 ++
.../message/StreamsGroupHeartbeatResponse.json | 6 +-
.../StreamsGroupHeartbeatResponseTest.java | 133 +++++++
.../scala/unit/kafka/server/KafkaApisTest.scala | 3 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
.../server/StreamsGroupHeartbeatRequestTest.scala | 23 ++
.../kafka/coordinator/group/GroupConfig.java | 24 ++
.../coordinator/group/GroupCoordinatorConfig.java | 27 +-
.../coordinator/group/GroupMetadataManager.java | 13 +
.../group/GroupMetadataManagerTest.java | 390 +++++++++++++++++----
.../group/GroupMetadataManagerTestContext.java | 9 +-
12 files changed, 584 insertions(+), 72 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
index 4080439283d..283625cf2fc 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java
@@ -569,6 +569,7 @@ public class StreamsGroupHeartbeatRequestManager implements
RequestManager {
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
+
streamsRebalanceData.setAcceptableRecoveryLag(data.acceptableRecoveryLag());
if (data.partitionsByUserEndpoint() != null) {
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
index 383900355dd..f4699825b91 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -121,6 +122,7 @@ public class StreamsRebalanceData {
}
}
+
public static class EndpointPartitions {
private final List<TopicPartition> activePartitions;
private final List<TopicPartition> standbyPartitions;
@@ -347,6 +349,8 @@ public class StreamsRebalanceData {
private final AtomicInteger taskOffsetIntervalMs = new AtomicInteger(-1);
+ private final AtomicLong acceptableRecoveryLag = new AtomicLong(-1);
+
public StreamsRebalanceData(final UUID processId,
final Optional<HostInfo> endpoint,
final Optional<String> rackId,
@@ -439,4 +443,26 @@ public class StreamsRebalanceData {
return taskOffsetIntervalMs.get();
}
+ /**
+ * Updated whenever a heartbeat response is received from the broker.
+ *
+ * <p>If the broker does not support warmup tasks, this field should be
set to {@code -1}.
+ * For this case, the Kafka Streams client is not required to populate
{@code TaskOffsets} or
+ * {@code TaskEndOffsets} fields in {@link
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest}.
+ */
+ public void setAcceptableRecoveryLag(final long acceptableRecoveryLag) {
+ this.acceptableRecoveryLag.set(acceptableRecoveryLag);
+ }
+
+ /**
+ * Returns the acceptable recovery lag.
+ *
+ * <p>If acceptable recovery lag is set to {@code -1}, it means the broker
doesn't support warmup tasks,
+ * and the Kafka Streams client is not required to populate {@code
TaskOffsets} or {@code TaskEndOffsets} fields
+ * in {@link
org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest}.
+ */
+ public long acceptableRecoveryLag() {
+ return acceptableRecoveryLag.get();
+ }
+
}
diff --git
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
index 6422712ba52..34bdaf2e2d3 100644
---
a/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
+++
b/clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json
@@ -50,10 +50,12 @@
"about": "The member epoch." },
{ "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
"about": "The heartbeat interval in milliseconds." },
- { "name": "AcceptableRecoveryLag", "type": "int32", "versions": "0+",
- "about": "The maximal lag a warm-up task can have to be considered
caught-up." },
+ { "name": "AcceptableRecoveryLagLegacy", "type": "int32", "versions": "0",
+ "about": "Deprecated. The maximum acceptable lag (number of offsets to
catch up) for a client to be considered caught-up enough to receive an active
task assignment. Use AcceptableRecoveryLag instead." },
{ "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "0+",
"about": "The interval in which the task changelog offsets on a client
are updated on the broker. The offsets are sent with the next heartbeat after
this time has passed." },
+ { "name": "AcceptableRecoveryLag", "type": "int64", "versions": "1+",
"default": "-1", "ignorable": true,
+ "about": "The maximum acceptable lag (number of offsets to catch up) for
a client to be considered caught-up enough to receive an active task
assignment. A value of -1 indicates the broker does not support this
configuration." },
{ "name": "Status", "type": "[]Status", "versions": "0+",
"nullableVersions": "0+",
"about": "Indicate zero or more status for the group." },
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponseTest.java
new file mode 100644
index 00000000000..d77c365c8c5
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponseTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
+
+import org.junit.jupiter.params.ParameterizedTest;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class StreamsGroupHeartbeatResponseTest {
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.STREAMS_GROUP_HEARTBEAT)
+ public void testAcceptableRecoveryLagFieldVersionCompatibility(short
version) {
+ // Set the appropriate field based on version
+ StreamsGroupHeartbeatResponseData data = new
StreamsGroupHeartbeatResponseData()
+ .setMemberId("test-member")
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setTaskOffsetIntervalMs(60000);
+
+ if (version == 0) {
+ data.setAcceptableRecoveryLagLegacy(10000); // int32 - version 0
only
+ } else {
+ data.setAcceptableRecoveryLag(20000L); // int64 - version 1+
+ }
+
+ // Create response with specific version
+ StreamsGroupHeartbeatResponse response = new
StreamsGroupHeartbeatResponse(data);
+
+ // Serialize and deserialize to simulate wire protocol
+ StreamsGroupHeartbeatResponse parsedResponse =
+ StreamsGroupHeartbeatResponse.parse(response.serialize(version),
version);
+
+ if (version == 0) {
+ // Version 0: should have legacy field, new field should be
default (-1)
+ assertEquals(10000,
parsedResponse.data().acceptableRecoveryLagLegacy(),
+ "Version 0 response should include
acceptableRecoveryLagLegacy");
+ assertEquals(-1L, parsedResponse.data().acceptableRecoveryLag(),
+ "Version 0 response should NOT include acceptableRecoveryLag
(should be default -1)");
+ } else {
+ // Version 1+: should have new field, legacy field should be
default (0)
+ assertEquals(0,
parsedResponse.data().acceptableRecoveryLagLegacy(),
+ "Version 1+ response should NOT include
acceptableRecoveryLagLegacy (should be default 0)");
+ assertEquals(20000L, parsedResponse.data().acceptableRecoveryLag(),
+ "Version 1+ response should include acceptableRecoveryLag");
+ }
+
+ // Common fields should be preserved across versions
+ assertEquals("test-member", parsedResponse.data().memberId());
+ assertEquals(1, parsedResponse.data().memberEpoch());
+ assertEquals(5000, parsedResponse.data().heartbeatIntervalMs());
+ assertEquals(60000, parsedResponse.data().taskOffsetIntervalMs());
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.STREAMS_GROUP_HEARTBEAT)
+ public void testAcceptableRecoveryLagWithZeroValue(short version) {
+ // Test that a config value of 0 is handled correctly
+ StreamsGroupHeartbeatResponseData data = new
StreamsGroupHeartbeatResponseData()
+ .setMemberId("test-member")
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setTaskOffsetIntervalMs(60000);
+
+ if (version == 0) {
+ data.setAcceptableRecoveryLagLegacy(0); // Valid config value for
v0
+ } else {
+ data.setAcceptableRecoveryLag(0L); // Valid config value for
v1+
+ }
+
+ StreamsGroupHeartbeatResponse response = new
StreamsGroupHeartbeatResponse(data);
+ StreamsGroupHeartbeatResponse parsedResponse =
+ StreamsGroupHeartbeatResponse.parse(response.serialize(version),
version);
+
+ if (version == 0) {
+ assertEquals(0,
parsedResponse.data().acceptableRecoveryLagLegacy(),
+ "Version 0 response should preserve
acceptableRecoveryLagLegacy value of 0");
+ } else {
+ assertEquals(0L, parsedResponse.data().acceptableRecoveryLag(),
+ "Version 1+ response should preserve acceptableRecoveryLag
value of 0");
+ }
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.STREAMS_GROUP_HEARTBEAT)
+ public void testAcceptableRecoveryLagWithMaxValue(short version) {
+ // Test that large values work correctly, especially for version 1+
int64 field
+ long maxValue = 1_000_000_000_000L; // 1 trillion - exceeds int32 range
+ int cappedValue = Integer.MAX_VALUE; // Capped for int32 field
+
+ StreamsGroupHeartbeatResponseData data = new
StreamsGroupHeartbeatResponseData()
+ .setMemberId("test-member")
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setTaskOffsetIntervalMs(60000);
+
+ if (version == 0) {
+ data.setAcceptableRecoveryLagLegacy(cappedValue);
+ } else {
+ data.setAcceptableRecoveryLag(maxValue);
+ }
+
+ StreamsGroupHeartbeatResponse response = new
StreamsGroupHeartbeatResponse(data);
+ StreamsGroupHeartbeatResponse parsedResponse =
+ StreamsGroupHeartbeatResponse.parse(response.serialize(version),
version);
+
+ if (version == 0) {
+ assertEquals(cappedValue,
parsedResponse.data().acceptableRecoveryLagLegacy(),
+ "Version 0 response should preserve
acceptableRecoveryLagLegacy at Integer.MAX_VALUE");
+ } else {
+ assertEquals(maxValue,
parsedResponse.data().acceptableRecoveryLag(),
+ "Version 1+ response should preserve full int64 value beyond
Integer.MAX_VALUE");
+ }
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5471c410293..4239b49a962 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -79,7 +79,7 @@ import
org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, Utils}
import org.apache.kafka.common.utils.internals.SecurityUtils
import org.apache.kafka.common.utils.internals.ImplicitLinkedHashCollection
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -383,6 +383,7 @@ class KafkaApisTest extends Logging {
cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false")
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT.toString)
+ cgConfigs.put(STREAMS_ACCEPTABLE_RECOVERY_LAG_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_DEFAULT.toString)
cgConfigs.put(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, "")
cgConfigs.put(ERRORS_DEADLETTERQUEUE_COPY_RECORD_ENABLE_CONFIG, "false")
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index a409f8f7cb1..523675c5293 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1110,6 +1110,7 @@ class KafkaConfigTest {
case
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
case
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
+ case
GroupCoordinatorConfig.STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
/** Share coordinator configs */
case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG =>
assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)
diff --git
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
index 7b4ec29c305..16811eee758 100644
---
a/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala
@@ -707,6 +707,10 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(60_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
assertEquals(60_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
+ // Verify both members picked up `acceptable.recovery.lag`
+ assertEquals(10_000,
streamsGroupHeartbeatResponse1.acceptableRecoveryLag(), "Member 1 should pickup
acceptable.recovery.lag initially")
+ assertEquals(10_000,
streamsGroupHeartbeatResponse2.acceptableRecoveryLag(), "Member 2 should pickup
acceptable.recovery.lag initially")
+
// Both members continue to send heartbeats with their assigned tasks
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -761,6 +765,10 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
assertEquals(60_000,
streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup
task.offset.interval.ms initially")
assertEquals(60_000,
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup
task.offset.interval.ms initially")
+ // Verify both members picked up `acceptable.recovery.lag`
+ assertEquals(10_000,
streamsGroupHeartbeatResponse1.acceptableRecoveryLag(), "Member 1 should pickup
acceptable.recovery.lag initially")
+ assertEquals(10_000,
streamsGroupHeartbeatResponse2.acceptableRecoveryLag(), "Member 2 should pickup
acceptable.recovery.lag initially")
+
// Change streams.num.standby.replicas = 1
val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP,
groupId)
val alterConfigOp = new AlterConfigOp(
@@ -781,6 +789,16 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
val options2 = new org.apache.kafka.clients.admin.AlterConfigsOptions()
admin.incrementalAlterConfigs(configChanges2, options2).all().get()
+ // Change streams.acceptable.recovery.lag = 5000
+ val groupConfigResource3 = new ConfigResource(ConfigResource.Type.GROUP,
groupId)
+ val alterConfigOp3 = new AlterConfigOp(
+ new ConfigEntry("streams.acceptable.recovery.lag", "5000"),
+ AlterConfigOp.OpType.SET
+ )
+ val configChanges3 = Map(groupConfigResource3 ->
List(alterConfigOp3).asJavaCollection).asJava
+ val options3 = new org.apache.kafka.clients.admin.AlterConfigsOptions()
+ admin.incrementalAlterConfigs(configChanges3, options3).all().get()
+
// Send heartbeats to trigger rebalance after config change
TestUtils.waitUntilTrue(() => {
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -846,6 +864,10 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
streamsGroupHeartbeatResponse2.taskOffsetIntervalMs() == 45_000
}, "Member 2 did not pick up updated task.offset.interval.ms within the
timeout period.")
+ // Verify both members picked up change of `acceptable.recovery.lag`
+ assertEquals(5_000,
streamsGroupHeartbeatResponse1.acceptableRecoveryLag(), "Member 1 should pickup
acceptable.recovery.lag change")
+ assertEquals(5_000,
streamsGroupHeartbeatResponse2.acceptableRecoveryLag(), "Member 2 should pickup
acceptable.recovery.lag change")
+
} finally {
admin.close()
}
@@ -1131,6 +1153,7 @@ class StreamsGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupCo
.setStandbyTasks(List.empty.asJava)
.setWarmupTasks(List.empty.asJava)
.setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000)
assertEquals(expectedRejoinResponse, rejoinResponse)
} finally {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 100bbc0c9b8..9d4a8ad9f9b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -37,6 +37,7 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
+import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
@@ -107,6 +108,8 @@ public final class GroupConfig extends AbstractConfig {
public static final String STREAMS_NUM_WARMUP_REPLICAS_CONFIG =
"streams.num.warmup.replicas";
+ public static final String STREAMS_ACCEPTABLE_RECOVERY_LAG_CONFIG =
"streams.acceptable.recovery.lag";
+
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG =
"errors.deadletterqueue.topic.name";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DEFAULT = "";
public static final String ERRORS_DEADLETTERQUEUE_TOPIC_NAME_DOC = "The
name of the topic to be used as the dead-letter queue (DLQ) topic for this
share group. If blank (the default), the group does not have a DLQ topic.";
@@ -155,6 +158,8 @@ public final class GroupConfig extends AbstractConfig {
private final Optional<Integer> streamsNumWarmupReplicas;
+ private final Optional<Long> streamsAcceptableRecoveryLag;
+
private final Optional<IsolationLevel> shareIsolationLevel;
private final Optional<Boolean> shareRenewAcknowledgeEnable;
@@ -292,6 +297,12 @@ public final class GroupConfig extends AbstractConfig {
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
+ .define(STREAMS_ACCEPTABLE_RECOVERY_LAG_CONFIG,
+ LONG,
+
GroupCoordinatorConfig.STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_DEFAULT,
+ atLeast(0),
+ MEDIUM,
+ GroupCoordinatorConfig.STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_DOC)
// DLQ configurations (KIP-1191)
.define(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG,
@@ -337,6 +348,7 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG)),
Map.entry(STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG)),
+ Map.entry(STREAMS_ACCEPTABLE_RECOVERY_LAG_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_CONFIG)),
// DLQ configs
Map.entry(ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG, Optional.empty()),
@@ -386,6 +398,7 @@ public final class GroupConfig extends AbstractConfig {
this.streamsAssignorOffloadEnable =
optionalBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
this.streamsTaskOffsetIntervalMs =
optionalInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.streamsNumWarmupReplicas =
optionalInt(STREAMS_NUM_WARMUP_REPLICAS_CONFIG);
+ this.streamsAcceptableRecoveryLag =
optionalLong(STREAMS_ACCEPTABLE_RECOVERY_LAG_CONFIG);
this.shareIsolationLevel = optionalString(SHARE_ISOLATION_LEVEL_CONFIG)
.map(s -> IsolationLevel.valueOf(s.toUpperCase(Locale.ROOT)));
this.shareRenewAcknowledgeEnable =
optionalBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
@@ -397,6 +410,10 @@ public final class GroupConfig extends AbstractConfig {
return originals().containsKey(key) ? Optional.of(getInt(key)) :
Optional.empty();
}
+ private Optional<Long> optionalLong(String key) {
+ return originals().containsKey(key) ? Optional.of(getLong(key)) :
Optional.empty();
+ }
+
private Optional<Boolean> optionalBoolean(String key) {
return originals().containsKey(key) ? Optional.of(getBoolean(key)) :
Optional.empty();
}
@@ -1142,6 +1159,13 @@ public final class GroupConfig extends AbstractConfig {
return streamsNumWarmupReplicas;
}
+ /**
+ * The acceptable recovery lag for streams groups.
+ */
+ public Optional<Long> streamsAcceptableRecoveryLag() {
+ return streamsAcceptableRecoveryLag;
+ }
+
/**
* The share group isolation level.
*/
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index a6cc89d8a75..7a634092ae1 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -400,6 +400,10 @@ public class GroupCoordinatorConfig {
public static final int STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT = 20;
public static final String STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC = "The
maximum allowed value for the group-level configuration of " +
GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG;
+ public static final String STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_CONFIG =
"group.streams.acceptable.recovery.lag";
+ public static final long STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_DEFAULT =
10000L;
+ public static final String STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_DOC =
"The maximum acceptable lag (number of offsets to catch up) for a client to be
considered caught-up enough to receive an active task assignment.";
+
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
CACHED_BUFFER_MAX_BYTES_CONFIG,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
@@ -492,7 +496,8 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG, INT,
STREAMS_GROUP_NUM_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_NUM_WARMUP_REPLICAS_DOC)
- .define(STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG, INT,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC);
+ .define(STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG, INT,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_DOC)
+ .define(STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_CONFIG, LONG,
STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_DEFAULT, atLeast(0L), MEDIUM,
STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_DOC);
/**
@@ -560,6 +565,7 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMinTaskOffsetIntervalMs;
private final int streamsGroupNumWarmupReplicas;
private final int streamsGroupMaxWarmupReplicas;
+ private final long streamsGroupAcceptableRecoveryLag;
private final AbstractConfig config;
@@ -582,10 +588,10 @@ public class GroupCoordinatorConfig {
this.offsetsRetentionMs =
config.getInt(GroupCoordinatorConfig.OFFSETS_RETENTION_MINUTES_CONFIG) * 60L *
1000L;
this.offsetCommitTimeoutMs =
config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
this.consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(
-
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
+
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
this.offsetTopicCompressionType =
Optional.ofNullable(config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG))
- .map(CompressionType::forId)
- .orElse(null);
+ .map(CompressionType::forId)
+ .orElse(null);
this.offsetsLoadBufferSize =
config.getInt(GroupCoordinatorConfig.OFFSETS_LOAD_BUFFER_SIZE_CONFIG);
this.offsetsTopicPartitions =
config.getInt(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG);
this.offsetsTopicReplicationFactor =
config.getShort(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG);
@@ -629,8 +635,13 @@ public class GroupCoordinatorConfig {
this.streamsGroupMinTaskOffsetIntervalMs =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.streamsGroupNumWarmupReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG);
this.streamsGroupMaxWarmupReplicas =
config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG);
+ this.streamsGroupAcceptableRecoveryLag =
config.getLong(GroupCoordinatorConfig.STREAMS_GROUP_ACCEPTABLE_RECOVERY_LAG_CONFIG);
this.config = config;
+ checkConstraints();
+ }
+
+ private void checkConstraints() {
// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >=
consumerGroupMinHeartbeatIntervalMs,
String.format("%s must be greater than or equal to %s",
CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG,
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
@@ -732,7 +743,6 @@ public class GroupCoordinatorConfig {
String.format("%s must be greater than or equal to %s",
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG));
require(streamsGroupNumWarmupReplicas <= streamsGroupMaxWarmupReplicas,
String.format("%s must be less than or equal to %s",
STREAMS_GROUP_NUM_WARMUP_REPLICAS_CONFIG,
STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG));
-
}
/**
@@ -1363,4 +1373,11 @@ public class GroupCoordinatorConfig {
public int streamsGroupMaxWarmupReplicas() {
return streamsGroupMaxWarmupReplicas;
}
+
+ /**
+ * The acceptable recovery lag for streams groups.
+ */
+ public long streamsGroupAcceptableRecoveryLag() {
+ return streamsGroupAcceptableRecoveryLag;
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index a67ca07883a..6b7514b90ce 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2145,6 +2145,10 @@ public class GroupMetadataManager {
.setMemberEpoch(updatedMember.memberEpoch())
.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs(groupId))
.setTaskOffsetIntervalMs(streamsGroupTaskOffsetIntervalMs(groupId));
+
+ // AcceptableRecoveryLag is marked as `ignorable` so we can just
blindly set it
+
response.setAcceptableRecoveryLag(streamsGroupAcceptableRecoveryLag(groupId));
+
// The assignment is only provided in the following cases:
// 1. The member is joining.
// 2. The member's assignment has been updated.
@@ -8938,6 +8942,15 @@ public class GroupMetadataManager {
.orElse(config.streamsGroupTaskOffsetIntervalMs());
}
+ /**
+ * Get the acceptable recovery lag of the provided streams group.
+ */
+ private long streamsGroupAcceptableRecoveryLag(String groupId) {
+ Optional<GroupConfig> groupConfig =
groupConfigManager.groupConfig(groupId);
+ return groupConfig.flatMap(GroupConfig::streamsAcceptableRecoveryLag)
+ .orElse(config.streamsGroupAcceptableRecoveryLag());
+ }
+
/**
* Get the initial rebalance delay of the provided streams group.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 30ef688d8a5..f199a804254 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -75,7 +75,9 @@ import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
@@ -1176,7 +1178,8 @@ public class GroupMetadataManagerTest {
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId,
100, computeGroupHash(Map.of(
fooTopicName, fooTopicHash
- )), 1, Map.of("num.standby.replicas", "0")));
+ )), 1, new TreeMap<>(Map.of("num.standby.replicas", "0"))
+ ));
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2)
@@ -1209,7 +1212,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
}
@@ -17904,7 +17908,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -17999,7 +18004,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -18093,7 +18099,8 @@ public class GroupMetadataManagerTest {
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
.setStatusDetail("Source topics bar are missing.")))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -18186,7 +18193,8 @@ public class GroupMetadataManagerTest {
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.MISSING_INTERNAL_TOPICS.code())
.setStatusDetail("Internal topics are missing: bar")))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -18276,7 +18284,8 @@ public class GroupMetadataManagerTest {
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.INCORRECTLY_PARTITIONED_TOPICS.code())
.setStatusDetail("Following topics do not have the same
number of partitions: [{bar=3, foo=6}]")))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -18383,7 +18392,8 @@ public class GroupMetadataManagerTest {
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.STALE_TOPOLOGY.code())
.setStatusDetail("The member's topology epoch 0 is behind
the group's topology epoch 1.")))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -18486,7 +18496,8 @@ public class GroupMetadataManagerTest {
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result1.response().data()
);
assertRecordsEquals(List.of(), result1.records());
@@ -18508,7 +18519,8 @@ public class GroupMetadataManagerTest {
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result2.response().data()
);
@@ -18602,7 +18614,8 @@ public class GroupMetadataManagerTest {
.setStatusCode(Status.SHUTDOWN_APPLICATION.code())
.setStatusDetail(statusDetail)
))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result2.response().data()
);
}
@@ -18683,7 +18696,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -18813,7 +18827,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -18962,7 +18977,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19091,7 +19107,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19108,11 +19125,126 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
}
+ @Test
+ public void testStreamsGroupHeartbeatResponseVersion0() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
+ .build();
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ // Send version 0 heartbeat request
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ (short) 0 // Version 0
+ );
+
+ StreamsGroupHeartbeatResult response = result.response();
+ StreamsGroupHeartbeatResponseData data = response.data();
+
+ assertEquals(0, data.acceptableRecoveryLagLegacy(),
+ "Version 0 response should NOT include acceptableRecoveryLagLegacy
(should be default 0)");
+ // It's ok for version 0 to set `acceptableRecoveryLag` on the in-memory response because the field is marked as `ignorable`
+ assertEquals(10_000L, data.acceptableRecoveryLag(),
+ "Version 0 response data should still carry acceptableRecoveryLag in memory (should be default 10_000L)");
+
+ // Verify other fields are set correctly
+ assertEquals(memberId, data.memberId());
+ assertEquals(2, data.memberEpoch());
+ assertEquals(5000, data.heartbeatIntervalMs());
+ assertEquals(60_000, data.taskOffsetIntervalMs());
+
+ // Verify that AcceptableRecoveryLag (versions: "1+", ignorable: true)
is dropped when the data is serialized
+ // at version 0 and deserialized again, since a version-0 receiver
would not know about it.
+ ByteBufferAccessor serializedV0 =
MessageUtil.toByteBufferAccessor(data, (short) 0);
+ StreamsGroupHeartbeatResponseData deserializedData = new
StreamsGroupHeartbeatResponseData();
+ deserializedData.read(serializedV0, (short) 0);
+
+ assertEquals(0, deserializedData.acceptableRecoveryLagLegacy(),
+ "AcceptableRecoveryLagLegacy must survive the version-0 roundtrip
unchanged");
+ assertEquals(-1L, deserializedData.acceptableRecoveryLag(),
+ "AcceptableRecoveryLag (ignorable, versions 1+) must be absent
after version-0 roundtrip; should revert to default -1, even if it was set to
default 10_000L");
+ }
+
+ @Test
+ public void testStreamsGroupHeartbeatResponseVersion1() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 2)
+ .buildCoordinatorMetadataImage())
+
.withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
0)
+ .build();
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1))));
+
+ // Send version 1 heartbeat request
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ (short) 1 // Version 1
+ );
+
+ // Version 1 response should have AcceptableRecoveryLag (int64) set,
not AcceptableRecoveryLagLegacy (int32)
+ assertEquals(0, result.response().data().acceptableRecoveryLagLegacy(),
+ "Version 1 response should NOT include acceptableRecoveryLagLegacy
(should be default 0)");
+ assertEquals(10_000L, result.response().data().acceptableRecoveryLag(),
+ "Version 1 response should include acceptableRecoveryLag");
+
+ // Verify other fields are set correctly
+ assertEquals(memberId, result.response().data().memberId());
+ assertEquals(2, result.response().data().memberEpoch());
+ assertEquals(5000, result.response().data().heartbeatIntervalMs());
+ assertEquals(60_000, result.response().data().taskOffsetIntervalMs());
+ }
+
@Test
public void testStreamsGroupHeartbeatAlwaysSetsStatus() {
String groupId = "fooup";
@@ -19162,7 +19294,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19179,7 +19312,8 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(2)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
}
@@ -19232,7 +19366,8 @@ public class GroupMetadataManagerTest {
.setStatusCode(Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the
configured initial rebalance delay.")
))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19262,7 +19397,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
}
@@ -19430,7 +19566,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19473,7 +19610,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19520,7 +19658,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19555,7 +19694,8 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19596,7 +19736,8 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19627,7 +19768,8 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(10)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19655,7 +19797,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19687,7 +19830,8 @@ public class GroupMetadataManagerTest {
.setMemberEpoch(11)
.setHeartbeatIntervalMs(5000)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19731,7 +19875,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19779,7 +19924,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -19991,7 +20137,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20128,7 +20275,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20372,7 +20520,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20412,7 +20561,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20442,7 +20592,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20475,7 +20626,8 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(0)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20541,7 +20693,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20581,7 +20734,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20610,7 +20764,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -20794,20 +20949,21 @@ public class GroupMetadataManagerTest {
.setStandbyPartitions(List.of());
assertResponseEquals(
- new StreamsGroupHeartbeatResponseData()
- .setMemberId(memberId)
- .setMemberEpoch(2)
- .setHeartbeatIntervalMs(5000)
- .setActiveTasks(List.of(
- new StreamsGroupHeartbeatResponseData.TaskIds()
- .setSubtopologyId(subtopology1)
- .setPartitions(List.of(0, 1))))
- .setStandbyTasks(List.of())
- .setWarmupTasks(List.of())
-
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
- .setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
- result.response().data()
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1))))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of())
+
.setPartitionsByUserEndpoint(List.of(expectedEndpointToPartitions))
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
+ result.response().data()
);
result = context.streamsGroupHeartbeat(
@@ -20915,7 +21071,7 @@ public class GroupMetadataManagerTest {
}
@Test
- public void testStreamsGroupEpochIncreaseWithAssignmentConfigChanges() {
+ public void
testStreamsGroupEpochIncreaseWithNumStandbyReplicasConfigChanges() {
String groupId = "fooup";
String memberId = Uuid.randomUuid().toString();
String subtopology1 = "subtopology1";
@@ -20951,7 +21107,7 @@ public class GroupMetadataManagerTest {
)
.build();
- // Change the assignment config
+ // Change the group-level num.standby.replicas config
Properties newConfig = new Properties();
newConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
context.groupConfigManager.updateGroupConfig(groupId, newConfig);
@@ -20985,7 +21141,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000),
result.response().data()
);
@@ -21019,6 +21176,102 @@ public class GroupMetadataManagerTest {
assertEquals("2",
group.lastAssignmentConfigs().get("num.standby.replicas"));
}
+ @Test
+ public void
testStreamsGroupEpochShouldNotIncreaseWithAcceptableRecoveryLagConfigChange() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withStreamsGroup(new StreamsGroupBuilder(groupId, 10)
+ .withMember(streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE,
10,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2,
3, 4, 5)))
+ .build())
+ .withTargetAssignment(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5)))
+ .withTargetAssignmentEpoch(10)
+ .withTopology(StreamsTopology.fromHeartbeatRequest(topology))
+ .withValidatedTopologyEpoch(0)
+ )
+ .build();
+
+ // Change the group-level acceptable.recovery.lag config
+ Properties newConfig = new Properties();
+ newConfig.put(GroupConfig.STREAMS_ACCEPTABLE_RECOVERY_LAG_CONFIG,
"50000");
+ context.groupConfigManager.updateGroupConfig(groupId, newConfig);
+
+ assignor.prepareGroupAssignment(
+ Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4,
5)))
+ );
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(10)
+ .setActiveTasks(List.of(new
StreamsGroupHeartbeatRequestData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setStatus(List.of())
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(50_000L),
+ result.response().data()
+ );
+
+ // Find the StreamsGroupMetadata record
+ CoordinatorRecord metadataRecord = result.records().stream()
+ .filter(record -> record.key() instanceof StreamsGroupMetadataKey)
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(metadataRecord, "Expected a StreamsGroupMetadata
record");
+ // Verify the epoch stored in the StreamsGroupMetadata record
+ StreamsGroupMetadataValue metadataValue = (StreamsGroupMetadataValue)
metadataRecord.value().message();
+ assertEquals(11, metadataValue.epoch());
+
+ // Verify the assignment config does not store acceptable.recovery.lag
+ List<StreamsGroupMetadataValue.LastAssignmentConfig> assignmentConfigs
= metadataValue.lastAssignmentConfigs();
+ assertFalse(assignmentConfigs.isEmpty(), "Expected assignment configs
to be present");
+
+ StreamsGroupMetadataValue.LastAssignmentConfig recoveryLagConfig =
assignmentConfigs.stream()
+ .filter(c -> "acceptable.recovery.lag".equals(c.key()))
+ .findFirst()
+ .orElse(null);
+
+ assertNull(recoveryLagConfig, "Expected acceptable.recovery.lag to be absent from the last assignment configs");
+
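+ // Verify the group epoch after the heartbeat.
+ // NOTE(review): the group is presumably built at epoch 10 above, so 11 is a bump; confirm it is not triggered by the acceptable.recovery.lag change.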
+ StreamsGroup group =
context.groupMetadataManager.streamsGroup(groupId);
+ int newGroupEpoch = group.groupEpoch();
+ assertEquals(11, newGroupEpoch);
+ }
+
@Test
public void testStreamsGroupHeartbeatWithNonEmptyClassicGroup() {
String classicGroupId = "classic-group-id";
@@ -21435,7 +21688,9 @@ public class GroupMetadataManagerTest {
.setWarmupTasks(List.of()));
assertEquals(2, result.response().data().memberEpoch());
assertEquals(
- Map.of("num.standby.replicas", "0"),
+ Map.of(
+ "num.standby.replicas", "0"
+ ),
assignor.lastPassedAssignmentConfigs()
);
@@ -21474,7 +21729,9 @@ public class GroupMetadataManagerTest {
// Verify that the new number of standby replicas is used
assertEquals(
- Map.of("num.standby.replicas", "2"),
+ Map.of(
+ "num.standby.replicas", "2"
+ ),
assignor.lastPassedAssignmentConfigs()
);
@@ -27761,7 +28018,8 @@ public class GroupMetadataManagerTest {
.setStandbyTasks(List.of())
.setWarmupTasks(List.of())
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000L),
result1.response().data()
);
@@ -27779,7 +28037,9 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId,
topology),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), 0, Map.of("num.standby.replicas", "0")),
+ )), 0, new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId1,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
TaskAssignmentTestUtil.mkTasks(subtopology, 0, 1, 2,
3, 4, 5)
@@ -27826,7 +28086,8 @@ public class GroupMetadataManagerTest {
.setStatus(List.of(new
StreamsGroupHeartbeatResponseData.Status()
.setStatusCode(Status.ASSIGNMENT_DELAYED.code())
.setStatusDetail("Assignment delayed due to the configured
assignment interval.")))
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000L),
result2.response().data()
);
@@ -27841,7 +28102,9 @@ public class GroupMetadataManagerTest {
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember2),
StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3,
computeGroupHash(Map.of(
fooTopicName, computeTopicHash(fooTopicName, metadataImage)
- )), 0, Map.of("num.standby.replicas", "0")),
+ )), 0, new TreeMap<>(Map.of(
+ "num.standby.replicas", "0"
+ ))),
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember2)
),
result2.records()
@@ -27864,7 +28127,8 @@ public class GroupMetadataManagerTest {
.setHeartbeatIntervalMs(5000)
.setEndpointInformationEpoch(0)
.setStatus(List.of())
- .setTaskOffsetIntervalMs(60_000),
+ .setTaskOffsetIntervalMs(60_000)
+ .setAcceptableRecoveryLag(10_000L),
result3.response().data()
);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 37bc781697b..c3d9544520b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -735,11 +735,18 @@ public class GroupMetadataManagerTestContext {
public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
streamsGroupHeartbeat(
StreamsGroupHeartbeatRequestData request
+ ) {
+ return streamsGroupHeartbeat(request,
ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
+ }
+
+ public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
streamsGroupHeartbeat(
+ StreamsGroupHeartbeatRequestData request,
+ short version
) {
RequestContext context = new RequestContext(
new RequestHeader(
ApiKeys.STREAMS_GROUP_HEARTBEAT,
- ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(),
+ version,
"client",
0
),