This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8afeecc99fa MINOR: Add streams group tests to 
GroupCoordinatorIntegrationTest (#21772)
8afeecc99fa is described below

commit 8afeecc99fa094921c5ced2cddaf31b506a8ae1a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Apr 28 11:20:39 2026 +0200

    MINOR: Add streams group tests to GroupCoordinatorIntegrationTest (#21772)
    
    GroupCoordinatorIntegrationTest had several tests that only covered
    consumer groups but not streams groups. This adds four streams group
    equivalents to match coverage:
    
    - Coordinator failover after compaction with a member joining and
    leaving
    - Coordinator failover after compaction with a member leaving and
    rejoining
    - Coordinator failover after compaction with a deleted group
    - Recreating the __consumer_offsets topic with a streams group
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../api/GroupCoordinatorIntegrationTest.scala      | 177 ++++++++++++++++++++-
 1 file changed, 176 insertions(+), 1 deletion(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index a0359cfe5ad..2d1919b7559 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -230,6 +230,181 @@ class GroupCoordinatorIntegrationTest(cluster: 
ClusterInstance) {
     }
   }
 
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverAfterCompactingPartitionWithStreamsGroupMemberJoiningAndLeaving():
 Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a streams group grp1s with one member. The member joins and 
leaves. This creates
+      // a mix of group records with tombstones to delete the member.
+      withStreamsApp(applicationId = "grp1s", inputTopic = "foo")
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeStreamsGroups(java.util.List.of("grp1s"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      val group = groups("grp1s").get(10, TimeUnit.SECONDS)
+      assertEquals("grp1s", group.groupId)
+      assertEquals(GroupState.EMPTY, group.groupState)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverCompactingPartitionWithStreamsGroupMemberLeavingAndRejoining():
 Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a streams group grp2s with one member. The member joins, 
leaves, and rejoins.
+      // This creates a mix of group records with tombstones and ensures that 
all the offset
+      // commit records are before the streams group records due to the 
rebalance after rejoining.
+      withStreamsApp(applicationId = "grp2s", inputTopic = "foo")
+      withStreamsApp(applicationId = "grp2s", inputTopic = "foo")
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeStreamsGroups(java.util.List.of("grp2s"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      val group = groups("grp2s").get(10, TimeUnit.SECONDS)
+      assertEquals("grp2s", group.groupId)
+      assertEquals(GroupState.EMPTY, group.groupState)
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def 
testCoordinatorFailoverAfterCompactingPartitionWithStreamsGroupDeleted(): Unit 
= {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      // Create a streams group grp3s with one member. The member joins and 
leaves the group. Then
+      // the group is deleted. This creates tombstones to delete the member, 
the group and the offsets.
+      withStreamsApp(applicationId = "grp3s", inputTopic = "foo")
+
+      admin
+        .deleteConsumerGroups(java.util.List.of("grp3s"))
+        .deletedGroups()
+        .get("grp3s")
+        .get(10, TimeUnit.SECONDS)
+    }
+
+    // Force a compaction.
+    rollAndCompactConsumerOffsets()
+
+    // Restart the broker to reload the group coordinator.
+    cluster.shutdownBroker(0)
+    cluster.startBroker(0)
+
+    // Verify the state of the groups to ensure that the group coordinator
+    // was correctly loaded. If replaying any of the records fails, the
+    // group coordinator won't be available.
+    withAdmin { admin =>
+      val groups = admin
+        .describeStreamsGroups(java.util.List.of("grp3s"))
+        .describedGroups()
+        .asScala
+        .toMap
+
+      assertDescribedDeadGroup(groups, "grp3s")
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+      new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+    )
+  )
+  def testRecreatingConsumerOffsetsTopicWithStreamsGroup(): Unit = {
+    withAdmin { admin =>
+      TestUtils.createTopicWithAdminRaw(
+        admin = admin,
+        topic = "foo",
+        numPartitions = 3
+      )
+
+      withStreamsApp(applicationId = "groups", inputTopic = "foo")
+
+      admin
+        
.deleteTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+        .all()
+        .get()
+
+      TestUtils.waitUntilTrue(() => {
+        try {
+          admin
+            
.describeTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+            .topicNameValues()
+            .get(Topic.GROUP_METADATA_TOPIC_NAME)
+            .get(JTestUtils.DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS)
+          false
+        } catch {
+          case e: ExecutionException =>
+            e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
+        }
+      }, msg = s"${Topic.GROUP_METADATA_TOPIC_NAME} was not deleted")
+
+      withStreamsApp(applicationId = "groups", inputTopic = "foo")
+    }
+  }
+
   @ClusterTest(
     types = Array(Type.KRAFT),
     serverProperties = Array(
@@ -727,7 +902,7 @@ class GroupCoordinatorIntegrationTest(cluster: 
ClusterInstance) {
   }
 
   private def assertDescribedDeadGroup(
-    groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
+    groups: Map[String, _ <: KafkaFuture[_]],
     groupId: String
   ): Unit = {
     try {

Reply via email to