dajac commented on code in PR #16898:
URL: https://github.com/apache/kafka/pull/16898#discussion_r1721521757
##########
core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala:
##########
@@ -12,53 +12,212 @@
*/
package kafka.api
-import kafka.integration.KafkaServerTestHarness
import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
+import kafka.test.junit.ClusterTestExtensions
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription}
+import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol,
OffsetAndMetadata}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture,
TopicPartition}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
import scala.jdk.CollectionConverters._
-import java.util.Properties
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
-class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
- val offsetsTopicCompressionCodec = CompressionType.GZIP
- val overridingProps = new Properties()
- overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
"1")
-
overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG,
offsetsTopicCompressionCodec.id.toString)
+import java.time.Duration
+import java.util.Collections
+import java.util.concurrent.TimeUnit
- override def generateConfigs = TestUtils.createBrokerConfigs(1,
zkConnectOrNull, enableControlledShutdown = false).map {
- KafkaConfig.fromProps(_, overridingProps)
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+class GroupCoordinatorIntegrationTest(cluster: ClusterInstance) {
+
+ @ClusterTest(
+ types = Array(Type.KRAFT, Type.ZK),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1"),
+ new ClusterConfigProperty(key = "offsets.topic.compression.codec", value
= "1"),
+ new ClusterConfigProperty(key = "controlled.shutdown.enable", value =
"false"),
+ )
+ )
+ def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(): Unit = {
+ val logManager = cluster.brokers().asScala.head._2.logManager
+ val consumer = TestUtils.createConsumer(cluster.bootstrapServers())
+
+ try {
+ consumer.commitSync(Map(
+ new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
+ ).asJava)
+
+ def getGroupMetadataLogOpt: Option[UnifiedLog] =
+ logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
0))
+
+ TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
+ "Commit message not appended in time")
+
+ val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
+ val incorrectCompressionCodecs = logSegments
+ .flatMap(_.log.batches.asScala.map(_.compressionType))
+ .filter(_ != CompressionType.GZIP)
+
+ assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect
compression codecs should be empty")
+ } finally {
+ consumer.close()
+ }
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec(quorum:
String): Unit = {
- val consumer = TestUtils.createConsumer(bootstrapServers())
- val offsetMap = Map(
- new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new
OffsetAndMetadata(10, "")
- ).asJava
- consumer.commitSync(offsetMap)
- val logManager = brokers.head.logManager
- def getGroupMetadataLogOpt: Option[UnifiedLog] =
- logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0))
-
- TestUtils.waitUntilTrue(() =>
getGroupMetadataLogOpt.exists(_.logSegments.asScala.exists(_.log.batches.asScala.nonEmpty)),
- "Commit message not appended in time")
-
- val logSegments = getGroupMetadataLogOpt.get.logSegments.asScala
- val incorrectCompressionCodecs = logSegments
- .flatMap(_.log.batches.asScala.map(_.compressionType))
- .filter(_ != offsetsTopicCompressionCodec)
- assertEquals(Seq.empty, incorrectCompressionCodecs, "Incorrect compression
codecs should be empty")
-
- consumer.close()
+ @ClusterTest(
+ types = Array(Type.KRAFT),
+ serverProperties = Array(
+ new ClusterConfigProperty(key = "group.coordinator.new.enable", value =
"true"),
+ new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols",
value = "classic,consumer"),
+ new ClusterConfigProperty(key = "offsets.topic.num.partitions", value =
"1"),
+ new ClusterConfigProperty(key = "offsets.topic.replication.factor",
value = "1")
+ )
+ )
+ def
testCoordinatorFailoverWithConsumerGroupRecordsAfterCompactingPartition(): Unit
= {
+ withAdmin { admin =>
+ TestUtils.createTopicWithAdminRaw(
+ admin = admin,
+ topic = "foo",
+ numPartitions = 3
+ )
+
+ // Create a consumer group grp1 with one member. The member subscribes
to foo and leaves. This creates
+ // a mix of group records with tombstones to delete the member.
+ withConsumer(groupId = "grp1", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment.asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Create a consumer group grp2 with one member. The member subscribes
to foo, manually commits offsets,
+ // unsubscribes and finally re-subscribes to foo. This creates a mix of
group records with tombstones
+ // and ensure that all the offset commit records are before the consumer
group records due to the
+ // rebalance after the commit sync.
+ withConsumer(groupId = "grp2", groupProtocol = GroupProtocol.CONSUMER,
enableAutoCommit = false) { consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ consumer.commitSync()
+ consumer.unsubscribe()
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ // Create a consumer group grp3 with one member. The member subscribes
to foo and leaves the group. Then
+ // the group is deleted. This creates tombstones to delete the member,
the group and the offsets.
+ withConsumer(groupId = "grp3", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ admin
+ .deleteConsumerGroups(List("grp3").asJava)
+ .deletedGroups()
+ .get("grp3")
+ .get(10, TimeUnit.SECONDS)
+
+ // Create a classic group grp4 with one member. Upgrades the group to
the consumer
+ // protocol.
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CLASSIC) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+
+ withConsumer(groupId = "grp4", groupProtocol = GroupProtocol.CONSUMER) {
consumer =>
+ consumer.subscribe(List("foo").asJava)
+ TestUtils.waitUntilTrue(() => {
+ consumer.poll(Duration.ofMillis(50))
+ consumer.assignment().asScala.nonEmpty
+ }, msg = "Consumer did not get an non empty assignment")
+ }
+ }
+
+ // Force a compaction.
+ val tp = new TopicPartition("__consumer_offsets", 0)
+ val broker = cluster.brokers().asScala.head._2
+ val log = broker.logManager.getLog(tp).get
+ log.roll()
+ assertTrue(broker.logManager.cleaner.awaitCleaned(tp, 0))
+
+ // Restart the broker to reload the group coordinator.
+ cluster.shutdownBroker(broker.config.brokerId)
+ cluster.startBroker(broker.config.brokerId)
+
+ // Verify the state of the groups to ensure that the group coordinator
+ // was correctly loaded. If replaying any of the records fails, the
+ // group coordinator won't be available.
+ withAdmin { admin =>
+ val groups = admin
+ .describeConsumerGroups(List("grp1", "grp2", "grp3", "grp4").asJava)
+ .describedGroups()
+ .asScala
+ .toMap
+
+ assertDescribedGroup(groups, "grp1", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ assertDescribedGroup(groups, "grp2", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ assertDescribedGroup(groups, "grp3", GroupType.CLASSIC,
ConsumerGroupState.DEAD)
+ assertDescribedGroup(groups, "grp4", GroupType.CONSUMER,
ConsumerGroupState.EMPTY)
+ }
+ }
+
+ private def withAdmin(f: Admin => Unit): Unit = {
+ val admin: Admin = cluster.createAdminClient()
+ try {
+ f(admin)
+ } finally {
+ admin.close()
+ }
+ }
+
+ private def withConsumer(
+ groupId: String,
+ groupProtocol: GroupProtocol,
+ enableAutoCommit: Boolean = true
+ )(f: Consumer[Array[Byte], Array[Byte]] => Unit): Unit = {
+ val consumer = TestUtils.createConsumer(
+ brokerList = cluster.bootstrapServers(),
+ groupId = groupId,
+ groupProtocol = groupProtocol,
+ enableAutoCommit = enableAutoCommit
+ )
+ try {
+ f(consumer)
+ } finally {
+ consumer.close()
+ }
+ }
+
+ private def assertDescribedGroup(
+ groups: Map[String, KafkaFuture[ConsumerGroupDescription]],
+ groupId: String,
+ `type`: GroupType,
Review Comment:
Let me change it. You have to wrap an identifier in backticks like this when the word is a reserved word in Scala (such as `type`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]