This is an automated email from the ASF dual-hosted git repository.
payang pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 0553e67b628 KAFKA-20254: Add integration tests for group migration
with compaction and broker restart (#21642)
0553e67b628 is described below
commit 0553e67b6284dc1c1137d5d41ebe84cb1ff75a43
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Mar 10 15:12:54 2026 +0100
KAFKA-20254: Add integration tests for group migration with compaction and
broker restart (#21642)
This adds integration tests to GroupCoordinatorIntegrationTest that
verify the group coordinator loads correctly after compaction and broker
restart when a group has been upgraded from the classic protocol to the
consumer or streams protocol.
For each protocol upgrade path (classic-to-consumer and
classic-to-streams), there are three test variants. The first variant
compacts __consumer_offsets but retains the GroupMetadata tombstone,
verifying that replaying tombstones after compaction works correctly.
The second variant forces tombstone removal by running two compaction
passes with delete.retention.ms=0 and min.cleanable.dirty.ratio=0. This
reproduces the KAFKA-20254 scenario: after the tombstone is removed, the
classic group's offset commit records precede the consumer/streams group
records in the log. During replay, these offset commits create a simple
classic group, which the consumer/streams group replay logic must handle
via the isSimpleGroup() fix in
getOrMaybeCreatePersistedConsumerGroup/getOrMaybeCreatePersistedStreamsGroup.
The third variant starts from a simple classic group, created by
committing offsets without subscribing: no GroupMetadata record and
therefore no tombstone is written, so the offset commit records survive
a single compaction pass and exercise the same replay path.
In the tombstone-removed variants, the upgraded group avoids committing
offsets so that the classic group's offset commit records are not
superseded by newer records with the same key and then discarded by
compaction. They therefore survive at their original early position in
the log, which triggers the bug when the fix is not applied.
Reviewers: David Jacot <[email protected]>, Matthias J. Sax
<[email protected]>
(cherry picked from commit b9eae3388cb8145c3e43130f66526927f804a749)
---
build.gradle | 1 +
.../api/GroupCoordinatorIntegrationTest.scala | 355 ++++++++++++++++++++-
2 files changed, 351 insertions(+), 5 deletions(-)
diff --git a/build.gradle b/build.gradle
index 2b27f3af758..e6270ae96c8 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1091,6 +1091,7 @@ project(':core') {
testImplementation project(':server-common').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.test.output
testImplementation project(':server').sourceSets.test.output
+ testImplementation project(':streams')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-util')
diff --git
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index e161406a50e..7fb75c6849c 100644
---
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -14,10 +14,12 @@ package kafka.api
import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest,
Type}
import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
-import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol,
OffsetAndMetadata}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry,
ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig,
GroupProtocol, OffsetAndMetadata}
+import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{GroupIdNotFoundException,
UnknownTopicOrPartitionException}
-import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture,
TopicCollection, TopicPartition}
+import org.apache.kafka.common.{ConsumerGroupState, GroupState, GroupType,
KafkaFuture, TopicCollection, TopicPartition}
+import org.apache.kafka.common.serialization.Serdes
import org.junit.jupiter.api.Assertions._
import scala.jdk.CollectionConverters._
@@ -26,11 +28,14 @@ import org.apache.kafka.common.record.CompressionType
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.ServerConfigs
+import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
+import org.apache.kafka.streams.{GroupProtocol => StreamsGroupProtocol}
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Timeout
import java.time.Duration
+import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.concurrent.ExecutionException
@@ -279,6 +284,295 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
}
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedConsumerGroupAndTombstoneRemoved():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a classic group with one member and commit offsets.
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ consumer.commitSync()
+ }
+
+ // Set delete.retention.ms=0 before the tombstone is written so that
+ // compaction will remove it.
+ configureDeleteRetention()
+
+ // Upgrade the group to the consumer protocol. Don't commit offsets
+ // so the classic group's offset commits survive compaction.
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+ }
+
+ // Force compaction twice to remove tombstones: the first pass sets
+ // deleteHorizonMs, and the second pass removes them.
+ rollAndCompactConsumerOffsets()
+ writeOneOffsetCommit()
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // consumer group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeConsumerGroups(java.util.List.of("grp4"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ assertDescribedGroup(groups, "grp4", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedSimpleConsumerGroup():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a simple classic group by committing offsets directly
+ // without subscribing. This only writes offset commit records
+ // without any GroupMetadata records.
+ withConsumer(groupId = "grp6", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ val tp = new TopicPartition("foo", 0)
+ consumer.assign(java.util.List.of(tp))
+ consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+ }
+
+ // Upgrade the group to the consumer protocol. Don't commit offsets
+ // so the simple classic group's offset commits survive compaction.
+ withConsumer(groupId = "grp6", groupProtocol = GroupProtocol.CONSUMER,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+ }
+
+ // Force a compaction. Since a simple classic group has no
+ // GroupMetadata records, there are no tombstones — the offset
+ // commit records always survive compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // consumer group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeConsumerGroups(java.util.List.of("grp6"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ assertDescribedGroup(groups, "grp6", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedStreamsGroup(): Unit
= {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a classic group with one member.
+ withConsumer(groupId = "grp5", groupProtocol = GroupProtocol.CLASSIC) {
consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Upgrade the group to the streams protocol.
+ withStreamsApp(applicationId = "grp5", inputTopic = "foo")
+ }
+
+ // Force a compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. If replaying any of the records fails, the
+ // group coordinator won't be available.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp5"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp5").get(10, TimeUnit.SECONDS)
+ assertEquals("grp5", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedStreamsGroupAndTombstoneRemoved():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a classic group with one member and commit offsets.
+ withConsumer(groupId = "grp5", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(java.util.List.of("foo"))
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ consumer.commitSync()
+ }
+
+ // Set delete.retention.ms=0 before the tombstone is written so that
+ // compaction will remove it.
+ configureDeleteRetention()
+
+ // Upgrade the group to the streams protocol.
+ withStreamsApp(applicationId = "grp5", inputTopic = "foo")
+ }
+
+ // Force compaction twice to remove tombstones: the first pass sets
+ // deleteHorizonMs, and the second pass removes them.
+ rollAndCompactConsumerOffsets()
+ writeOneOffsetCommit()
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // streams group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp5"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp5").get(10, TimeUnit.SECONDS)
+ assertEquals("grp5", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+ new ClusterConfigProperty(key =
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+ )
+ )
+ def
testCoordinatorFailoverAfterCompactingPartitionWithUpgradedSimpleStreamsGroup():
Unit = {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a simple classic group by committing offsets directly
+ // without subscribing. This only writes offset commit records
+ // without any GroupMetadata records.
+ withConsumer(groupId = "grp7", groupProtocol = GroupProtocol.CLASSIC,
enableAutoCommit = false) { consumer =>
+ val tp = new TopicPartition("foo", 0)
+ consumer.assign(java.util.List.of(tp))
+ consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+ }
+
+ // Upgrade the group to the streams protocol.
+ withStreamsApp(applicationId = "grp7", inputTopic = "foo")
+ }
+
+ // Force a compaction. Since a simple classic group has no
+ // GroupMetadata records, there are no tombstones — the offset
+ // commit records always survive compaction.
+ rollAndCompactConsumerOffsets()
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(0)
+ cluster.startBroker(0)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. Without the fix for KAFKA-20254, the offset
+ // commit records create a simple classic group during replay and the
+ // streams group records fail to load.
+ withAdmin { admin =>
+ val groups = admin
+ .describeStreamsGroups(java.util.List.of("grp7"))
+ .describedGroups()
+ .asScala
+ .toMap
+
+ val group = groups("grp7").get(10, TimeUnit.SECONDS)
+ assertEquals("grp7", group.groupId)
+ assertEquals(GroupState.EMPTY, group.groupState)
+ }
+ }
+
@ClusterTest(
types = Array(Type.KRAFT),
serverProperties = Array(
@@ -331,12 +625,34 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
}
+ private def writeOneOffsetCommit(): Unit = {
+ // Write a single offset commit to create dirty data past the cleaner
+ // checkpoint so the cleaner will re-process previously compacted
+ // segments on the next compaction pass.
+ withConsumer(groupId = "compaction-trigger", groupProtocol =
GroupProtocol.CLASSIC, enableAutoCommit = false) { consumer =>
+ val tp = new TopicPartition("foo", 0)
+ consumer.assign(java.util.List.of(tp))
+ consumer.commitSync(java.util.Map.of(tp, new OffsetAndMetadata(0)))
+ }
+ }
+
+ private def configureDeleteRetention(): Unit = {
+ withAdmin { admin =>
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC,
Topic.GROUP_METADATA_TOPIC_NAME)
+ admin.incrementalAlterConfigs(java.util.Map.of(resource,
java.util.List.of(
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.DELETE_RETENTION_MS_CONFIG, "0"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.0"),
AlterConfigOp.OpType.SET)
+ ))).all().get()
+ }
+ }
+
private def rollAndCompactConsumerOffsets(): Unit = {
- val tp = new TopicPartition("__consumer_offsets", 0)
+ val tp = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val broker = cluster.brokers.asScala.head._2
val log = broker.logManager.getLog(tp).get
+ val endOffset = log.logEndOffset
log.roll()
- assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0, 60000L))
+ assertTrue(broker.logManager.cleaner.awaitCleaned(tp, endOffset, 60000L))
}
private def withAdmin(f: Admin => Unit): Unit = {
@@ -366,6 +682,35 @@ class GroupCoordinatorIntegrationTest(cluster:
ClusterInstance) {
}
}
+ private def withStreamsApp(
+ applicationId: String,
+ inputTopic: String
+ ): Unit = {
+ val builder = new StreamsBuilder()
+ builder.stream(inputTopic)
+
+ val props = new Properties()
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers())
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
classOf[Serdes.StringSerde].getName)
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
classOf[Serdes.StringSerde].getName)
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
StreamsGroupProtocol.STREAMS.name())
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "500")
+ props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "100")
+
+ val streams = new KafkaStreams(builder.build(), props)
+ try {
+ streams.start()
+ TestUtils.waitUntilTrue(
+ () => streams.state() == KafkaStreams.State.RUNNING,
+ msg = "Streams app did not reach RUNNING state"
+ )
+ } finally {
+ streams.close(Duration.ofSeconds(30))
+ streams.cleanUp()
+ }
+ }
+
private def assertDescribedGroup(
groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
groupId: String,