This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 422845feefc KAFKA-20291 [1/3]: Implement assignment batching for consumer groups (#21720)
422845feefc is described below

commit 422845feefc02a2ac416d68d57aa5ab0428a6003
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 18 16:44:50 2026 +0000

    KAFKA-20291 [1/3]: Implement assignment batching for consumer groups (#21720)
    
    Implement target assignment batching for consumer groups.
    
    Delay computing the next assignment for consumer groups until their
    assignment interval has elapsed since their last assignment computation
    finished.
    
    We run request tests with assignment batching disabled, except for
    heartbeat tests, which we run with and without assignment batching.
    
    We run a selection of integration and system tests with and without
    assignment batching.
    
    Reviewers: Christo Lolov <[email protected]>, David Jacot
     <[email protected]>
---
 .../kafka/clients/consumer/ConsumerBounceTest.java |  10 +-
 .../clients/consumer/ConsumerIntegrationTest.java  |  34 ++-
 .../PlaintextConsumerSubscriptionTest.java         |  83 +++++-
 .../clients/consumer/PlaintextConsumerTest.java    |  84 +++++-
 .../api/AbstractAuthorizerIntegrationTest.scala    |   1 +
 .../server/ConsumerGroupDescribeRequestTest.scala  |   3 +-
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 175 ++++++++++++
 .../server/ConsumerProtocolMigrationTest.scala     | 152 ++++++----
 .../unit/kafka/server/ListGroupsRequestTest.scala  |   3 +-
 .../coordinator/group/GroupMetadataManager.java    |  42 +++
 .../coordinator/group/classic/ClassicGroup.java    |   2 +-
 .../group/GroupMetadataManagerTest.java            | 313 +++++++++++++++++++++
 tests/kafkatest/services/kafka/config_property.py  |   4 +
 tests/kafkatest/services/kafka/kafka.py            |  27 +-
 .../client/consumer_protocol_migration_test.py     |  63 ++++-
 tests/kafkatest/tests/client/consumer_test.py      |  37 ++-
 16 files changed, 933 insertions(+), 100 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
index e5f21a87e22..d8838d1b6e9 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerBounceTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
@@ -402,7 +403,14 @@ public class ConsumerBounceTest {
         
testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(GroupProtocol.CLASSIC);
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"1000")
+        })
+    })
     public void 
testAsyncConsumerReceivesFatalExceptionWhenGroupPassesMaxSize() throws 
Exception {
         
testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize(GroupProtocol.CONSUMER);
     }
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 4fa3a63a379..16e38967817 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -232,16 +232,30 @@ public class ConsumerIntegrationTest {
         }
     }
 
-    @ClusterTest(
-        types = {Type.KRAFT},
-        brokers = 3,
-        serverProperties = {
-            @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
-            @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
-            @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
-            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor")
-        }
-    )
+    @ClusterTests({
+        @ClusterTest(
+            types = {Type.KRAFT},
+            brokers = 3,
+            serverProperties = {
+                @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
+                @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
+                @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
+            }
+        ),
+        @ClusterTest(
+            types = {Type.KRAFT},
+            brokers = 3,
+            serverProperties = {
+                @ClusterConfigProperty(id = 0, key = "broker.rack", value = 
"rack0"),
+                @ClusterConfigProperty(id = 1, key = "broker.rack", value = 
"rack1"),
+                @ClusterConfigProperty(id = 2, key = "broker.rack", value = 
"rack2"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = 
"org.apache.kafka.clients.consumer.RackAwareAssignor"),
+                @ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"1000")
+            }
+        )
+    })
     public void testRackAwareAssignment(ClusterInstance clusterInstance) 
throws ExecutionException, InterruptedException {
         String topic = "test-topic";
         try (Admin admin = clusterInstance.admin();
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
index ff840ebf2ea..5dbe2248eb8 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.test.TestUtils;
 
@@ -49,6 +50,7 @@ import static 
org.apache.kafka.clients.CommonClientConfigs.METADATA_MAX_AGE_CONF
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
@@ -88,7 +90,14 @@ public class PlaintextConsumerSubscriptionTest {
         testPatternSubscription(GroupProtocol.CLASSIC);
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerPatternSubscription() throws 
InterruptedException {
         testPatternSubscription(GroupProtocol.CONSUMER);
     }
@@ -161,7 +170,14 @@ public class PlaintextConsumerSubscriptionTest {
         testSubsequentPatternSubscription(GroupProtocol.CLASSIC);
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerSubsequentPatternSubscription() throws 
InterruptedException {
         testSubsequentPatternSubscription(GroupProtocol.CONSUMER);
     }
@@ -231,7 +247,14 @@ public class PlaintextConsumerSubscriptionTest {
         testPatternUnsubscription(GroupProtocol.CLASSIC);
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerPatternUnsubscription() throws 
InterruptedException {
         testPatternUnsubscription(GroupProtocol.CONSUMER);
     }
@@ -273,7 +296,14 @@ public class PlaintextConsumerSubscriptionTest {
         }
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerRe2JPatternSubscription() throws 
InterruptedException {
         Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
         try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
@@ -339,7 +369,14 @@ public class PlaintextConsumerSubscriptionTest {
         }
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerRe2JPatternExpandSubscription() throws 
InterruptedException {
         Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
         try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
@@ -417,7 +454,14 @@ public class PlaintextConsumerSubscriptionTest {
         }
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testRe2JPatternSubscriptionAndTopicSubscription() throws 
InterruptedException {
         Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
         try (Consumer<byte[], byte[]> consumer = cluster.consumer(config)) {
@@ -481,7 +525,14 @@ public class PlaintextConsumerSubscriptionTest {
         testExpandingTopicSubscriptions(GroupProtocol.CLASSIC);
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerExpandingTopicSubscriptions() throws 
InterruptedException {
         testExpandingTopicSubscriptions(GroupProtocol.CONSUMER);
     }
@@ -514,7 +565,14 @@ public class PlaintextConsumerSubscriptionTest {
         testShrinkingTopicSubscriptions(GroupProtocol.CLASSIC);
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerShrinkingTopicSubscriptions() throws 
InterruptedException {
         testShrinkingTopicSubscriptions(GroupProtocol.CONSUMER);
     }
@@ -552,7 +610,14 @@ public class PlaintextConsumerSubscriptionTest {
         ));
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerUnsubscribeTopic() throws 
InterruptedException {
         testUnsubscribeTopic(Map.of(GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)));
     }
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index 4dd432859e9..ec9704c71ee 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Flaky;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.server.quota.QuotaType;
@@ -101,12 +102,14 @@ import static 
org.apache.kafka.clients.producer.ProducerConfig.COMPRESSION_TYPE_
 import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
 import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -360,7 +363,14 @@ public class PlaintextConsumerTest {
         ));
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void testAsyncConsumerGroupConsumption() throws Exception {
         testGroupConsumption(Map.of(
             GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
@@ -378,6 +388,69 @@ public class PlaintextConsumerTest {
         }
     }
 
+    /** Runs the two-member group consumption scenario with the classic group protocol. */
+    @ClusterTest
+    public void testClassicConsumerGroupConsumptionWithTwoMembers() throws InterruptedException {
+        Map<String, Object> classicConfig = Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT)
+        );
+        testGroupConsumptionWithTwoMembers(classicConfig);
+    }
+
+    /**
+     * Runs the two-member group consumption scenario with the consumer group protocol,
+     * against clusters whose assignment interval is set to 0 ms and to 1000 ms.
+     */
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
+    public void testAsyncConsumerGroupConsumptionWithTwoMembers() throws InterruptedException {
+        Map<String, Object> consumerProtocolConfig = Map.of(
+            GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)
+        );
+        testGroupConsumptionWithTwoMembers(consumerProtocolConfig);
+    }
+
+    /**
+     * Shared scenario: one member consumes from both partitions of a two-partition topic,
+     * then a second member joins the same group and each member ends up with exactly one
+     * partition, from which it keeps consuming.
+     *
+     * @param consumerConfig base consumer configuration (e.g. the group protocol); it is
+     *                       copied before the group id is added
+     * @throws InterruptedException if a wait in the scenario is interrupted
+     */
+    private void testGroupConsumptionWithTwoMembers(Map<String, Object> consumerConfig) throws InterruptedException {
+        final String topic = "foo";
+        final TopicPartition partition0 = new TopicPartition(topic, 0);
+        final TopicPartition partition1 = new TopicPartition(topic, 1);
+        cluster.createTopic(topic, 2, (short) BROKER_COUNT);
+
+        // Callers pass Map.of(...), which is immutable, so work on a mutable copy
+        // before pinning both consumers to the same group.
+        final Map<String, Object> groupConfig = new HashMap<>(consumerConfig);
+        groupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "group_two_members");
+
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             Consumer<byte[], byte[]> firstMember = cluster.consumer(groupConfig);
+             Consumer<byte[], byte[]> secondMember = cluster.consumer(groupConfig)
+        ) {
+            final long baseTimestamp = System.currentTimeMillis();
+
+            // While alone in the group, the first member owns both partitions.
+            firstMember.subscribe(List.of(topic));
+            awaitAssignment(firstMember, Set.of(partition0, partition1));
+
+            sendRecords(producer, partition0, 10, baseTimestamp);
+            consumeAndVerifyRecords(firstMember, partition0, 10, 0, 0, baseTimestamp);
+
+            sendRecords(producer, partition1, 10, baseTimestamp);
+            consumeAndVerifyRecords(firstMember, partition1, 10, 0, 0, baseTimestamp);
+
+            // The second member joins; keep polling both until each holds exactly one partition.
+            secondMember.subscribe(List.of(topic));
+            TestUtils.waitForCondition(() -> {
+                firstMember.poll(Duration.ofMillis(100));
+                secondMember.poll(Duration.ofMillis(100));
+                if (firstMember.assignment().size() != 1) {
+                    return false;
+                }
+                return secondMember.assignment().size() == 1;
+            }, "Timed out waiting for rebalance to complete");
+
+            // Each member holds one of the topic's partitions, and not the same one.
+            assertTrue(firstMember.assignment().contains(partition0) || firstMember.assignment().contains(partition1));
+            assertTrue(secondMember.assignment().contains(partition0) || secondMember.assignment().contains(partition1));
+            assertNotEquals(firstMember.assignment(), secondMember.assignment());
+
+            // Ten records were already consumed from each partition, so reading resumes at offset 10.
+            sendRecords(producer, partition0, 10, baseTimestamp);
+            sendRecords(producer, partition1, 10, baseTimestamp);
+            consumeAndVerifyRecords(firstMember, firstMember.assignment().iterator().next(), 10, 10, 0, baseTimestamp);
+            consumeAndVerifyRecords(secondMember, secondMember.assignment().iterator().next(), 10, 10, 0, baseTimestamp);
+        }
+    }
+
     @ClusterTest
     public void testClassicConsumerPartitionsFor() throws Exception {
         testPartitionsFor(Map.of(
@@ -1322,7 +1395,14 @@ public class PlaintextConsumerTest {
         ));
     }
 
-    @ClusterTest
+    @ClusterTests({
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "0")
+        }),
+        @ClusterTest(serverProperties = {
+            @ClusterConfigProperty(key = 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = "1000")
+        })
+    })
     public void 
testAsyncConsumerStaticConsumerDetectsNewPartitionCreatedAfterRestart() throws 
Exception {
         testStaticConsumerDetectsNewPartitionCreatedAfterRestart(Map.of(
             GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT),
diff --git 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
index fafce17382c..1ba21895c10 100644
--- 
a/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/AbstractAuthorizerIntegrationTest.scala
@@ -115,6 +115,7 @@ class AbstractAuthorizerIntegrationTest extends 
BaseRequestTest {
 
     properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1")
     
properties.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
"1")
+    
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 "0")
     
properties.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
 "10000")
     properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 
"1")
     
properties.put(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
 "1")
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
index 156969f4ece..018fe853973 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala
@@ -42,7 +42,8 @@ import scala.jdk.CollectionConverters._
   brokers = 1,
   serverProperties = Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
   )
 )
 class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index a5e4eb42dca..20ac6d42347 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -33,6 +33,19 @@ import org.junit.jupiter.api.Assertions.{assertEquals, 
assertFalse, assertNotEqu
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
+// Companion object holding a variant of the heartbeat request test class.
+object ConsumerGroupHeartbeatRequestTest {
+  // Subclass of ConsumerGroupHeartbeatRequestTest whose cluster defaults set the consumer
+  // group assignment interval to "0". As the class name says, this is the
+  // "assignment batching disabled" configuration of the suite.
+  @ClusterTestDefaults(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
+    )
+  )
+  class WithAssignmentBatchingDisabledTest(cluster: ClusterInstance) extends 
ConsumerGroupHeartbeatRequestTest(cluster) {
+  }
+}
+
 @ClusterTestDefaults(
   types = Array(Type.KRAFT),
   serverProperties = Array(
@@ -42,6 +55,10 @@ import scala.jdk.CollectionConverters._
 )
 class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
 
+  /**
+   * Reports whether assignment batching is considered enabled for this test cluster:
+   * true only when every broker has a consumer group assignment interval greater than zero.
+   */
+  protected def isConsumerAssignmentBatchingEnabled: Boolean = {
+    val brokers = cluster.brokers.values
+    brokers.stream.allMatch(broker => broker.config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs > 0)
+  }
+
   @ClusterTest(
     serverProperties = Array(
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = 
"classic")
@@ -1098,4 +1115,162 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
     // Verify duplicate produces same response.
     assertEquals(expectedFirstResponse1, duplicateResponse1.data)
   }
+
+  /**
+   * Drives two members through a rebalance with raw ConsumerGroupHeartbeat requests.
+   *
+   * Steps:
+   *  1. Member 1 joins and is assigned partitions 0 and 1 of topic "foo" at member epoch 2.
+   *  2. Member 2 joins with an empty assignment. Its epoch equals member 1's epoch when
+   *     assignment batching is enabled, otherwise member 1's epoch + 1.
+   *  3. Member 1 heartbeats until it is told to keep only partition 0. With batching enabled
+   *     the new assignment may arrive later, so the heartbeat is retried.
+   *  4. Member 1 acknowledges the revocation by reporting only partition 0.
+   *  5. Member 2 heartbeats and receives partition 1 at member 1's new epoch.
+   */
+  @ClusterTest
+  def testConsumerGroupHeartbeatRebalance(): Unit = {
+    createOffsetsTopic()
+
+    val memberId1 = Uuid.randomUuid().toString
+    val memberId2 = Uuid.randomUuid().toString
+    val groupId = "test-rebalance-grp"
+
+    // Create topic.
+    val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+    // Member 1 joins and gets all partitions.
+    val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response1: ConsumerGroupHeartbeatResponse = null
+    val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+    // Retry the join until the response carries the full assignment.
+    TestUtils.waitUntilTrue(() => {
+      response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+      response1.data.errorCode == Errors.NONE.code &&
+        response1.data.assignment == allPartitions
+    }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+    // Expected assignment.
+    val expectedAssignment1 = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+    val expectedResponse1 = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId1)
+      .setMemberEpoch(2)
+      .setHeartbeatIntervalMs(response1.data.heartbeatIntervalMs)
+      .setAssignment(expectedAssignment1)
+    assertEquals(expectedResponse1, response1.data)
+
+    val member1InitialEpoch = response1.data.memberEpoch
+
+    // Member 2 joins, triggering rebalance.
+    val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId2)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response2: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+      response2.data.errorCode == Errors.NONE.code
+    }, msg = s"Member 2 could not join. Last response $response2.")
+
+    // Expected assignment.
+    val expectedAssignment2 = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List.empty.asJava)
+
+    // With batching enabled member 2 is expected at member 1's epoch; without it, one epoch later.
+    val expectedResponse2 = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId2)
+      .setMemberEpoch(if (isConsumerAssignmentBatchingEnabled) member1InitialEpoch else member1InitialEpoch + 1)
+      .setHeartbeatIntervalMs(response2.data.heartbeatIntervalMs)
+      .setAssignment(expectedAssignment2)
+    assertEquals(expectedResponse2, response2.data)
+
+    val member2InitialEpoch = response2.data.memberEpoch
+
+    // Member 1 heartbeats and sees revoked partition.
+    val request3 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(member1InitialEpoch)
+    ).build()
+
+    var response3: ConsumerGroupHeartbeatResponse = null
+    if (isConsumerAssignmentBatchingEnabled) {
+      // With batching enabled, keep heartbeating until a response carries an assignment.
+      TestUtils.waitUntilTrue(() => {
+        response3 = connectAndReceive[ConsumerGroupHeartbeatResponse](request3)
+        response3.data.errorCode == Errors.NONE.code &&
+          response3.data.assignment != null
+      }, msg = s"Member 1 did not get new assignment before timeout.")
+    } else {
+      response3 = connectAndReceive[ConsumerGroupHeartbeatResponse](request3)
+      assertEquals(Errors.NONE.code, response3.data.errorCode)
+    }
+
+    // Expected assignment.
+    val expectedAssignment3 = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0).asJava)).asJava)
+
+    val expectedResponse3 = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId1)
+      .setMemberEpoch(member1InitialEpoch)
+      .setHeartbeatIntervalMs(response3.data.heartbeatIntervalMs)
+      .setAssignment(expectedAssignment3)
+    assertEquals(expectedResponse3, response3.data)
+
+    // Member 1 sends heartbeat acknowledging revocation (only reporting partition 0).
+    val ackRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(member1InitialEpoch)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List(new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+          .setTopicId(topicId)
+          .setPartitions(List[Integer](0).asJava)).asJava)
+    ).build()
+
+    val ackResponse1 = connectAndReceive[ConsumerGroupHeartbeatResponse](ackRequest1)
+    assertEquals(Errors.NONE.code, ackResponse1.data.errorCode)
+
+    val member1NewEpoch = ackResponse1.data.memberEpoch
+
+    // Member 2 heartbeats and is assigned new partition.
+    val request4 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId2)
+        .setMemberEpoch(member2InitialEpoch)
+    ).build()
+
+    val response4 = connectAndReceive[ConsumerGroupHeartbeatResponse](request4)
+    // Check the response just received (response4), not member 1's earlier response3.
+    assertEquals(Errors.NONE.code, response4.data.errorCode)
+
+    val expectedAssignment4 = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](1).asJava)).asJava)
+
+    val expectedResponse4 = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId2)
+      .setMemberEpoch(member1NewEpoch)
+      .setHeartbeatIntervalMs(response4.data.heartbeatIntervalMs)
+      .setAssignment(expectedAssignment4)
+    assertEquals(expectedResponse4, response4.data)
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
index 3e86e605d2d..4e2606fd107 100644
--- a/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ConsumerProtocolMigrationTest.scala
@@ -19,20 +19,34 @@ package kafka.server
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.{TopicPartition, Uuid}
-import org.apache.kafka.common.message.{JoinGroupResponseData, 
ListGroupsResponseData, OffsetFetchRequestData, OffsetFetchResponseData, 
SyncGroupResponseData}
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatResponseData, 
JoinGroupResponseData, ListGroupsResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, SyncGroupResponseData}
 import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.test.ClusterInstance
 import org.apache.kafka.coordinator.group.{Group, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.classic.ClassicGroupState
 import 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup.ConsumerGroupState
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Timeout
 
 import java.nio.ByteBuffer
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
+
+object ConsumerProtocolMigrationTest {
+  @ClusterTestDefaults(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
+    )
+  )
+  class WithAssignmentBatchingDisabledTest(cluster: ClusterInstance) extends 
ConsumerProtocolMigrationTest(cluster) {
+  }
+}
+
 @Timeout(120)
 @ClusterTestDefaults(
   types = Array(Type.KRAFT),
@@ -42,6 +56,11 @@ import scala.jdk.CollectionConverters._
   )
 )
 class ConsumerProtocolMigrationTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  protected def isConsumerAssignmentBatchingEnabled: Boolean = {
+    cluster.brokers.values.stream.allMatch(b => 
b.config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs > 0)
+  }
+
   @ClusterTest(
     serverProperties = Array(
       new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, value = 
"bidirectional")
@@ -299,35 +318,10 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
       syncGroupWithOldProtocol(
         groupId = groupId,
         memberId = memberId2,
-        generationId = 3
+        generationId = if (isConsumerAssignmentBatchingEnabled) 2 else 3
       )
     )
 
-    // Member 2 heartbeats.
-    heartbeat(
-      groupId = groupId,
-      generationId = 3,
-      memberId = memberId2
-    )
-
-    // Member 1 heartbeats to revoke partitions.
-    consumerGroupHeartbeat(
-      groupId = groupId,
-      memberId = memberId1,
-      rebalanceTimeoutMs = 5 * 60 * 1000,
-      subscribedTopicNames = List("foo"),
-      topicPartitions = List.empty,
-      expectedError = Errors.NONE
-    )
-
-    // Member 2 heartbeats and gets REBALANCE_IN_PROGRESS.
-    heartbeat(
-      groupId = groupId,
-      generationId = 3,
-      memberId = memberId2,
-      expectedError = Errors.REBALANCE_IN_PROGRESS
-    )
-
     // Downgrade the group by leaving member 1.
     leaveGroupWithNewProtocol(
       groupId = groupId,
@@ -793,15 +787,25 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
     }
 
     // The joining request with a consumer group member 2 is accepted.
-    val memberId2 = consumerGroupHeartbeat(
-      groupId = groupId,
-      memberId = Uuid.randomUuid.toString,
-      instanceId = if (useStaticMembers) instanceId2 else null,
-      rebalanceTimeoutMs = 5 * 60 * 1000,
-      subscribedTopicNames = List("foo"),
-      topicPartitions = List.empty,
-      expectedError = Errors.NONE
-    ).memberId
+    val memberId2 = Uuid.randomUuid.toString
+    assertEquals(
+      new ConsumerGroupHeartbeatResponseData()
+        .setErrorCode(Errors.NONE.code)
+        .setMemberId(memberId2)
+        .setMemberEpoch(2)
+        .setHeartbeatIntervalMs(5000)
+        .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+          .setTopicPartitions(List.empty.asJava)),
+      consumerGroupHeartbeat(
+        groupId = groupId,
+        memberId = memberId2,
+        instanceId = if (useStaticMembers) instanceId2 else null,
+        rebalanceTimeoutMs = 5 * 60 * 1000,
+        subscribedTopicNames = List("foo"),
+        topicPartitions = List.empty,
+        expectedError = Errors.NONE
+      )
+    )
 
     // The group has become a consumer group.
     assertEquals(
@@ -965,6 +969,7 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
       topicPartitions = List.empty,
       expectedError = Errors.NONE
     ).assignment.topicPartitions.get(0).partitions
+    assertTrue(partitionsOfMember2.size < 3)
 
     // The group has been stabilized.
     assertEquals(
@@ -1041,6 +1046,18 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
       expectedAssignment = assignment(List(0, 1, 
2).filter(!partitionsOfMember2.contains(_)))
     )
 
+    if (isConsumerAssignmentBatchingEnabled) {
+      // Member 2 changes subscription and the assignment becomes out of date.
+      val memberEpoch2 = consumerGroupHeartbeat(
+        groupId = groupId,
+        memberId = memberId2,
+        memberEpoch = 2,
+        subscribedTopicNames = List("foo", "bar"),
+        expectedError = Errors.NONE
+      ).memberEpoch
+      assertEquals(2, memberEpoch2)
+    }
+
     if (!useStaticMembers) {
       // Downgrade the group by leaving member 2.
       leaveGroupWithNewProtocol(
@@ -1060,15 +1077,16 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
     }
 
     // The group has become a classic group.
-    // If the downgrade is triggered by the static member replacement,
-    // the group should remain STABLE, otherwise, a rebalance is triggered.
+    // If the downgrade is triggered by the static member replacement and the
+    // assignment is up to date, the group should remain STABLE, otherwise,
+    // a rebalance is triggered.
     assertEquals(
       List(
         new ListGroupsResponseData.ListedGroup()
           .setGroupId(groupId)
           .setProtocolType("consumer")
           .setGroupState(
-            if (useStaticMembers)
+            if (useStaticMembers && !isConsumerAssignmentBatchingEnabled)
               ClassicGroupState.STABLE.toString
             else
               ClassicGroupState.PREPARING_REBALANCE.toString
@@ -1128,15 +1146,25 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
     }
 
     // The joining request with a consumer group member 2 is accepted.
-    val memberId2 = consumerGroupHeartbeat(
-      groupId = groupId,
-      memberId = Uuid.randomUuid.toString,
-      instanceId = if (useStaticMembers) instanceId2 else null,
-      rebalanceTimeoutMs = 5 * 60 * 1000,
-      subscribedTopicNames = List("foo"),
-      topicPartitions = List.empty,
-      expectedError = Errors.NONE
-    ).memberId
+    val memberId2 = Uuid.randomUuid.toString
+    assertEquals(
+      new ConsumerGroupHeartbeatResponseData()
+        .setErrorCode(Errors.NONE.code)
+        .setMemberId(memberId2)
+        .setMemberEpoch(2)
+        .setHeartbeatIntervalMs(5000)
+        .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+          .setTopicPartitions(List.empty.asJava)),
+      consumerGroupHeartbeat(
+        groupId = groupId,
+        memberId = memberId2,
+        instanceId = if (useStaticMembers) instanceId2 else null,
+        rebalanceTimeoutMs = 5 * 60 * 1000,
+        subscribedTopicNames = List("foo"),
+        topicPartitions = List.empty,
+        expectedError = Errors.NONE
+      )
+    )
 
     // The group has become a consumer group.
     assertEquals(
@@ -1239,6 +1267,7 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
         generationId = 1
       ).assignment()
     )).partitions()
+    assertTrue(partitionsOfMember1.size < 3)
 
     // Member 1 heartbeats and gets REBALANCE_IN_PROGRESS.
     heartbeat(
@@ -1271,7 +1300,7 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
     )
 
     // Member 2 rejoins to retrieve partitions pending assignment.
-    consumerGroupHeartbeat(
+    val partitionsOfMember2 = consumerGroupHeartbeat(
       groupId = groupId,
       memberId = memberId2,
       memberEpoch = 2,
@@ -1279,7 +1308,9 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
       subscribedTopicNames = List("foo"),
       topicPartitions = List.empty,
       expectedError = Errors.NONE
-    )
+    ).assignment.topicPartitions.get(0).partitions
+    assertEquals(3, partitionsOfMember1.size + partitionsOfMember2.size)
+    
assertTrue(partitionsOfMember1.asScala.intersect(partitionsOfMember2.asScala).isEmpty)
 
     // The group has been stabilized.
     assertEquals(
@@ -1296,6 +1327,18 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
       )
     )
 
+    if (isConsumerAssignmentBatchingEnabled) {
+      // Member 2 changes subscription and the assignment becomes out of date.
+      val memberEpoch2 = consumerGroupHeartbeat(
+        groupId = groupId,
+        memberId = memberId2,
+        memberEpoch = 2,
+        subscribedTopicNames = List("foo", "bar"),
+        expectedError = Errors.NONE
+      ).memberEpoch
+      assertEquals(2, memberEpoch2)
+    }
+
     if (!useStaticMembers) {
       // Downgrade the group by leaving member 2.
       leaveGroupWithNewProtocol(
@@ -1315,15 +1358,16 @@ class ConsumerProtocolMigrationTest(cluster: 
ClusterInstance) extends GroupCoord
     }
 
     // The group has become a classic group.
-    // If the downgrade is triggered by the static member replacement,
-    // the group should remain STABLE, otherwise, a rebalance is triggered.
+    // If the downgrade is triggered by the static member replacement and the
+    // assignment is up to date, the group should remain STABLE, otherwise,
+    // a rebalance is triggered.
     assertEquals(
       List(
         new ListGroupsResponseData.ListedGroup()
           .setGroupId(groupId)
           .setProtocolType("consumer")
           .setGroupState(
-            if (useStaticMembers)
+            if (useStaticMembers && !isConsumerAssignmentBatchingEnabled)
               ClassicGroupState.STABLE.toString
             else
               ClassicGroupState.PREPARING_REBALANCE.toString
diff --git a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
index 86cf887f2b2..e5df28c3900 100644
--- a/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
@@ -30,7 +30,8 @@ import org.junit.jupiter.api.Assertions.assertEquals
   serverProperties = Array(
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
     new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
-    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000")
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "1000"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, value = 
"0")
   )
 )
 class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 364890baae9..0fb6d3ba117 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3787,6 +3787,39 @@ public class GroupMetadataManager {
         );
     }
 
+    /**
+     * Checks whether the next target assignment can be computed right now.
+     *
+     * @param assignmentTimestampMs The time, in milliseconds, at which the last target assignment
+     *                              calculation finished, or 0 if there is no previous assignment.
+     * @param assignmentIntervalMs  The interval between assignment updates, in milliseconds; 0 always allows computation.
+     * @param currentTimeMs         The current time in milliseconds.
+     * @return {@code true} if the next target assignment can be computed right now, {@code false} otherwise.
+     */
+    // package private for testing
+    static boolean canComputeNextTargetAssignment(
+        long assignmentTimestampMs,
+        int assignmentIntervalMs,
+        long currentTimeMs
+    ) {
+        // The next target assignment can be computed immediately
+        //  * when there is no existing target assignment,
+        //  * or we do not know when the existing target assignment was 
computed.
+        if (assignmentTimestampMs == 0) {
+            return true;
+        }
+
+        // The next target assignment can be computed immediately when the 
assignment interval is
+        // zero. This provides an escape hatch if the wall clock undergoes a 
large backwards
+        // correction.
+        if (assignmentIntervalMs == 0) {
+            return true;
+        }
+
+        // Otherwise, we wait for the assignment interval to elapse.
+        return currentTimeMs >= assignmentTimestampMs + assignmentIntervalMs;
+    }
+
     /**
      * Updates the target assignment according to the updated member and 
subscription metadata.
      *
@@ -3811,6 +3844,15 @@ public class GroupMetadataManager {
             return 
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
         }
 
+        boolean canComputeNextTargetAssignment = 
canComputeNextTargetAssignment(
+            group.assignmentTimestamp(),
+            consumerGroupAssignmentIntervalMs(group.groupId()),
+            time.milliseconds()
+        );
+        if (!canComputeNextTargetAssignment) {
+            return 
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+        }
+
         String preferredServerAssignor = group.computePreferredServerAssignor(
             member,
             updatedMember
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
index 9b937a4201b..0b04dcbf512 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/classic/ClassicGroup.java
@@ -1429,7 +1429,7 @@ public class ClassicGroup implements Group {
             }
             byte[] assignment = 
Utils.toArray(ConsumerProtocol.serializeAssignment(
                 toConsumerProtocolAssignment(
-                    
consumerGroup.targetAssignment().get(memberId).partitions(),
+                    consumerGroup.targetAssignment(memberId).partitions(),
                     image
                 ),
                 ConsumerProtocol.deserializeVersion(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index fdb47f417f2..3f2c06b2315 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -13690,6 +13690,109 @@ public class GroupMetadataManagerTest {
         assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
     }
 
+    @Test
+    public void 
testLastStaticConsumerProtocolMemberReplacedByClassicProtocolMemberWhenTargetAssignmentIsMissing()
 throws ExecutionException, InterruptedException {
+        String groupId = "group-id";
+        String memberId1 = Uuid.randomUuid().toString();
+        String oldMemberId2 = Uuid.randomUuid().toString();
+        String instanceId = "instance-id";
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        List<ConsumerGroupMemberMetadataValue.ClassicProtocol> protocols1 = 
List.of(
+            new ConsumerGroupMemberMetadataValue.ClassicProtocol()
+                .setName(NoOpPartitionAssignor.NAME)
+                
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+                    List.of(fooTopicName, barTopicName),
+                    null,
+                    List.of()
+                ))))
+        );
+
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(10)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
+            .setRebalanceTimeoutMs(45000)
+            .setClassicMemberMetadata(
+                new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+                    .setSessionTimeoutMs(5000)
+                    .setSupportedProtocols(protocols1)
+            )
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(), 10))
+            .build();
+        ConsumerGroupMember oldMember2 = new 
ConsumerGroupMember.Builder(oldMemberId2)
+            .setInstanceId(instanceId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of(fooTopicName, barTopicName))
+            .setServerAssignorName(NoOpPartitionAssignor.NAME)
+            .setRebalanceTimeoutMs(45000)
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+                mkTopicAssignment(barTopicId, 0, 1)), 10))
+            .build();
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .addTopic(barTopicId, barTopicName, 2)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        // Consumer group with two members.
+        // Member 1 uses the classic protocol and static member 2 uses the 
consumer protocol.
+        // Member 1 has just joined and does not have an assignment yet.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, 
ConsumerGroupMigrationPolicy.DOWNGRADE.toString())
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(new 
NoOpPartitionAssignor()))
+            .withMetadataImage(metadataImage)
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11)
+                .withMember(member1)
+                .withMember(oldMember2)
+                .withAssignment(oldMemberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignmentEpoch(10)
+                .withMetadataHash(computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, 
metadataImage),
+                    barTopicName, computeTopicHash(barTopicName, metadataImage)
+                ))))
+            .build();
+
+        // A new member using classic protocol with the same instance id 
joins, scheduling the downgrade.
+        byte[] protocolsMetadata2 = 
Utils.toArray(ConsumerProtocol.serializeSubscription(new 
ConsumerPartitionAssignor.Subscription(
+            List.of(fooTopicName, barTopicName))));
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols2 =
+            new JoinGroupRequestData.JoinGroupRequestProtocolCollection(1);
+        protocols2.add(new JoinGroupRequestProtocol()
+            .setName(NoOpPartitionAssignor.NAME)
+            .setMetadata(protocolsMetadata2));
+        JoinGroupRequestData joinRequest = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withGroupInstanceId(instanceId)
+            .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+            .withProtocols(protocols2)
+            .build();
+        GroupMetadataManagerTestContext.JoinResult result = 
context.sendClassicGroupJoin(joinRequest);
+        result.appendFuture.complete(null);
+        result.joinFuture.get();
+
+        // A rebalance is triggered.
+        ClassicGroup classicGroup = 
context.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, false);
+        assertTrue(classicGroup.isInState(PREPARING_REBALANCE));
+    }
+
     @Test
     public void testJoiningConsumerGroupThrowsExceptionIfGroupOverMaxSize() {
         String groupId = "group-id";
@@ -26673,6 +26776,216 @@ public class GroupMetadataManagerTest {
         test.assertions.run();
     }
 
+    @Test
+    public void 
testCanComputeNextTargetAssignmentWithNoPreviousAssignmentTimestamp() {
+        // The next target assignment can always be computed when there is no 
previous assignment timestamp.
+        assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(0, 
1000, 0));
+    }
+
+    @Test
+    public void 
testCanComputeNextTargetAssignmentWithZeroAssignmentIntervalMs() {
+        // The next target assignment can always be computed when the 
assignment interval is zero.
+        
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 0, 0));
+        
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 0, 
999999));
+        
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 0, 
1000000));
+    }
+
+    @Test
+    public void testCanComputeNextTargetAssignment() {
+        // The next target assignment cannot be computed before the timestamp 
of the end of the previous assignment computation.
+        
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000, 
0));
+        
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000, 
999999));
+
+        // The next target assignment cannot be computed before the assignment 
interval has elapsed.
+        
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000, 
1000000));
+        
assertFalse(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000, 
1004999));
+
+        // The next target assignment can be computed after the assignment 
interval has elapsed.
+        
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000, 
1005000));
+        
assertTrue(GroupMetadataManager.canComputeNextTargetAssignment(1000000, 5000, 
1007500));
+    }
+
+    @Test
+    public void testConsumerGroupAssignmentInterval() {
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 5000)
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member 1 joins the group and gets an assignment immediately.
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            ))
+        )));
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(List.of(fooTopicName))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                    ))),
+            result1.response()
+        );
+
+        ConsumerGroupMember expectedMember1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)), 2))
+            .build();
+
+        assertRecordsEquals(
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 
computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+                ))),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+                )),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 2, context.time.milliseconds()),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1)
+            ),
+            result1.records()
+        );
+
+        // Wait until just before the expected delay.
+        context.time.sleep(4995);
+
+        // Member 2 joins the group and gets no assignment.
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of(
+            memberId1, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2)
+            )),
+            memberId2, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 3, 4, 5)
+            ))
+        )));
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(List.of(fooTopicName))
+                .setTopicPartitions(List.of()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of())),
+            result2.response()
+        );
+
+        ConsumerGroupMember expectedMember2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(2)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(), 2))
+            .build();
+
+        assertRecordsEquals(
+            List.of(
+                
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember2),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 3, 
computeGroupHash(Map.of(
+                    fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+                ))),
+                
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember2)
+            ),
+            result2.records()
+        );
+
+        // Wait a little more. The next target assignment can be computed now.
+        context.time.sleep(10);
+
+        // The next target assignment is computed.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result3 = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(2));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(3)
+                .setHeartbeatIntervalMs(5000),
+            result3.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.UNRELEASED_PARTITIONS)
+            .setMemberEpoch(3)
+            .setPreviousMemberEpoch(2)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of(fooTopicName))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(toAssignmentWithEpochs(mkAssignment(), 2))
+            .build();
+
+        assertUnorderedRecordsEquals(
+            List.of(
+                List.of(
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId1, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)
+                    )),
+                    
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId2, mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5)
+                    ))
+                ),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentMetadataRecord(groupId,
 3, context.time.milliseconds())),
+                
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedMember3))
+            ),
+            result3.records()
+        );
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git a/tests/kafkatest/services/kafka/config_property.py 
b/tests/kafkatest/services/kafka/config_property.py
index 702120e1625..2148b3bf3e9 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -75,9 +75,13 @@ DELEGATION_TOKEN_SECRET_KEY="delegation.token.secret.key"
 SASL_ENABLED_MECHANISMS="sasl.enabled.mechanisms"
 
 CONSUMER_GROUP_MIGRATION_POLICY = "group.consumer.migration.policy"
+CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS = "group.consumer.assignment.interval.ms"
 
 SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR ="share.coordinator.state.topic.replication.factor"
 SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR = "share.coordinator.state.topic.min.isr"
+SHARE_GROUP_ASSIGNMENT_INTERVAL_MS = "group.share.assignment.interval.ms"
+
+STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS = "group.streams.assignment.interval.ms"
 
 UNSTABLE_API_VERSIONS_ENABLE = "unstable.api.versions.enable"
 UNSTABLE_FEATURE_VERSIONS_ENABLE = "unstable.feature.versions.enable"
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 252ad520f53..be5a5201ce4 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -206,7 +206,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                  consumer_group_migration_policy=None,
                  dynamicRaftQuorum=False,
                  use_transactions_v2=False,
-                 use_streams_groups=False
+                 use_streams_groups=False,
+                 enable_assignment_batching=None
                  ):
         """
         :param context: test context
@@ -271,6 +272,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
         :param dynamicRaftQuorum: When true, controller_quorum_bootstrap_servers, and bootstraps the first controller using the standalone flag
         :param use_transactions_v2: When true, uses transaction.version=2 which utilizes the new transaction protocol introduced in KIP-890
         :param use_streams_groups: When true, enables the use of streams groups introduced in KIP-1071
+        :param enable_assignment_batching: When true, enables assignment batching introduced in KIP-1263. If not specified, defaults to True.
         """
 
         self.zk = zk
@@ -296,6 +298,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 consumer_group_migration_policy = context.globals.get(arg_name)
         self.consumer_group_migration_policy = consumer_group_migration_policy
 
+        # Set enable_assignment_batching based on context and arguments.
+        # If not specified, defaults to true.
+        if enable_assignment_batching is None:
+            arg_name = 'enable_assignment_batching'
+            if context.injected_args is not None:
+                enable_assignment_batching = context.injected_args.get(arg_name)
+            if enable_assignment_batching is None:
+                enable_assignment_batching = context.globals.get(arg_name)
+            if enable_assignment_batching is None:
+                enable_assignment_batching = True
+        self.enable_assignment_batching = enable_assignment_batching
+
         if num_nodes < 1:
             raise Exception("Must set a positive number of nodes: %i" % num_nodes)
         self.num_nodes_broker_role = 0
@@ -346,7 +360,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                     isolated_kafka=self, allow_zk_with_kraft=self.allow_zk_with_kraft,
                     server_prop_overrides=server_prop_overrides, dynamicRaftQuorum=self.dynamicRaftQuorum,
                     use_transactions_v2=self.use_transactions_v2,
-                    use_streams_groups=self.use_streams_groups
+                    use_streams_groups=self.use_streams_groups,
+                    enable_assignment_batching=self.enable_assignment_batching
                 )
                 self.controller_quorum = self.isolated_controller_quorum
 
@@ -771,6 +786,14 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
             override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] = str(True)
             override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE] = str(True)
 
+        if self.enable_assignment_batching:
+            # Assignment batching is enabled by default in Kafka
+            pass
+        else:
+            override_configs[config_property.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS] = "0"
+            override_configs[config_property.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS] = "0"
+            override_configs[config_property.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS] = "0"
+
         #update template configs with test override configs
         configs.update(override_configs)
 
diff --git a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
index ce4f0431bb2..bfe5a1eebaa 100644
--- a/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
+++ b/tests/kafkatest/tests/client/consumer_protocol_migration_test.py
@@ -108,24 +108,37 @@ class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["disabled"],
         consumer_version=consumer_versions_supporting_range_assignnor,
-        assignment_strategy=[RANGE]
+        assignment_strategy=[RANGE],
+        enable_assignment_batching=[True]
     )
     @matrix(
         static_membership=[True],
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["disabled"],
         consumer_version=consumer_versions_supporting_static_membership,
-        assignment_strategy=[RANGE]
+        assignment_strategy=[RANGE],
+        enable_assignment_batching=[True]
     )
     @matrix(
         static_membership=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["disabled"],
         consumer_version=consumer_versions_supporting_cooperative_sticky_assignor,
-        assignment_strategy=[COOPERATIVE_STICKEY]
+        assignment_strategy=[COOPERATIVE_STICKEY],
+        enable_assignment_batching=[True]
+    )
+    @matrix(
+        static_membership=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["disabled"],
+        # Test the latest version only without assignment batching.
+        consumer_version=consumer_versions_supporting_cooperative_sticky_assignor[-1:],
+        assignment_strategy=[RANGE, COOPERATIVE_STICKEY],
+        enable_assignment_batching=[False]
     )
     def test_consumer_offline_migration(self, static_membership, metadata_quorum,
-                                        consumer_group_migration_policy, consumer_version, assignment_strategy):
+                                        consumer_group_migration_policy, consumer_version, assignment_strategy,
+                                        enable_assignment_batching):
         """
         Verify correct consumer behavior when the consumers in the group are restarted to perform
         offline upgrade/downgrade.
@@ -178,24 +191,37 @@ class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["bidirectional", "upgrade"],
         consumer_version=consumer_versions_supporting_range_assignnor,
-        assignment_strategy=[RANGE]
+        assignment_strategy=[RANGE],
+        enable_assignment_batching=[True]
     )
     @matrix(
         static_membership=[True],
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["bidirectional", "upgrade"],
         consumer_version=consumer_versions_supporting_static_membership,
-        assignment_strategy=[RANGE]
+        assignment_strategy=[RANGE],
+        enable_assignment_batching=[True]
     )
     @matrix(
         static_membership=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["bidirectional", "upgrade"],
         consumer_version=consumer_versions_supporting_cooperative_sticky_assignor,
-        assignment_strategy=[COOPERATIVE_STICKEY]
+        assignment_strategy=[COOPERATIVE_STICKEY],
+        enable_assignment_batching=[True]
+    )
+    @matrix(
+        static_membership=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["bidirectional", "upgrade"],
+        # Test the latest version only without assignment batching.
+        consumer_version=consumer_versions_supporting_cooperative_sticky_assignor[-1:],
+        assignment_strategy=[RANGE, COOPERATIVE_STICKEY],
+        enable_assignment_batching=[False]
     )
     def test_consumer_rolling_upgrade(self, static_membership, metadata_quorum,
-                                      consumer_group_migration_policy, consumer_version, assignment_strategy):
+                                      consumer_group_migration_policy, consumer_version, assignment_strategy,
+                                      enable_assignment_batching):
         """
         Verify correct consumer behavior when the consumers in the group are restarted to perform
         online upgrade when the migration policy is set to be UPGRADE.
@@ -239,24 +265,37 @@ class ConsumerProtocolMigrationTest(VerifiableConsumerTest):
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["downgrade"],
         consumer_version=consumer_versions_supporting_range_assignnor,
-        assignment_strategy=[RANGE]
+        assignment_strategy=[RANGE],
+        enable_assignment_batching=[True]
     )
     @matrix(
         static_membership=[True],
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["downgrade"],
         consumer_version=consumer_versions_supporting_static_membership,
-        assignment_strategy=[RANGE]
+        assignment_strategy=[RANGE],
+        enable_assignment_batching=[True]
     )
     @matrix(
         static_membership=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
         consumer_group_migration_policy=["downgrade"],
         consumer_version=consumer_versions_supporting_cooperative_sticky_assignor,
-        assignment_strategy=[COOPERATIVE_STICKEY]
+        assignment_strategy=[COOPERATIVE_STICKEY],
+        enable_assignment_batching=[True]
+    )
+    @matrix(
+        static_membership=[True, False],
+        metadata_quorum=[quorum.isolated_kraft],
+        consumer_group_migration_policy=["downgrade"],
+        # Test the latest version only without assignment batching.
+        consumer_version=consumer_versions_supporting_cooperative_sticky_assignor[-1:],
+        assignment_strategy=[RANGE, COOPERATIVE_STICKEY],
+        enable_assignment_batching=[False]
     )
     def test_consumer_rolling_downgrade(self, static_membership, metadata_quorum,
-                                        consumer_group_migration_policy, consumer_version, assignment_strategy):
+                                        consumer_group_migration_policy, consumer_version, assignment_strategy,
+                                        enable_assignment_batching):
         """
         Verify correct consumer behavior when the consumers in the group are restarted to perform
         online downgrade when the migration policy is set to be DOWNGRADE.
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index c28cba1431b..6b71d9769f7 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -138,9 +138,17 @@ class OffsetValidationTest(VerifiableConsumerTest):
         clean_shutdown=[True],
         bounce_mode=["all", "rolling"],
         metadata_quorum=[quorum.isolated_kraft],
-        group_protocol=consumer_group.all_group_protocols
+        group_protocol=consumer_group.all_group_protocols,
+        enable_assignment_batching=[True]
     )
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
+    @matrix(
+        clean_shutdown=[True],
+        bounce_mode=["all", "rolling"],
+        metadata_quorum=[quorum.isolated_kraft],
+        group_protocol=[consumer_group.consumer_group_protocol],
+        enable_assignment_batching=[False]
+    )
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, group_protocol=None, enable_assignment_batching=True):
         """
         Verify correct consumer behavior when the consumers in the group are consecutively restarted.
 
@@ -386,7 +394,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
         clean_shutdown=[True],
         enable_autocommit=[True, False],
         metadata_quorum=[quorum.isolated_kraft],
-        group_protocol=consumer_group.all_group_protocols
+        group_protocol=consumer_group.all_group_protocols,
+        enable_assignment_batching=[True]
+    )
+    @matrix(
+        clean_shutdown=[True],
+        enable_autocommit=[True],
+        metadata_quorum=[quorum.isolated_kraft],
+        group_protocol=[consumer_group.consumer_group_protocol],
+        enable_assignment_batching=[False]
     )
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, group_protocol=None, enable_assignment_batching=True):
         partition = TopicPartition(self.TOPIC, 0)
@@ -478,9 +494,15 @@ class OffsetValidationTest(VerifiableConsumerTest):
     @cluster(num_nodes=7)
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
-        group_protocol=consumer_group.all_group_protocols
+        group_protocol=consumer_group.all_group_protocols,
+        enable_assignment_batching=[True]
+    )
+    @matrix(
+        metadata_quorum=[quorum.isolated_kraft],
+        group_protocol=[consumer_group.consumer_group_protocol],
+        enable_assignment_batching=[False]
     )
-    def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, group_protocol=None):
+    def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, group_protocol=None, enable_assignment_batching=True):
         """
         Verifies correct group rebalance behavior as consumers are started and stopped.
         In particular, this test verifies that the partition is readable after every
@@ -541,9 +563,10 @@ class AssignmentValidationTest(VerifiableConsumerTest):
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
         group_protocol=[consumer_group.consumer_group_protocol],
-        group_remote_assignor=consumer_group.all_remote_assignors
+        group_remote_assignor=consumer_group.all_remote_assignors,
+        enable_assignment_batching=[False, True]
     )
-    def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, group_protocol=None, group_remote_assignor=None):
+    def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, group_protocol=None, group_remote_assignor=None, enable_assignment_batching=True):
         """
         Verify assignment strategy correctness: each partition is assigned to exactly
         one consumer instance.

Reply via email to