This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 422845feefc KAFKA-20291 [1/3]: Implement assignment batching for
consumer groups (#21720)
422845feefc is described below
commit 422845feefc02a2ac416d68d57aa5ab0428a6003
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 18 16:44:50 2026 +0000
KAFKA-20291 [1/3]: Implement assignment batching for consumer groups
(#21720)
Implement target assignment batching for consumer groups.
Delay computing the next assignment for consumer groups until their
assignment interval has elapsed since their last assignment computation
finished.
We run request tests with assignment batching disabled, except for
heartbeat tests, which we run with and without assignment batching.
We run a selection of integration and system tests with and without
assignment batching.
Reviewers: Christo Lolov <[email protected]>, David Jacot
<[email protected]>
---
.../kafka/clients/consumer/ConsumerBounceTest.java | 10 +-
.../clients/consumer/ConsumerIntegrationTest.java | 34 ++-
.../PlaintextConsumerSubscriptionTest.java | 83 +++++-
.../clients/consumer/PlaintextConsumerTest.java | 84 +++++-
.../api/AbstractAuthorizerIntegrationTest.scala | 1 +
.../server/ConsumerGroupDescribeRequestTest.scala | 3 +-
.../server/ConsumerGroupHeartbeatRequestTest.scala | 175 ++++++++++++
.../server/ConsumerProtocolMigrationTest.scala | 152 ++++++----
.../unit/kafka/server/ListGroupsRequestTest.scala | 3 +-
.../coordinator/group/GroupMetadataManager.java | 42 +++
.../coordinator/group/classic/ClassicGroup.java | 2 +-
.../group/GroupMetadataManagerTest.java | 313 +++++++++++++++++++++
tests/kafkatest/services/kafka/config_property.py | 4 +
tests/kafkatest/services/kafka/kafka.py | 27 +-
.../client/consumer_protocol_migration_test.py | 63 ++++-
tests/kafkatest/tests/client/consumer_test.py | 37 ++-
16 files changed, 933 insertions(+), 100 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
index e5f21a87e22..d8838d1b6e9 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
@@ -402,7 +403,14 @@ public class ConsumerBounceTest {
testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(GroupProtocol.CLASSIC);
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ })
+ })
public void
testAsyncConsumerReceivesFatalExceptionWhenGroupPassesMaxSize() throws
Exception {
testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(GroupProtocol.CONSUMER);
}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 4fa3a63a379..16e38967817 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -232,16 +232,30 @@ public class ConsumerIntegrationTest {
}
}
- @ClusterTest(
- types = {Type.KRAFT},
- brokers = 3,
- serverProperties = {
- @ClusterConfigProperty(id = 0, key = "broker.rack", value =
"rack0"),
- @ClusterConfigProperty(id = 1, key = "broker.rack", value =
"rack1"),
- @ClusterConfigProperty(id = 2, key = "broker.rack", value =
"rack2"),
- @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value =
"org.apache.kafka.clients.consumer.RackAwareAssignor")
- }
- )
+ @ClusterTests({
+ @ClusterTest(
+ types = {Type.KRAFT},
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(id = 0, key = "broker.rack", value =
"rack0"),
+ @ClusterConfigProperty(id = 1, key = "broker.rack", value =
"rack1"),
+ @ClusterConfigProperty(id = 2, key = "broker.rack", value =
"rack2"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value =
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ }
+ ),
+ @ClusterTest(
+ types = {Type.KRAFT},
+ brokers = 3,
+ serverProperties = {
+ @ClusterConfigProperty(id = 0, key = "broker.rack", value =
"rack0"),
+ @ClusterConfigProperty(id = 1, key = "broker.rack", value =
"rack1"),
+ @ClusterConfigProperty(id = 2, key = "broker.rack", value =
"rack2"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value =
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
+ @ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"1000")
+ }
+ )
+ })
public void testRackAwareAssignment(ClusterInstance clusterInstance)
throws ExecutionException, InterruptedException {
String topic = "test-topic";
try (Admin admin = clusterInstance.admin();
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
index ff840ebf2ea..5dbe2248eb8 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
@@ -49,6 +50,7 @@ import static
org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONF
import static
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
@@ -88,7 +90,14 @@ public class PlaintextConsumerSubscriptionTest {
testPatternSubscription(GroupProtocol.CLASSIC);
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerPatternSubscription() throws
InterruptedException {
testPatternSubscription(GroupProtocol.CONSUMER);
}
@@ -161,7 +170,14 @@ public class PlaintextConsumerSubscriptionTest {
testSubsequentPatternSubscription(GroupProtocol.CLASSIC);
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerSubsequentPatternSubscription() throws
InterruptedException {
testSubsequentPatternSubscription(GroupProtocol.CONSUMER);
}
@@ -231,7 +247,14 @@ public class PlaintextConsumerSubscriptionTest {
testPatternUnsubscription(GroupProtocol.CLASSIC);
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerPatternUnsubscription() throws
InterruptedException {
testPatternUnsubscription(GroupProtocol.CONSUMER);
}
@@ -273,7 +296,14 @@ public class PlaintextConsumerSubscriptionTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerRe2JPatternSubscription() throws
InterruptedException {
Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
@@ -339,7 +369,14 @@ public class PlaintextConsumerSubscriptionTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerRe2JPatternExpandSubscription() throws
InterruptedException {
Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
@@ -417,7 +454,14 @@ public class PlaintextConsumerSubscriptionTest {
}
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testRe2JPatternSubscriptionAndTopicSubscription() throws
InterruptedException {
Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
@@ -481,7 +525,14 @@ public class PlaintextConsumerSubscriptionTest {
testExpandingTopicSubscriptions(GroupProtocol.CLASSIC);
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerExpandingTopicSubscriptions() throws
InterruptedException {
testExpandingTopicSubscriptions(GroupProtocol.CONSUMER);
}
@@ -514,7 +565,14 @@ public class PlaintextConsumerSubscriptionTest {
testShrinkingTopicSubscriptions(GroupProtocol.CLASSIC);
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerShrinkingTopicSubscriptions() throws
InterruptedException {
testShrinkingTopicSubscriptions(GroupProtocol.CONSUMER);
}
@@ -552,7 +610,14 @@ public class PlaintextConsumerSubscriptionTest {
));
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerUnsubscribeTopic() throws
InterruptedException {
testUnsubscribeTopic(Map.of(GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)));
}
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index 4dd432859e9..ec9704c71ee 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.quota.QuotaType;
@@ -101,12 +102,14 @@ import static
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -360,7 +363,14 @@ public class PlaintextConsumerTest {
));
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void testAsyncConsumerGroupConsumption() throws Exception {
testGroupConsumption(Map.of(
GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
@@ -378,6 +388,69 @@ public class PlaintextConsumerTest {
}
}
+ @ClusterTest
+ public void testClassicConsumerGroupConsumptionWithTwoMembers() throws
InterruptedException {
+ testGroupConsumptionWithTwoMembers(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
+ public void testAsyncConsumerGroupConsumptionWithTwoMembers() throws
InterruptedException {
+ testGroupConsumptionWithTwoMembers(Map.of(
+ GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+ ));
+ }
+
+ private void testGroupConsumptionWithTwoMembers(Map<String, Object>
consumerConfig) throws InterruptedException {
+ var fooTopic = "foo";
+ var foo0 = new TopicPartition(fooTopic, 0);
+ var foo1 = new TopicPartition(fooTopic, 1);
+ cluster.createTopic(fooTopic, 2, (short) BROKER_COUNT);
+
+ consumerConfig = new HashMap<>(consumerConfig);
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG,
"group_two_members");
+
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer1 =
cluster.consumer(consumerConfig);
+ Consumer<byte[], byte[]> consumer2 =
cluster.consumer(consumerConfig)
+ ) {
+ var startingTimestamp = System.currentTimeMillis();
+
+ consumer1.subscribe(List.of(fooTopic));
+ awaitAssignment(consumer1, Set.of(foo0, foo1));
+
+ sendRecords(producer, foo0, 10, startingTimestamp);
+ consumeAndVerifyRecords(consumer1, foo0, 10, 0, 0,
startingTimestamp);
+
+ sendRecords(producer, foo1, 10, startingTimestamp);
+ consumeAndVerifyRecords(consumer1, foo1, 10, 0, 0,
startingTimestamp);
+
+ consumer2.subscribe(List.of(fooTopic));
+ TestUtils.waitForCondition(() -> {
+ consumer1.poll(Duration.ofMillis(100));
+ consumer2.poll(Duration.ofMillis(100));
+ return consumer1.assignment().size() == 1 &&
consumer2.assignment().size() == 1;
+ }, "Timed out waiting for rebalance to complete");
+
+ assertTrue(consumer1.assignment().contains(foo0) ||
consumer1.assignment().contains(foo1));
+ assertTrue(consumer2.assignment().contains(foo0) ||
consumer2.assignment().contains(foo1));
+ assertNotEquals(consumer1.assignment(), consumer2.assignment());
+
+ sendRecords(producer, foo0, 10, startingTimestamp);
+ sendRecords(producer, foo1, 10, startingTimestamp);
+ consumeAndVerifyRecords(consumer1,
consumer1.assignment().iterator().next(), 10, 10, 0, startingTimestamp);
+ consumeAndVerifyRecords(consumer2,
consumer2.assignment().iterator().next(), 10, 10, 0, startingTimestamp);
+ }
+ }
+
@ClusterTest
public void testClassicConsumerPartitionsFor() throws Exception {
testPartitionsFor(Map.of(
@@ -1322,7 +1395,14 @@ public class PlaintextConsumerTest {
));
}
- @ClusterTest
+ @ClusterTests({
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+ }),
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+ })
+ })
public void
testAsyncConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws
Exception {
testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of(
GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
diff --git
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index fafce17382c..1ba21895c10 100644
---
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -115,6 +115,7 @@ class AbstractAuthorizerIntegrationTest extends
BaseRequestTest {
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
+
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"0")
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
"10000")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"1")
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1")
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 156969f4ece..018fe853973 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -42,7 +42,8 @@ import scala.jdk.CollectionConverters._
brokers = 1,
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
)
)
class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index a5e4eb42dca..20ac6d42347 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -33,6 +33,19 @@ import org.junit.jupiter.api.Assertions.{assertEquals,
assertFalse, assertNotEqu
import scala.collection.Map
import scala.jdk.CollectionConverters._
+object ConsumerGroupHeartbeatRequestTest {
+ @ClusterTestDefaults(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ )
+ )
+ class WithAssignmentBatchingDisabledTest(cluster: ClusterInstance) extends
ConsumerGroupHeartbeatRequestTest(cluster) {
+ }
+}
+
@ClusterTestDefaults(
types = Array(Type.KRAFT),
serverProperties = Array(
@@ -42,6 +55,10 @@ import scala.jdk.CollectionConverters._
)
class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
+ protected def isConsumerAssignmentBatchingEnabled: Boolean = {
+ cluster.brokers.values.stream.allMatch(b =>
b.config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs > 0)
+ }
+
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value =
"classic")
@@ -1098,4 +1115,162 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
// Verify duplicate produces same response.
assertEquals(expectedFirstResponse1, duplicateResponse1.data)
}
+
+ // Walks a two-member rebalance over ConsumerGroupHeartbeat: member 1 owns
+ // both partitions of "foo", member 2 joins with an empty assignment, member 1
+ // is told to keep only partition 0 and acknowledges, and member 2 then gets
+ // partition 1. The epoch expected for member 2's join depends on
+ // isConsumerAssignmentBatchingEnabled.
+ @ClusterTest
+ def testConsumerGroupHeartbeatRebalance(): Unit = {
+ createOffsetsTopic()
+
+ val memberId1 = Uuid.randomUuid().toString
+ val memberId2 = Uuid.randomUuid().toString
+ val groupId = "test-rebalance-grp"
+
+ // Create topic.
+ val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+ // Member 1 joins and gets all partitions.
+ val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response1: ConsumerGroupHeartbeatResponse = null
+ val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+ TestUtils.waitUntilTrue(() => {
+ response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+ response1.data.errorCode == Errors.NONE.code &&
+ response1.data.assignment == allPartitions
+ }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+ // Expected assignment.
+ val expectedAssignment1 = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+ val expectedResponse1 = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId1)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(response1.data.heartbeatIntervalMs)
+ .setAssignment(expectedAssignment1)
+ assertEquals(expectedResponse1, response1.data)
+
+ val member1InitialEpoch = response1.data.memberEpoch
+
+ // Member 2 joins, triggering rebalance.
+ val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response2: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+ response2.data.errorCode == Errors.NONE.code
+ }, msg = s"Member 2 could not join. Last response $response2.")
+
+ // Expected assignment.
+ val expectedAssignment2 = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.empty.asJava)
+
+ val expectedResponse2 = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId2)
+ .setMemberEpoch(if (isConsumerAssignmentBatchingEnabled)
member1InitialEpoch else member1InitialEpoch + 1)
+ .setHeartbeatIntervalMs(response2.data.heartbeatIntervalMs)
+ .setAssignment(expectedAssignment2)
+ assertEquals(expectedResponse2, response2.data)
+
+ val member2InitialEpoch = response2.data.memberEpoch
+
+ // Member 1 heartbeats and sees revoked partition.
+ val request3 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(member1InitialEpoch)
+ ).build()
+
+ var response3: ConsumerGroupHeartbeatResponse = null
+ if (isConsumerAssignmentBatchingEnabled) {
+ TestUtils.waitUntilTrue(() => {
+ response3 = connectAndReceive[ConsumerGroupHeartbeatResponse](request3)
+ response3.data.errorCode == Errors.NONE.code &&
+ response3.data.assignment != null
+ }, msg = s"Member 1 did not get new assignment before timeout.")
+ } else {
+ response3 = connectAndReceive[ConsumerGroupHeartbeatResponse](request3)
+ assertEquals(Errors.NONE.code, response3.data.errorCode)
+ }
+
+ // Expected assignment.
+ val expectedAssignment3 = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0).asJava)).asJava)
+
+ val expectedResponse3 = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId1)
+ .setMemberEpoch(member1InitialEpoch)
+ .setHeartbeatIntervalMs(response3.data.heartbeatIntervalMs)
+ .setAssignment(expectedAssignment3)
+ assertEquals(expectedResponse3, response3.data)
+
+ // Member 1 sends heartbeat acknowledging revocation (only reporting
partition 0).
+ val ackRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(member1InitialEpoch)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0).asJava)).asJava)
+ ).build()
+
+ val ackResponse1 =
connectAndReceive[ConsumerGroupHeartbeatResponse](ackRequest1)
+ assertEquals(Errors.NONE.code, ackResponse1.data.errorCode)
+
+ val member1NewEpoch = ackResponse1.data.memberEpoch
+
+ // Member 2 heartbeats and is assigned new partition.
+ val request4 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(member2InitialEpoch)
+ ).build()
+
+ val response4 = connectAndReceive[ConsumerGroupHeartbeatResponse](request4)
+ assertEquals(Errors.NONE.code, response4.data.errorCode)
+
+ val expectedAssignment4 = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](1).asJava)).asJava)
+
+ val expectedResponse4 = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId2)
+ .setMemberEpoch(member1NewEpoch)
+ .setHeartbeatIntervalMs(response4.data.heartbeatIntervalMs)
+ .setAssignment(expectedAssignment4)
+ assertEquals(expectedResponse4, response4.data)
+ }
}
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 3e86e605d2d..4e2606fd107 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -19,20 +19,34 @@ package kafka.server
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.common.message.{JoinGroupResponseData,
ListGroupsResponseData, OffsetFetchRequestData, OffsetFetchResponseData,
SyncGroupResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatResponseData,
JoinGroupResponseData, ListGroupsResponseData, OffsetFetchRequestData,
OffsetFetchResponseData, SyncGroupResponseData}
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.classic.ClassicGroupState
import
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Timeout
import java.nio.ByteBuffer
import java.util.Collections
import scala.jdk.CollectionConverters._
+
+object ConsumerProtocolMigrationTest {
+ @ClusterTestDefaults(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
+ )
+ )
+ class WithAssignmentBatchingDisabledTest(cluster: ClusterInstance) extends
ConsumerProtocolMigrationTest(cluster) {
+ }
+}
+
@Timeout(120)
@ClusterTestDefaults(
types = Array(Type.KRAFT),
@@ -42,6 +56,11 @@ import scala.jdk.CollectionConverters._
)
)
class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
+
+ protected def isConsumerAssignmentBatchingEnabled: Boolean = {
+ cluster.brokers.values.stream.allMatch(b =>
b.config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs > 0)
+ }
+
@ClusterTest(
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value =
"bidirectional")
@@ -299,35 +318,10 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
syncGroupWithOldProtocol(
groupId = groupId,
memberId = memberId2,
- generationId = 3
+ generationId = if (isConsumerAssignmentBatchingEnabled) 2 else 3
)
)
- // Member 2 heartbeats.
- heartbeat(
- groupId = groupId,
- generationId = 3,
- memberId = memberId2
- )
-
- // Member 1 heartbeats to revoke partitions.
- consumerGroupHeartbeat(
- groupId = groupId,
- memberId = memberId1,
- rebalanceTimeoutMs = 5 * 60 * 1000,
- subscribedTopicNames = List("foo"),
- topicPartitions = List.empty,
- expectedError = Errors.NONE
- )
-
- // Member 2 heartbeats and gets REBALANCE_IN_PROGRESS.
- heartbeat(
- groupId = groupId,
- generationId = 3,
- memberId = memberId2,
- expectedError = Errors.REBALANCE_IN_PROGRESS
- )
-
// Downgrade the group by leaving member 1.
leaveGroupWithNewProtocol(
groupId = groupId,
@@ -793,15 +787,25 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
}
// The joining request with a consumer group member 2 is accepted.
- val memberId2 = consumerGroupHeartbeat(
- groupId = groupId,
- memberId = Uuid.randomUuid.toString,
- instanceId = if (useStaticMembers) instanceId2 else null,
- rebalanceTimeoutMs = 5 * 60 * 1000,
- subscribedTopicNames = List("foo"),
- topicPartitions = List.empty,
- expectedError = Errors.NONE
- ).memberId
+ val memberId2 = Uuid.randomUuid.toString
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId2)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.empty.asJava)),
+ consumerGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId2,
+ instanceId = if (useStaticMembers) instanceId2 else null,
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List.empty,
+ expectedError = Errors.NONE
+ )
+ )
// The group has become a consumer group.
assertEquals(
@@ -965,6 +969,7 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
topicPartitions = List.empty,
expectedError = Errors.NONE
).assignment.topicPartitions.get(0).partitions
+ assertTrue(partitionsOfMember2.size < 3)
// The group has been stabilized.
assertEquals(
@@ -1041,6 +1046,18 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
expectedAssignment = assignment(List(0, 1,
2).filter(!partitionsOfMember2.contains(_)))
)
+ if (isConsumerAssignmentBatchingEnabled) {
+ // Member 2 changes subscription and the assignment becomes out of date.
+ val memberEpoch2 = consumerGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId2,
+ memberEpoch = 2,
+ subscribedTopicNames = List("foo", "bar"),
+ expectedError = Errors.NONE
+ ).memberEpoch
+ assertEquals(2, memberEpoch2)
+ }
+
if (!useStaticMembers) {
// Downgrade the group by leaving member 2.
leaveGroupWithNewProtocol(
@@ -1060,15 +1077,16 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
}
// The group has become a classic group.
- // If the downgrade is triggered by the static member replacement,
- // the group should remain STABLE, otherwise, a rebalance is triggered.
+ // If the downgrade is triggered by the static member replacement and the
+ // assignment is up to date, the group should remain STABLE, otherwise,
+ // a rebalance is triggered.
assertEquals(
List(
new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType("consumer")
.setGroupState(
- if (useStaticMembers)
+ if (useStaticMembers && !isConsumerAssignmentBatchingEnabled)
ClassicGroupState.STABLE.toString
else
ClassicGroupState.PREPARING_REBALANCE.toString
@@ -1128,15 +1146,25 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
}
// The joining request with a consumer group member 2 is accepted.
- val memberId2 = consumerGroupHeartbeat(
- groupId = groupId,
- memberId = Uuid.randomUuid.toString,
- instanceId = if (useStaticMembers) instanceId2 else null,
- rebalanceTimeoutMs = 5 * 60 * 1000,
- subscribedTopicNames = List("foo"),
- topicPartitions = List.empty,
- expectedError = Errors.NONE
- ).memberId
+ val memberId2 = Uuid.randomUuid.toString
+ assertEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId2)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.empty.asJava)),
+ consumerGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId2,
+ instanceId = if (useStaticMembers) instanceId2 else null,
+ rebalanceTimeoutMs = 5 * 60 * 1000,
+ subscribedTopicNames = List("foo"),
+ topicPartitions = List.empty,
+ expectedError = Errors.NONE
+ )
+ )
// The group has become a consumer group.
assertEquals(
@@ -1239,6 +1267,7 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
generationId = 1
).assignment()
)).partitions()
+ assertTrue(partitionsOfMember1.size < 3)
// Member 1 heartbeats and gets REBALANCE_IN_PROGRESS.
heartbeat(
@@ -1271,7 +1300,7 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
)
// Member 2 rejoins to retrieve partitions pending assignment.
- consumerGroupHeartbeat(
+ val partitionsOfMember2 = consumerGroupHeartbeat(
groupId = groupId,
memberId = memberId2,
memberEpoch = 2,
@@ -1279,7 +1308,9 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
subscribedTopicNames = List("foo"),
topicPartitions = List.empty,
expectedError = Errors.NONE
- )
+ ).assignment.topicPartitions.get(0).partitions
+ assertEquals(3, partitionsOfMember1.size + partitionsOfMember2.size)
+
assertTrue(partitionsOfMember1.asScala.intersect(partitionsOfMember2.asScala).isEmpty)
// The group has been stabilized.
assertEquals(
@@ -1296,6 +1327,18 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
)
)
+ if (isConsumerAssignmentBatchingEnabled) {
+ // Member 2 changes subscription and the assignment becomes out of date.
+ val memberEpoch2 = consumerGroupHeartbeat(
+ groupId = groupId,
+ memberId = memberId2,
+ memberEpoch = 2,
+ subscribedTopicNames = List("foo", "bar"),
+ expectedError = Errors.NONE
+ ).memberEpoch
+ assertEquals(2, memberEpoch2)
+ }
+
if (!useStaticMembers) {
// Downgrade the group by leaving member 2.
leaveGroupWithNewProtocol(
@@ -1315,15 +1358,16 @@ class ConsumerProtocolMigrationTest(cluster:
ClusterInstance) extends GroupCoord
}
// The group has become a classic group.
- // If the downgrade is triggered by the static member replacement,
- // the group should remain STABLE, otherwise, a rebalance is triggered.
+ // If the downgrade is triggered by the static member replacement and the
+ // assignment is up to date, the group should remain STABLE, otherwise,
+ // a rebalance is triggered.
assertEquals(
List(
new ListGroupsResponseData.ListedGroup()
.setGroupId(groupId)
.setProtocolType("consumer")
.setGroupState(
- if (useStaticMembers)
+ if (useStaticMembers && !isConsumerAssignmentBatchingEnabled)
ClassicGroupState.STABLE.toString
else
ClassicGroupState.PREPARING_REBALANCE.toString
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index 86cf887f2b2..e5df28c3900 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -30,7 +30,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
serverProperties = Array(
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
- new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value =
"0")
)
)
class ListGroupsRequestTest(cluster: ClusterInstance) extends
GroupCoordinatorBaseRequestTest(cluster) {
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 364890baae9..0fb6d3ba117 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3787,6 +3787,39 @@ public class GroupMetadataManager {
);
}
+ /**
+ * Checks whether the next target assignment can be computed right now.
+ *
+ * @param assignmentTimestampMs The time at which the last target
assignment calculation finished,
+ * or 0 if there is no previous assignment.
+ * @param assignmentIntervalMs The interval between assignment updates
+ * @param currentTimeMs The current time in milliseconds.
+ * @return {@code true} if the next target assignment can be computed
right now, {@code false} otherwise.
+ */
+ // package private for testing
+ static boolean canComputeNextTargetAssignment(
+ long assignmentTimestampMs,
+ int assignmentIntervalMs,
+ long currentTimeMs
+ ) {
+ // The next target assignment can be computed immediately
+ // * when there is no existing target assignment,
+ // * or we do not know when the existing target assignment was
computed.
+ if (assignmentTimestampMs == 0) {
+ return true;
+ }
+
+ // The next target assignment can be computed immediately when the
assignment interval is
+ // zero. This provides an escape hatch if the wall clock undergoes a
large backwards
+ // correction.
+ if (assignmentIntervalMs == 0) {
+ return true;
+ }
+
+ // Otherwise, we wait for the assignment interval to elapse.
+ return currentTimeMs >= assignmentTimestampMs + assignmentIntervalMs;
+ }
+
/**
* Updates the target assignment according to the updated member and
subscription metadata.
*
@@ -3811,6 +3844,15 @@ public class GroupMetadataManager {
return
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
}
+ boolean canComputeNextTargetAssignment =
canComputeNextTargetAssignment(
+ group.assignmentTimestamp(),
+ consumerGroupAssignmentIntervalMs(group.groupId()),
+ time.milliseconds()
+ );
+ if (!canComputeNextTargetAssignment) {
+ return
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+ }
+
String preferredServerAssignor = group.computePreferredServerAssignor(
member,
updatedMember
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 9b937a4201b..0b04dcbf512 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -1429,7 +1429,7 @@ public class ClassicGroup implements Group {
}
byte[] assignment =
Utils.toArray(ConsumerProtocol.serializeAssignment(
toConsumerProtocolAssignment(
-
consumerGroup.targetAssignment().get(memberId).partitions(),
+ consumerGroup.targetAssignment(memberId).partitions(),
image
),
ConsumerProtocol.deserializeVersion(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index fdb47f417f2..3f2c06b2315 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -13690,6 +13690,109 @@ public class GroupMetadataManagerTest {
assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
}
+ @Test
+ public void
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMemberWhenTargetAssignmentIsMissing()
throws ExecutionException, InterruptedException {
+ String groupId = "group-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String oldMemberId2 = Uuid.randomUuid().toString();
+ String instanceId = "instance-id";
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 =
List.of(
+ new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+ .setName(NoOpPartitionAssignor.NAME)
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName, barTopicName),
+ null,
+ List.of()
+ ))))
+ );
+
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
+ .setRebalanceTimeoutMs(45000)
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(5000)
+ .setSupportedProtocols(protocols1)
+ )
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(), 10))
+ .build();
+ ConsumerGroupMember oldMember2 = new
ConsumerGroupMember.Builder(oldMemberId2)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+ .setServerAssignorName(NoOpPartitionAssignor.NAME)
+ .setRebalanceTimeoutMs(45000)
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1)), 10))
+ .build();
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 2)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ // Consumer group with two members.
+ // Member 1 uses the classic protocol and static member 2 uses the
consumer protocol.
+ // Member 1 has just joined and does not have an assignment yet.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG,
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new
NoOpPartitionAssignor()))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+ .withMember(member1)
+ .withMember(oldMember2)
+ .withAssignment(oldMemberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+ mkTopicAssignment(barTopicId, 0, 1)))
+ .withAssignmentEpoch(10)
+ .withMetadataHash(computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName,
metadataImage),
+ barTopicName, computeTopicHash(barTopicName, metadataImage)
+ ))))
+ .build();
+
+ // A new member using classic protocol with the same instance id
joins, scheduling the downgrade.
+ byte[] protocolsMetadata2 =
Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName, barTopicName))));
+ JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
+ new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+ protocols2.add(new JoinGroupRequestProtocol()
+ .setName(NoOpPartitionAssignor.NAME)
+ .setMetadata(protocolsMetadata2));
+ JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocols(protocols2)
+ .build();
+ GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
+ result.appendFuture.complete(null);
+ result.joinFuture.get();
+
+ // A rebalance is triggered.
+ ClassicGroup classicGroup =
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+ assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+ }
+
@Test
public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
String groupId = "group-id";
@@ -26673,6 +26776,216 @@ public class GroupMetadataManagerTest {
test.assertions.run();
}
+ @Test
+ public void
testCanComputeNextTargetAssignmentWithNoPreviousAssignmentTimestamp() {
+ // The next target assignment can always be computed when there is no
previous assignment timestamp.
+ assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(0,
1000, 0));
+ }
+
+ @Test
+ public void
testCanComputeNextTargetAssignmentWithZeroAssignmentIntervalMs() {
+ // The next target assignment can always be computed when the
assignment interval is zero.
+
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 0, 0));
+
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 0,
999999));
+
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 0,
1000000));
+ }
+
+ @Test
+ public void testCanComputeNextTargetAssignment() {
+ // The next target assignment cannot be computed before the timestamp
of the end of the previous assignment computation.
+
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000,
0));
+
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000,
999999));
+
+ // The next target assignment cannot be computed before the assignment
interval has elapsed.
+
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000,
1000000));
+
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000,
1004999));
+
+ // The next target assignment can be computed after the assignment
interval has elapsed.
+
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000,
1005000));
+
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000,
1007500));
+ }
+
+ @Test
+ public void testConsumerGroupAssignmentInterval() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
5000)
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member 1 joins the group and gets an assignment immediately.
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ ))
+ )));
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))),
+ result1.response()
+ );
+
+ ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)), 2))
+ .build();
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
2, context.time.milliseconds()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
+ ),
+ result1.records()
+ );
+
+ // Wait until just before the expected delay.
+ context.time.sleep(4995);
+
+ // Member 2 joins the group and gets no assignment.
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+ memberId1, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )),
+ memberId2, new MemberAssignmentImpl(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)
+ ))
+ )));
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5000)
+ .setServerAssignor("range")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of())),
+ result2.response()
+ );
+
+ ConsumerGroupMember expectedMember2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(2)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(), 2))
+ .build();
+
+ assertRecordsEquals(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember2),
+
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ ))),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember2)
+ ),
+ result2.records()
+ );
+
+ // Wait a little more. The next target assignment can be computed now.
+ context.time.sleep(10);
+
+ // The next target assignment is computed.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result3 = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(2));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId2)
+ .setMemberEpoch(3)
+ .setHeartbeatIntervalMs(5000),
+ result3.response()
+ );
+
+ ConsumerGroupMember expectedMember3 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.UNRELEASED_PARTITIONS)
+ .setMemberEpoch(3)
+ .setPreviousMemberEpoch(2)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(), 2))
+ .build();
+
+ assertUnorderedRecordsEquals(
+ List.of(
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)
+ ))
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
3, context.time.milliseconds())),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember3))
+ ),
+ result3.records()
+ );
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
diff --git a/tests/kafkatest/services/kafka/config_property.py
b/tests/kafkatest/services/kafka/config_property.py
index 702120e1625..2148b3bf3e9 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -75,9 +75,13 @@ DELEGATION_TOKEN_SECRET_KEY="delegation.token.secret.key"
SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
CONSUMER_GROUP_MIGRATION_POLICY = "group.consumer.migration.policy"
+CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS = "group.consumer.assignment.interval.ms"
SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR
="share.coordinator.state.topic.replication.factor"
SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR = "share.coordinator.state.topic.min.isr"
+SHARE_GROUP_ASSIGNMENT_INTERVAL_MS = "group.share.assignment.interval.ms"
+
+STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS = "group.streams.assignment.interval.ms"
UNSTABLE_API_VERSIONS_ENABLE = "unstable.api.versions.enable"
UNSTABLE_FEATURE_VERSIONS_ENABLE = "unstable.feature.versions.enable"
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index 252ad520f53..be5a5201ce4 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -206,7 +206,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
consumer_group_migration_policy=None,
dynamicRaftQuorum=False,
use_transactions_v2=False,
- use_streams_groups=False
+ use_streams_groups=False,
+ enable_assignment_batching=None
):
"""
:param context: test context
@@ -271,6 +272,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
:param dynamicRaftQuorum: When true,
controller_quorum_bootstrap_servers, and bootstraps the first controller using
the standalone flag
:param use_transactions_v2: When true, uses transaction.version=2
which utilizes the new transaction protocol introduced in KIP-890
:param use_streams_groups: When true, enables the use of streams
groups introduced in KIP-1071
+ :param enable_assignment_batching: When true, enables assignment
batching introduced in KIP-1263. If not specified, defaults to True.
"""
self.zk = zk
@@ -296,6 +298,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
consumer_group_migration_policy = context.globals.get(arg_name)
self.consumer_group_migration_policy = consumer_group_migration_policy
+ # Set enable_assignment_batching based on context and arguments.
+ # If not specified, defaults to true.
+ if enable_assignment_batching is None:
+ arg_name = 'enable_assignment_batching'
+ if context.injected_args is not None:
+ enable_assignment_batching =
context.injected_args.get(arg_name)
+ if enable_assignment_batching is None:
+ enable_assignment_batching = context.globals.get(arg_name)
+ if enable_assignment_batching is None:
+ enable_assignment_batching = True
+ self.enable_assignment_batching = enable_assignment_batching
+
if num_nodes < 1:
raise Exception("Must set a positive number of nodes: %i" %
num_nodes)
self.num_nodes_broker_role = 0
@@ -346,7 +360,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
isolated_kafka=self,
allow_zk_with_kraft=self.allow_zk_with_kraft,
server_prop_overrides=server_prop_overrides,
dynamicRaftQuorum=self.dynamicRaftQuorum,
use_transactions_v2=self.use_transactions_v2,
- use_streams_groups=self.use_streams_groups
+ use_streams_groups=self.use_streams_groups,
+ enable_assignment_batching=self.enable_assignment_batching
)
self.controller_quorum = self.isolated_controller_quorum
@@ -771,6 +786,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] =
str(True)
override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE]
= str(True)
+ if self.enable_assignment_batching:
+ # Assignment batching is enabled by default in Kafka
+ pass
+ else:
+
override_configs[config_property.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS] = "0"
+
override_configs[config_property.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS] = "0"
+
override_configs[config_property.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS] = "0"
+
#update template configs with test override configs
configs.update(override_configs)
diff --git a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
index ce4f0431bb2..bfe5a1eebaa 100644
--- a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
+++ b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
@@ -108,24 +108,37 @@ class
ConsumerProtocolMigrationTest(VerifiableConsumerTest):
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["disabled"],
consumer_version=consumer_versions_supporting_range_assignnor,
- assignment_strategy=[RANGE]
+ assignment_strategy=[RANGE],
+ enable_assignment_batching=[True]
)
@matrix(
static_membership=[True],
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["disabled"],
consumer_version=consumer_versions_supporting_static_membership,
- assignment_strategy=[RANGE]
+ assignment_strategy=[RANGE],
+ enable_assignment_batching=[True]
)
@matrix(
static_membership=[True, False],
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["disabled"],
consumer_version=consumer_versions_supporting_cooperative_sticky_assignor,
- assignment_strategy=[COOPERATIVE_STICKEY]
+ assignment_strategy=[COOPERATIVE_STICKEY],
+ enable_assignment_batching=[True]
+ )
+ @matrix(
+ static_membership=[True, False],
+ metadata_quorum=[quorum.isolated_kraft],
+ consumer_group_migration_policy=["disabled"],
+ # Test the latest version only without assignment batching.
+
consumer_version=consumer_versions_supporting_cooperative_sticky_assignor[-1:],
+ assignment_strategy=[RANGE, COOPERATIVE_STICKEY],
+ enable_assignment_batching=[False]
)
def test_consumer_offline_migration(self, static_membership,
metadata_quorum,
- consumer_group_migration_policy,
consumer_version, assignment_strategy):
+ consumer_group_migration_policy,
consumer_version, assignment_strategy,
+ enable_assignment_batching):
"""
Verify correct consumer behavior when the consumers in the group are
restarted to perform
offline upgrade/downgrade.
@@ -178,24 +191,37 @@ class
ConsumerProtocolMigrationTest(VerifiableConsumerTest):
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["bidirectional", "upgrade"],
consumer_version=consumer_versions_supporting_range_assignnor,
- assignment_strategy=[RANGE]
+ assignment_strategy=[RANGE],
+ enable_assignment_batching=[True]
)
@matrix(
static_membership=[True],
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["bidirectional", "upgrade"],
consumer_version=consumer_versions_supporting_static_membership,
- assignment_strategy=[RANGE]
+ assignment_strategy=[RANGE],
+ enable_assignment_batching=[True]
)
@matrix(
static_membership=[True, False],
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["bidirectional", "upgrade"],
consumer_version=consumer_versions_supporting_cooperative_sticky_assignor,
- assignment_strategy=[COOPERATIVE_STICKEY]
+ assignment_strategy=[COOPERATIVE_STICKEY],
+ enable_assignment_batching=[True]
+ )
+ @matrix(
+ static_membership=[True, False],
+ metadata_quorum=[quorum.isolated_kraft],
+ consumer_group_migration_policy=["bidirectional", "upgrade"],
+ # Test the latest version only without assignment batching.
+
consumer_version=consumer_versions_supporting_cooperative_sticky_assignor[-1:],
+ assignment_strategy=[RANGE, COOPERATIVE_STICKEY],
+ enable_assignment_batching=[False]
)
def test_consumer_rolling_upgrade(self, static_membership, metadata_quorum,
- consumer_group_migration_policy,
consumer_version, assignment_strategy):
+ consumer_group_migration_policy,
consumer_version, assignment_strategy,
+ enable_assignment_batching):
"""
Verify correct consumer behavior when the consumers in the group are
restarted to perform
online upgrade when the migration policy is set to be UPGRADE.
@@ -239,24 +265,37 @@ class
ConsumerProtocolMigrationTest(VerifiableConsumerTest):
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["downgrade"],
consumer_version=consumer_versions_supporting_range_assignnor,
- assignment_strategy=[RANGE]
+ assignment_strategy=[RANGE],
+ enable_assignment_batching=[True]
)
@matrix(
static_membership=[True],
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["downgrade"],
consumer_version=consumer_versions_supporting_static_membership,
- assignment_strategy=[RANGE]
+ assignment_strategy=[RANGE],
+ enable_assignment_batching=[True]
)
@matrix(
static_membership=[True, False],
metadata_quorum=[quorum.isolated_kraft],
consumer_group_migration_policy=["downgrade"],
consumer_version=consumer_versions_supporting_cooperative_sticky_assignor,
- assignment_strategy=[COOPERATIVE_STICKEY]
+ assignment_strategy=[COOPERATIVE_STICKEY],
+ enable_assignment_batching=[True]
+ )
+ @matrix(
+ static_membership=[True, False],
+ metadata_quorum=[quorum.isolated_kraft],
+ consumer_group_migration_policy=["downgrade"],
+ # Test the latest version only without assignment batching.
+
consumer_version=consumer_versions_supporting_cooperative_sticky_assignor[-1:],
+ assignment_strategy=[RANGE, COOPERATIVE_STICKEY],
+ enable_assignment_batching=[False]
)
def test_consumer_rolling_downgrade(self, static_membership,
metadata_quorum,
- consumer_group_migration_policy,
consumer_version, assignment_strategy):
+ consumer_group_migration_policy,
consumer_version, assignment_strategy,
+ enable_assignment_batching):
"""
Verify correct consumer behavior when the consumers in the group are
restarted to perform
online downgrade when the migration policy is set to be DOWNGRADE.
diff --git a/tests/kafkatest/tests/client/consumer_test.py
b/tests/kafkatest/tests/client/consumer_test.py
index c28cba1431b..6b71d9769f7 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -138,9 +138,17 @@ class OffsetValidationTest(VerifiableConsumerTest):
clean_shutdown=[True],
bounce_mode=["all", "rolling"],
metadata_quorum=[quorum.isolated_kraft],
- group_protocol=consumer_group.all_group_protocols
+ group_protocol=consumer_group.all_group_protocols,
+ enable_assignment_batching=[True]
)
- def test_consumer_bounce(self, clean_shutdown, bounce_mode,
metadata_quorum=quorum.isolated_kraft, group_protocol=None):
+ @matrix(
+ clean_shutdown=[True],
+ bounce_mode=["all", "rolling"],
+ metadata_quorum=[quorum.isolated_kraft],
+ group_protocol=[consumer_group.consumer_group_protocol],
+ enable_assignment_batching=[False]
+ )
+ def test_consumer_bounce(self, clean_shutdown, bounce_mode,
metadata_quorum=quorum.isolated_kraft, group_protocol=None,
enable_assignment_batching=True):
"""
Verify correct consumer behavior when the consumers in the group are
consecutively restarted.
@@ -386,7 +394,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
clean_shutdown=[True],
enable_autocommit=[True, False],
metadata_quorum=[quorum.isolated_kraft],
- group_protocol=consumer_group.all_group_protocols
+ group_protocol=consumer_group.all_group_protocols,
+ enable_assignment_batching=[True]
+ )
+ @matrix(
+ clean_shutdown=[True],
+ enable_autocommit=[True],
+ metadata_quorum=[quorum.isolated_kraft],
+ group_protocol=[consumer_group.consumer_group_protocol],
+ enable_assignment_batching=[False]
)
def test_consumer_failure(self, clean_shutdown, enable_autocommit,
metadata_quorum=quorum.isolated_kraft, group_protocol=None):
partition = TopicPartition(self.TOPIC, 0)
@@ -478,9 +494,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
- group_protocol=consumer_group.all_group_protocols
+ group_protocol=consumer_group.all_group_protocols,
+ enable_assignment_batching=[True]
+ )
+ @matrix(
+ metadata_quorum=[quorum.isolated_kraft],
+ group_protocol=[consumer_group.consumer_group_protocol],
+ enable_assignment_batching=[False]
)
- def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft,
group_protocol=None):
+ def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft,
group_protocol=None, enable_assignment_batching=True):
"""
Verifies correct group rebalance behavior as consumers are started and
stopped.
In particular, this test verifies that the partition is readable after
every
@@ -541,9 +563,10 @@ class AssignmentValidationTest(VerifiableConsumerTest):
@matrix(
metadata_quorum=[quorum.isolated_kraft],
group_protocol=[consumer_group.consumer_group_protocol],
- group_remote_assignor=consumer_group.all_remote_assignors
+ group_remote_assignor=consumer_group.all_remote_assignors,
+ enable_assignment_batching=[False, True]
)
- def test_valid_assignment(self, assignment_strategy=None,
metadata_quorum=quorum.isolated_kraft, group_protocol=None,
group_remote_assignor=None):
+ def test_valid_assignment(self, assignment_strategy=None,
metadata_quorum=quorum.isolated_kraft, group_protocol=None,
group_remote_assignor=None, enable_assignment_batching=True):
"""
Verify assignment strategy correctness: each partition is assigned to
exactly
one consumer instance.