This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8afeecc99fa MINOR: Add streams group tests to
GroupCoordinatorIntegrationTest (#21772)
8afeecc99fa is described below
commit 8afeecc99fa094921c5ced2cddaf31b506a8ae1a
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Apr 28 11:20:39 2026 +0200
MINOR: Add streams group tests to GroupCoordinatorIntegrationTest (#21772)
GroupCoordinatorIntegrationTest had several tests that only covered
consumer groups but not streams groups. This adds four streams group
equivalents to match coverage:
- Coordinator failover after compaction with a member joining and
leaving
- Coordinator failover after compaction with a member leaving and
rejoining
- Coordinator failover after compaction with a deleted group
- Recreating the __consumer_offsets topic with a streams group
Reviewers: Matthias J. Sax <[email protected]>
---
.../api/GroupCoordinatorIntegrationTest.scala | 177 ++++++++++++++++++++-
1 file changed, 176 insertions(+), 1 deletion(-)
diff --git
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index a0359cfe5ad..2d1919b7559 100644
---
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -230,6 +230,181 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
}
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithStreamsGroupMemberJoiningAndLeaving():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a streams group grp1s with one member. The member joins and
leaves. This creates
+ // a mix of group records with tombstones to delete the member.
+ withStreamsApp(applicationId = "grp1s", inputTopic = "foo")
+ }
+
+ // Force a compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. If replaying any of the records fails, the
+ // group coordinator won't be available.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp1s"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp1s").get(10, TimeUnit.SECONDS)
+ assertEquals("grp1s", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverCompactingPartitionWithStreamsGroupMemberLeavingAndRejoining():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a streams group grp2s with one member. The member joins,
leaves, and rejoins.
+ // This creates a mix of group records with tombstones and ensures that
all the offset
+ // commit records are before the streams group records due to the
rebalance after rejoining.
+ withStreamsApp(applicationId = "grp2s", inputTopic = "foo")
+ withStreamsApp(applicationId = "grp2s", inputTopic = "foo")
+ }
+
+ // Force a compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. If replaying any of the records fails, the
+ // group coordinator won't be available.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp2s"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp2s").get(10, TimeUnit.SECONDS)
+ assertEquals("grp2s", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithStreamsGroupDeleted(): Unit
= {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a streams group grp3s with one member. The member joins and
leaves the group. Then
+ // the group is deleted. This creates tombstones to delete the member,
the group and the offsets.
+ withStreamsApp(applicationId = "grp3s", inputTopic = "foo")
+
+ admin
+ .deleteConsumerGroups(java.util.List.of("grp3s"))
+ .deletedGroups()
+ .get("grp3s")
+ .get(10, TimeUnit.SECONDS)
+ }
+
+ // Force a compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. If replaying any of the records fails, the
+ // group coordinator won't be available.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp3s"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ assertDescribedDeadGroup(groups, "grp3s")
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def testRecreatingConsumerOffsetsTopicWithStreamsGroup(): Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ withStreamsApp(applicationId = "groups", inputTopic = "foo")
+
+ admin
+
.deleteTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+ .all()
+ .get()
+
+ TestUtils.waitUntilTrue(() => {
+ try {
+ admin
+
.describeTopics(TopicCollection.ofTopicNames(List(Topic.GROUP_METADATA_TOPIC_NAME).asJava))
+ .topicNameValues()
+ .get(Topic.GROUP_METADATA_TOPIC_NAME)
+ .get(JTestUtils.DEFAULT_MAX_WAIT_MS, TimeUnit.MILLISECONDS)
+ false
+ } catch {
+ case e: ExecutionException =>
+ e.getCause.isInstanceOf[UnknownTopicOrPartitionException]
+ }
+ }, msg = s"${Topic.GROUP_METADATA_TOPIC_NAME} was not deleted")
+
+ withStreamsApp(applicationId = "groups", inputTopic = "foo")
+ }
+ }
+
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
@@ -727,7 +902,7 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
private def assertDescribedDeadGroup(
- groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
+ groups: Map[String, _ <: KafkaFuture[_]],
groupId: String
): Unit = {
try {