This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0f0b418190d KAFKA-20291 [2/3]: Implement assignment batching for share
groups (#21721)
0f0b418190d is described below
commit 0f0b418190df2fde0c5504402ccea7b16a8d17e4
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 18 19:33:42 2026 +0000
KAFKA-20291 [2/3]: Implement assignment batching for share groups (#21721)
Implement target assignment batching for share groups.
Delay computing the next assignment for share groups until their
assignment interval has elapsed since their last assignment computation
finished.
Also fix a bug where we would bump the group epoch on every heartbeat
when the target assignment is delayed and there are unassigned
initialized share partitions. As a side effect, we bump the epoch fewer
times in tests when assignment batching is enabled. After the epoch is
bumped by a metadata update, it is not bumped again by share partition
initialization if the next assignment has not been computed yet.
We run request tests with assignment batching disabled, except for
heartbeat tests, which we run with and without assignment batching.
We run a selection of integration and system tests with and without
assignment batching.
Reviewers: Sushant Mahajan <[email protected]>, David Jacot
<[email protected]>
---
.../consumer/ShareConsumerRackAwareTest.java | 10 +
.../kafka/clients/consumer/ShareConsumerTest.java | 74 +++++-
.../api/AbstractAuthorizerIntegrationTest.scala | 1 +
.../server/ShareGroupDescribeRequestTest.scala | 1 +
.../server/ShareGroupHeartbeatRequestTest.scala | 194 +++++++++++++---
.../coordinator/group/GroupMetadataManager.java | 17 +-
.../group/GroupMetadataManagerTest.java | 252 +++++++++++++++++++++
.../kafkatest/tests/client/share_consumer_test.py | 18 +-
8 files changed, 519 insertions(+), 48 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
index 488f8b75494..dcf58b5f238 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerRackAwareTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.test.TestUtils;
@@ -42,6 +43,15 @@ import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
public class ShareConsumerRackAwareTest {
+
+ @ClusterTestDefaults(
+ serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }
+ )
+ public static class WithAssignmentBatchingDisabledTest extends
ShareConsumerRackAwareTest {
+ }
+
@ClusterTest(
types = {Type.KRAFT},
brokers = 3,
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 325a3c3ca8d..89ea1b38c50 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -66,9 +66,11 @@ import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.share.SharePartitionKey;
@@ -271,7 +273,14 @@ public class ShareConsumerTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testSubscriptionAndPoll() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
@@ -289,7 +298,14 @@ public class ShareConsumerTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testSubscriptionAndPollMultiple() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
@@ -1285,7 +1301,14 @@ public class ShareConsumerTest {
"Consumer close should not wait for full timeout when broker is
already shut down");
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testMultipleConsumersInGroupSequentialConsumption() {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
@@ -1321,7 +1344,14 @@ public class ShareConsumerTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testMultipleConsumersInGroupConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
@@ -1355,7 +1385,14 @@ public class ShareConsumerTest {
assertEquals(producerCount * messagesPerProducer, totalResult);
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testMultipleConsumersInMultipleGroupsConcurrentConsumption()
throws ExecutionException, InterruptedException, TimeoutException {
AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
@@ -1468,7 +1505,14 @@ public class ShareConsumerTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testMultipleConsumersInGroupFailureConcurrentConsumption()
throws InterruptedException, ExecutionException, TimeoutException {
AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
@@ -1781,7 +1825,14 @@ public class ShareConsumerTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testSubscriptionFollowedByTopicCreation() throws
InterruptedException {
alterShareAutoOffsetReset("group1", "earliest");
try (Producer<byte[], byte[]> producer = createProducer();
@@ -1810,7 +1861,14 @@ public class ShareConsumerTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void testSubscriptionAndPollFollowedByTopicDeletion() throws
InterruptedException, ExecutionException {
String topic1 = "bar";
String topic2 = "baz";
diff --git
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index 1ba21895c10..8a4c0880d33 100644
---
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -117,6 +117,7 @@ class AbstractAuthorizerIntegrationTest extends
BaseRequestTest {
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"0")
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
"10000")
+
properties.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"0")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, "1")
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
index 16278cc627e..1e662110168 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupDescribeRequestTest.scala
@@ -38,6 +38,7 @@ import scala.jdk.CollectionConverters._
@Timeout(120)
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties
= Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0"),
new ClusterConfigProperty(key =
ShareGroupConfig.SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, value = "")
))
class ShareGroupDescribeRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
diff --git
a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
index 11bf42b3f6c..301205e2209 100644
--- a/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ShareGroupHeartbeatRequestTest.scala
@@ -24,6 +24,7 @@ import
org.apache.kafka.common.message.{ShareGroupHeartbeatRequestData, ShareGro
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.{ShareGroupHeartbeatRequest,
ShareGroupHeartbeatResponse}
import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.common.Feature
import org.apache.kafka.server.IntegrationTestUtils;
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals,
assertNotNull, assertNull, assertTrue}
@@ -33,12 +34,25 @@ import org.junit.jupiter.api.Timeout
import java.util
import scala.jdk.CollectionConverters._
+object ShareGroupHeartbeatRequestTest {
+ @ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1,
serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0"),
+ new ClusterConfigProperty(key = "group.share.persister.class.name", value
= "")
+ ))
+ class WithAssignmentBatchingDisabledTest(cluster: ClusterInstance) extends
ShareGroupHeartbeatRequestTest(cluster) {
+ }
+}
+
@Timeout(120)
@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1, serverProperties
= Array(
new ClusterConfigProperty(key = "group.share.persister.class.name", value =
"")
))
class ShareGroupHeartbeatRequestTest(cluster: ClusterInstance) {
+ protected def isShareAssignmentBatchingEnabled: Boolean = {
+ cluster.brokers.values.stream.allMatch(b =>
b.config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs > 0)
+ }
+
@ClusterTest(
features = Array(
new ClusterFeature(feature = Feature.SHARE_VERSION, version = 0)
@@ -88,10 +102,11 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ var expectedMemberEpoch = 2
// Verify the response.
assertNotNull(shareGroupHeartbeatResponse.data.memberId)
- assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch,
shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -122,9 +137,18 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
+ if (isShareAssignmentBatchingEnabled) {
+ // The group epoch is bumped by the metadata update, but not share
partition initialization
+ // due to the pending delayed assignment.
+ expectedMemberEpoch += 1
+ } else {
+ // The group epoch is bumped by the metadata update and share
partition initialization.
+ // Assignments are computed for both epoch bumps.
+ expectedMemberEpoch += 2
+ }
// Verify the response.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch,
shareGroupHeartbeatResponse.data.memberEpoch)
assertEquals(expectedAssignment,
shareGroupHeartbeatResponse.data.assignment)
// Leave the group.
@@ -178,11 +202,13 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ var expectedMemberEpoch = 2
// Verify the response for member 1.
val memberId1 = shareGroupHeartbeatResponse.data.memberId
+ var memberEpoch1 = shareGroupHeartbeatResponse.data.memberEpoch
assertNotNull(memberId1)
- assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch, memberEpoch1)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// The second member request to join the group.
@@ -197,13 +223,16 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Send the second member request until receiving a successful response.
TestUtils.waitUntilTrue(() => {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
- shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ (!isShareAssignmentBatchingEnabled ||
shareGroupHeartbeatResponse.data.memberEpoch > expectedMemberEpoch)
}, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ expectedMemberEpoch += 1
// Verify the response for member 2.
val memberId2 = shareGroupHeartbeatResponse.data.memberId
+ val memberEpoch2 = shareGroupHeartbeatResponse.data.memberEpoch
assertNotNull(memberId2)
- assertEquals(3, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch, memberEpoch2)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Verify the member id is different.
assertNotEquals(memberId1, memberId2)
@@ -220,7 +249,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId1)
- .setMemberEpoch(2)
+ .setMemberEpoch(memberEpoch1)
).build()
// Heartbeats until the partitions are assigned for member 1.
@@ -240,17 +269,27 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
false
}
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
+ // Verify the response.
+ if (isShareAssignmentBatchingEnabled) {
+ // The group epoch is bumped by the metadata update, but not share
partition initialization
+ // due to the pending delayed assignment.
+ expectedMemberEpoch += 1
+ } else {
+ // The group epoch is bumped by the metadata update and share
partition initialization.
+ // Assignments are computed for both epoch bumps.
+ expectedMemberEpoch += 2
+ }
val topicPartitionsAssignedToMember1 =
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
- // Verify the response.
- assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
+ memberEpoch1 = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch1)
// Prepare the next heartbeat for member 2.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId2)
- .setMemberEpoch(3)
+ .setMemberEpoch(memberEpoch2)
).build()
// Heartbeats until the partitions are assigned for member 2.
@@ -262,7 +301,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
val topicPartitionsAssignedToMember2 =
shareGroupHeartbeatResponse.data.assignment.topicPartitions()
// Verify the response.
- assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch,
shareGroupHeartbeatResponse.data.memberEpoch)
val partitionsAssigned: util.Set[Integer] = new util.HashSet[Integer]()
topicPartitionsAssignedToMember1.forEach(topicPartition => {
@@ -275,12 +314,12 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
assertEquals(util.Set.of(0, 1, 2), partitionsAssigned)
// Verify the assignments are not changed for member 1.
- // Prepare another heartbeat for member 1 with latest received epoch 3
for member 1.
+ // Prepare another heartbeat for member 1 with latest received epoch for
member 1.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId1)
- .setMemberEpoch(4)
+ .setMemberEpoch(memberEpoch1)
).build()
// Heartbeats until the response for no change of assignment occurs for
member 1 with same epoch.
@@ -292,7 +331,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
// Verify the response.
- assertEquals(5, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch,
shareGroupHeartbeatResponse.data.memberEpoch)
} finally {
admin.close()
}
@@ -332,11 +371,13 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ var expectedMemberEpoch = 2
// Verify the response for member.
val memberId = shareGroupHeartbeatResponse.data.memberId
+ var memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
assertNotNull(memberId)
- assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch, memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic.
@@ -357,7 +398,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(2)
+ .setMemberEpoch(memberEpoch)
).build()
TestUtils.waitUntilTrue(() => {
@@ -365,9 +406,18 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
+ if (isShareAssignmentBatchingEnabled) {
+ // The group epoch is bumped by the metadata update, but not share
partition initialization
+ // due to the pending delayed assignment.
+ expectedMemberEpoch += 1
+ } else {
+ // The group epoch is bumped by the metadata update and share
partition initialization.
+ // Assignments are computed for both epoch bumps.
+ expectedMemberEpoch += 2
+ }
// Verify the response.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch,
shareGroupHeartbeatResponse.data.memberEpoch)
// Member leaves the group.
shareGroupHeartbeatRequest = new ShareGroupHeartbeatRequest.Builder(
@@ -382,6 +432,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not leave the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ expectedMemberEpoch += 1
// Verify the response for member.
assertEquals(-1, shareGroupHeartbeatResponse.data.memberEpoch)
@@ -395,10 +446,15 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
.setSubscribedTopicNames(List("foo").asJava)
).build()
- shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
+ TestUtils.waitUntilTrue(() => {
+ shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
+ (!isShareAssignmentBatchingEnabled ||
shareGroupHeartbeatResponse.data.memberEpoch > expectedMemberEpoch)
+ }, msg = s"Could not rejoin the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ expectedMemberEpoch += 1
// Verify the response for member 1.
- assertEquals(6, shareGroupHeartbeatResponse.data.memberEpoch)
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
assertEquals(memberId, shareGroupHeartbeatResponse.data.memberId)
// Partition assignment remains intact on rejoining.
assertEquals(expectedAssignment,
shareGroupHeartbeatResponse.data.assignment)
@@ -438,10 +494,13 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ var expectedMemberEpoch = 2
+
// Verify the response for member.
val memberId = shareGroupHeartbeatResponse.data.memberId
+ var memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
assertNotNull(memberId)
- assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch, memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic foo.
val fooTopicId = TestUtils.createTopicWithAdminRaw(
@@ -469,7 +528,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(2)
+ .setMemberEpoch(memberEpoch)
).build()
cluster.waitTopicCreation("foo", 2)
@@ -482,8 +541,19 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
expectedAssignment.topicPartitions.containsAll(shareGroupHeartbeatResponse.data.assignment.topicPartitions)
&&
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not get partitions for topic foo and bar assigned. Last
response $shareGroupHeartbeatResponse.")
+ if (isShareAssignmentBatchingEnabled) {
+ // The group epoch is bumped by the metadata update, but not share
partition initialization
+ // due to the pending delayed assignment.
+ expectedMemberEpoch += 1
+ } else {
+ // The group epoch is bumped by the metadata update and share
partition initialization.
+ // Assignments are computed for both epoch bumps.
+ expectedMemberEpoch += 2
+ }
+
// Verify the response.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
// Create the topic baz.
val bazTopicId = TestUtils.createTopicWithAdminRaw(
admin = admin,
@@ -507,7 +577,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(4)
+ .setMemberEpoch(memberEpoch)
).build()
TestUtils.waitUntilTrue(() => {
@@ -517,8 +587,19 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
expectedAssignment.topicPartitions.containsAll(shareGroupHeartbeatResponse.data.assignment.topicPartitions)
&&
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not get partitions for topic baz assigned. Last
response $shareGroupHeartbeatResponse.")
+ if (isShareAssignmentBatchingEnabled) {
+ // The group epoch is bumped by the metadata update, but not share
partition initialization
+ // due to the pending delayed assignment.
+ expectedMemberEpoch += 1
+ } else {
+ // The group epoch is bumped by the metadata update and share
partition initialization.
+ // Assignments are computed for both epoch bumps.
+ expectedMemberEpoch += 2
+ }
+
// Verify the response.
- assertEquals(6, shareGroupHeartbeatResponse.data.memberEpoch)
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
// Increasing the partitions of topic bar which is already being
consumed in the share group.
increasePartitions(admin, "bar", 6)
@@ -538,7 +619,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(6)
+ .setMemberEpoch(memberEpoch)
).build()
TestUtils.waitUntilTrue(() => {
@@ -548,8 +629,19 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
expectedAssignment.topicPartitions.containsAll(shareGroupHeartbeatResponse.data.assignment.topicPartitions)
&&
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not update partitions assignment for topic bar. Last
response $shareGroupHeartbeatResponse.")
+ if (isShareAssignmentBatchingEnabled) {
+ // The group epoch is bumped by the metadata update, but not share
partition initialization
+ // due to the pending delayed assignment.
+ expectedMemberEpoch += 1
+ } else {
+ // The group epoch is bumped by the metadata update and share
partition initialization.
+ // Assignments are computed for both epoch bumps.
+ expectedMemberEpoch += 2
+ }
+
// Verify the response.
- assertEquals(8, shareGroupHeartbeatResponse.data.memberEpoch)
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
// Delete the topic foo.
TestUtils.deleteTopicWithAdmin(
admin = admin,
@@ -571,7 +663,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(8)
+ .setMemberEpoch(memberEpoch)
).build()
TestUtils.waitUntilTrue(() => {
@@ -581,8 +673,29 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
expectedAssignment.topicPartitions.containsAll(shareGroupHeartbeatResponse.data.assignment.topicPartitions)
&&
shareGroupHeartbeatResponse.data.assignment.topicPartitions.containsAll(expectedAssignment.topicPartitions)
}, msg = s"Could not update partitions assignment for topic foo. Last
response $shareGroupHeartbeatResponse.")
- // Verify the response.
- assertEquals(9, shareGroupHeartbeatResponse.data.memberEpoch)
+
+ if (isShareAssignmentBatchingEnabled) {
+ // Verify the response. Reconciliation filters out the assignment for
deleted topic foo before the next assignment.
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
+
+ TestUtils.waitUntilTrue(() => {
+ shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
+ shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
+ shareGroupHeartbeatResponse.data.memberEpoch >
shareGroupHeartbeatRequest.data.memberEpoch
+ }, msg = s"Could not update partitions assignment for topic foo. Last
response $shareGroupHeartbeatResponse.")
+ expectedMemberEpoch += 1
+
+ // Verify the response.
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
+ } else {
+ expectedMemberEpoch += 1
+
+ // Verify the response.
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
+ }
} finally {
admin.close()
}
@@ -872,10 +985,13 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse =
connectAndReceive(shareGroupHeartbeatRequest)
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
}, msg = s"Could not join the group successfully. Last response
$shareGroupHeartbeatResponse.")
+ var expectedMemberEpoch = 2
+
// Verify the response for member.
val memberId = shareGroupHeartbeatResponse.data.memberId
+ var memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
assertNotNull(memberId)
- assertEquals(2, shareGroupHeartbeatResponse.data.memberEpoch)
+ assertEquals(expectedMemberEpoch, memberEpoch)
assertEquals(new ShareGroupHeartbeatResponseData.Assignment(),
shareGroupHeartbeatResponse.data.assignment)
// Create the topic.
val fooId = TestUtils.createTopicWithAdminRaw(
@@ -893,7 +1009,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(2)
+ .setMemberEpoch(memberEpoch)
).build()
TestUtils.waitUntilTrue(() => {
@@ -901,8 +1017,19 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
shareGroupHeartbeatResponse.data.errorCode == Errors.NONE.code &&
shareGroupHeartbeatResponse.data.assignment == expectedAssignment
}, msg = s"Could not get partitions assigned. Last response
$shareGroupHeartbeatResponse.")
+ if (isShareAssignmentBatchingEnabled) {
+ // The group epoch is bumped by the metadata update, but not share
partition initialization
+ // due to the pending delayed assignment.
+ expectedMemberEpoch += 1
+ } else {
+ // The group epoch is bumped by the metadata update and share
partition initialization.
+ // Assignments are computed for both epoch bumps.
+ expectedMemberEpoch += 2
+ }
+
// Verify the response.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
// Restart the only running broker.
val broker = cluster.brokers().values().iterator().next()
@@ -914,7 +1041,7 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
new ShareGroupHeartbeatRequestData()
.setGroupId("grp")
.setMemberId(memberId)
- .setMemberEpoch(3)
+ .setMemberEpoch(memberEpoch)
).build()
// Should receive no error and no assignment changes.
@@ -925,7 +1052,8 @@ class ShareGroupHeartbeatRequestTest(cluster:
ClusterInstance) {
// Verify the response. Epoch should not have changed and null
assignments determines that no
// change in old assignment.
- assertEquals(4, shareGroupHeartbeatResponse.data.memberEpoch)
+ memberEpoch = shareGroupHeartbeatResponse.data.memberEpoch
+ assertEquals(expectedMemberEpoch, memberEpoch)
assertNull(shareGroupHeartbeatResponse.data.assignment)
} finally {
admin.close()
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 0fb6d3ba117..386a92c785c 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2729,9 +2729,15 @@ public class GroupMetadataManager {
member,
updatedMember,
records
- ) || initializedAssignmentPending(group);
+ );
int groupEpoch = group.groupEpoch();
+ if (group.assignmentEpoch() >= groupEpoch) {
+ // A new assignment is needed if no assignment is pending and
there are unassigned
+ // initialized partitions.
+ bumpGroupEpoch |= initializedAssignmentPending(group);
+ }
+
Map<String, SubscriptionCount> subscribedTopicNamesMap =
group.subscribedTopicNames();
SubscriptionType subscriptionType = group.subscriptionType();
@@ -3928,6 +3934,15 @@ public class GroupMetadataManager {
return
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
}
+ boolean canComputeNextTargetAssignment =
canComputeNextTargetAssignment(
+ group.assignmentTimestamp(),
+ shareGroupAssignmentIntervalMs(group.groupId()),
+ time.milliseconds()
+ );
+ if (!canComputeNextTargetAssignment) {
+ return
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+ }
+
try {
Map<Uuid, Set<Integer>> initializedTopicPartitions =
shareGroupStatePartitionMetadata.containsKey(group.groupId()) ?
stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics())
:
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3f2c06b2315..2ee9cd4cd73 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -25640,6 +25640,84 @@ public class GroupMetadataManagerTest {
verify(context.metrics,
times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
}
+ @Test
+ public void
testShareGroupHeartbeatDoesNotBumpGroupEpochDuringAssignmentDelay() {
+ Uuid t1Uuid = Uuid.randomUuid();
+ String t1Name = "t1";
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(t1Uuid, t1Name, 2)
+ .buildCoordinatorMetadataImage();
+
+ String groupId = "share-group";
+ String memberId = Uuid.randomUuid().toString();
+
+ ShareGroupMember member = new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(t1Name))
+ .setAssignedPartitions(Map.of())
+ .build();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withMetadataImage(metadataImage)
+ .withShareGroup(new ShareGroupBuilder(groupId, 2)
+ .withMember(member)
+ .withAssignment(memberId, mkAssignment())
+ .withAssignmentEpoch(2)
+ // Suppress assignments.
+ .withAssignmentTimestamp(Integer.MAX_VALUE)
+ .withMetadataHash(computeGroupHash(Map.of(
+ t1Name, computeTopicHash(t1Name, metadataImage)
+ ))))
+ .build();
+
+ // t1-0 and t1-1 are initialized and not yet assigned.
+ context.groupMetadataManager.replay(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializingTopics(List.of())
+ .setInitializedTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(t1Uuid)
+ .setTopicName(t1Name)
+ .setPartitions(List.of(0, 1))
+ ))
+ .setDeletingTopics(List.of())
+ );
+
+ // Group epoch is bumped on the next heartbeat.
+ CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData,
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result1 =
context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(2));
+
+ assertEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3,
computeGroupHash(Map.of(
+ t1Name, computeTopicHash(t1Name, metadataImage)
+ )))
+ ),
+ result1.records()
+ );
+
+ // Group epoch is not bumped again.
+ CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData,
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result2 =
context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(2));
+
+ assertEquals(
+ List.of(),
+ result2.records()
+ );
+ }
+
@Test
public void testShareGroupHeartbeatPersisterRequestWithInitializing() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");
@@ -26986,6 +27064,180 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testShareGroupAssignmentInterval() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
5000)
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member 1 joins the group and gets an assignment immediately.
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ ))
+ )));
+ CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData,
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result1 =
context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of(fooTopicName)));
+
+ assertResponseEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new ShareGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))),
+ result1.response().getKey()
+ );
+
+ ShareGroupMember expectedMember1 = new
ShareGroupMember.Builder(memberId1)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId,
expectedMember1),
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
+
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
expectedMember1),
+
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId,
mkShareGroupStateMap(List.of(
+ mkShareGroupStateMetadataEntry(fooTopicId,
fooTopicName, List.of(0, 1, 2, 3, 4, 5))
+ )),
+ Map.of(),
+ Map.of()
+ )
+ ),
+ result1.records()
+ );
+
+ // Wait until just before the expected delay.
+ context.time.sleep(4995);
+
+ // Member 2 joins the group and gets no assignment.
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ ))
+ )));
+ CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData,
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result2 =
context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of(fooTopicName)));
+
+ assertResponseEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of())),
+ result2.response().getKey()
+ );
+
+ ShareGroupMember expectedMember2 = new
ShareGroupMember.Builder(memberId2)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setAssignedPartitions(mkAssignment())
+ .build();
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId,
expectedMember2),
+
GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 3,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
+
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
expectedMember2)
+ ),
+ result2.records()
+ );
+
+ // Wait a little more. The next target assignment can be computed now.
+ context.time.sleep(10);
+
+ // The next target assignment is computed.
+ CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData,
Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> result3 =
context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(2));
+
+ assertResponseEquals(
+ new ShareGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(3)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ShareGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new ShareGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))),
+ result3.response().getKey()
+ );
+
+ ShareGroupMember expectedMember3 = new
ShareGroupMember.Builder(memberId2)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build();
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentMetadataRecord(groupId,
3, context.time.milliseconds()),
+
GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId,
expectedMember3)
+ ),
+ result3.records()
+ );
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
diff --git a/tests/kafkatest/tests/client/share_consumer_test.py
b/tests/kafkatest/tests/client/share_consumer_test.py
index e5147894855..c95fd2cff1d 100644
--- a/tests/kafkatest/tests/client/share_consumer_test.py
+++ b/tests/kafkatest/tests/client/share_consumer_test.py
@@ -119,8 +119,9 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
@cluster(num_nodes=10)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
+ enable_assignment_batching=[False, True]
)
- def test_share_single_topic_partition(self,
metadata_quorum=quorum.isolated_kraft):
+ def test_share_single_topic_partition(self,
metadata_quorum=quorum.isolated_kraft, enable_assignment_batching=True):
total_messages = 100000
producer = self.setup_producer(self.TOPIC1["name"],
max_messages=total_messages)
@@ -150,8 +151,9 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
@cluster(num_nodes=10)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
+ enable_assignment_batching=[False, True]
)
- def test_share_multiple_partitions(self,
metadata_quorum=quorum.isolated_kraft):
+ def test_share_multiple_partitions(self,
metadata_quorum=quorum.isolated_kraft, enable_assignment_batching=True):
total_messages = 1000000
producer = self.setup_producer(self.TOPIC2["name"],
max_messages=total_messages, throughput=5000)
@@ -182,8 +184,9 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
@matrix(
clean_shutdown=[True, False],
metadata_quorum=[quorum.isolated_kraft],
+ enable_assignment_batching=[False, True]
)
- def test_broker_rolling_bounce(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft):
+ def test_broker_rolling_bounce(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft, enable_assignment_batching=True):
producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"])
@@ -216,8 +219,9 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
clean_shutdown=[True, False],
metadata_quorum=[quorum.isolated_kraft],
num_failed_brokers=[1, 2],
+ enable_assignment_batching=[False, True]
)
- def test_broker_failure(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1):
+ def test_broker_failure(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft, num_failed_brokers=1,
enable_assignment_batching=True):
producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"])
@@ -250,8 +254,9 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
clean_shutdown=[True, False],
bounce_mode=["all", "rolling"],
metadata_quorum=[quorum.isolated_kraft],
+ enable_assignment_batching=[False, True]
)
- def test_share_consumer_bounce(self, clean_shutdown, bounce_mode,
metadata_quorum=quorum.isolated_kraft):
+ def test_share_consumer_bounce(self, clean_shutdown, bounce_mode,
metadata_quorum=quorum.isolated_kraft, enable_assignment_batching=True):
"""
Verify correct share consumer behavior when the share consumers in the
group are consecutively restarted.
@@ -294,8 +299,9 @@ class ShareConsumerTest(VerifiableShareConsumerTest):
clean_shutdown=[True, False],
num_failed_consumers=[1, 2],
metadata_quorum=[quorum.isolated_kraft],
+ enable_assignment_batching=[False, True]
)
- def test_share_consumer_failure(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft, num_failed_consumers=1):
+ def test_share_consumer_failure(self, clean_shutdown,
metadata_quorum=quorum.isolated_kraft, num_failed_consumers=1,
enable_assignment_batching=True):
producer = self.setup_producer(self.TOPIC2["name"])
consumer = self.setup_share_group(self.TOPIC2["name"])