OmniaGM commented on code in PR #16920:
URL: https://github.com/apache/kafka/pull/16920#discussion_r1724844017


##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -73,6 +75,10 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG,
 "true"))
       
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer"))
     }
+    if (isShareGroupTest()) {
+      
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,share"))
+      cfgs.foreach(_.setProperty("unstable.api.versions.enable", "true"))

Review Comment:
   Can we use `ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG` instead of 
`"unstable.api.versions.enable"`?



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java:
##########
@@ -173,8 +177,17 @@ private void handleError(
                 break;
 
             case GROUP_ID_NOT_FOUND:
-                log.error("`DescribeShareGroups` request for group id {} 
failed because the group does not exist.", groupId.idValue);
-                failed.put(groupId, error.exception(errorMsg));
+                // In order to maintain compatibility with 
describeConsumerGroups, an unknown group ID is
+                // reported as a DEAD share group, and the admin client 
operation did not fail
+                log.debug("`DescribeShareGroups` request for group id {} 
failed because the group does not exist. {}",
+                    groupId.idValue, errorMsg != null ? errorMsg : "");
+                final ShareGroupDescription shareGroupDescription =
+                    new ShareGroupDescription(groupId.idValue,
+                        Collections.emptySet(),
+                        ShareGroupState.DEAD,
+                        coordinator,
+                        
validAclOperations(describedGroup.authorizedOperations()));

Review Comment:
   Not related to this change, but the logic of `validAclOperations` is 
similar to the one in `DescribeConsumerGroupsHandler`. Could this method become 
a static utility method that both handlers call?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(

Review Comment:
   We can use `util.Collections.singleton` here instead of `util.Arrays.asList`.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.STABLE).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.EMPTY).asJava)

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.STABLE).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.EMPTY).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(
+              _.groupId == testGroupId)
+            matching.isEmpty
+          }, s"Expected to find zero groups")
+
+          val describeWithFakeGroupResult = 
client.describeShareGroups(Seq(testGroupId, fakeGroupId).asJava,
+            new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+
+          // Test that we can get information about the test share group.
+          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+          var testGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+
+          assertEquals(testGroupId, testGroupDescription.groupId())
+          assertEquals(consumerSet.size, testGroupDescription.members().size())
+          val members = testGroupDescription.members()
+          members.asScala.foreach(member => assertEquals(testClientId, 
member.clientId()))
+          val topicPartitionsByTopic = 
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
+          topicSet.foreach { topic =>
+            val topicPartitions = topicPartitionsByTopic.getOrElse(topic, 
List.empty)
+            assertEquals(testNumPartitions, topicPartitions.size)
+          }
+
+          val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
+          assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
+
+          // Test that the fake group is listed as dead.
+          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
+          val fakeGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
+
+          assertEquals(fakeGroupId, fakeGroupDescription.groupId())
+          assertEquals(0, fakeGroupDescription.members().size())
+          assertEquals(ShareGroupState.DEAD, fakeGroupDescription.state())
+          assertNull(fakeGroupDescription.authorizedOperations())
+
+          // Test that all() returns 2 results
+          assertEquals(2, describeWithFakeGroupResult.all().get().size())
+
+          val describeTestGroupResult = 
client.describeShareGroups(Seq(testGroupId).asJava,

Review Comment:
   Ditto: use Java collections directly.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())

Review Comment:
   why not `assertEquals`?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>

Review Comment:
   This can be refactored to avoid converting the Java collection to Scala:
   ```
   TestUtils.waitUntilTrue(() => {
     client.listShareGroups.all.get.stream().filter(group =>
       group.groupId == testGroupId && group.state.get == ShareGroupState.STABLE).count() == 1
   }, s"Expected to be able to list $testGroupId")
   ```
             



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.STABLE).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.EMPTY).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(
+              _.groupId == testGroupId)
+            matching.isEmpty
+          }, s"Expected to find zero groups")
+
+          val describeWithFakeGroupResult = 
client.describeShareGroups(Seq(testGroupId, fakeGroupId).asJava,
+            new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+
+          // Test that we can get information about the test share group.
+          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+          var testGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+
+          assertEquals(testGroupId, testGroupDescription.groupId())
+          assertEquals(consumerSet.size, testGroupDescription.members().size())
+          val members = testGroupDescription.members()
+          members.asScala.foreach(member => assertEquals(testClientId, 
member.clientId()))

Review Comment:
   No need to convert; just use `members.forEach`.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.STABLE).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.EMPTY).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(
+              _.groupId == testGroupId)
+            matching.isEmpty
+          }, s"Expected to find zero groups")
+
+          val describeWithFakeGroupResult = 
client.describeShareGroups(Seq(testGroupId, fakeGroupId).asJava,

Review Comment:
   Ditto: use Java collections directly.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.STABLE).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.EMPTY).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(
+              _.groupId == testGroupId)
+            matching.isEmpty
+          }, s"Expected to find zero groups")
+
+          val describeWithFakeGroupResult = 
client.describeShareGroups(Seq(testGroupId, fakeGroupId).asJava,
+            new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+
+          // Test that we can get information about the test share group.
+          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+          var testGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+
+          assertEquals(testGroupId, testGroupDescription.groupId())
+          assertEquals(consumerSet.size, testGroupDescription.members().size())
+          val members = testGroupDescription.members()
+          members.asScala.foreach(member => assertEquals(testClientId, 
member.clientId()))
+          val topicPartitionsByTopic = 
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
+          topicSet.foreach { topic =>
+            val topicPartitions = topicPartitionsByTopic.getOrElse(topic, 
List.empty)
+            assertEquals(testNumPartitions, topicPartitions.size)
+          }
+
+          val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
+          assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
+
+          // Test that the fake group is listed as dead.
+          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
+          val fakeGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
+
+          assertEquals(fakeGroupId, fakeGroupDescription.groupId())
+          assertEquals(0, fakeGroupDescription.members().size())
+          assertEquals(ShareGroupState.DEAD, fakeGroupDescription.state())
+          assertNull(fakeGroupDescription.authorizedOperations())
+
+          // Test that all() returns 2 results
+          assertEquals(2, describeWithFakeGroupResult.all().get().size())
+
+          val describeTestGroupResult = 
client.describeShareGroups(Seq(testGroupId).asJava,
+            new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
+          assertEquals(1, describeTestGroupResult.all().get().size())
+          assertEquals(1, describeTestGroupResult.describedGroups().size())
+
+          testGroupDescription = 
describeTestGroupResult.describedGroups().get(testGroupId).get()
+
+          assertEquals(testGroupId, testGroupDescription.groupId)
+          assertEquals(consumerSet.size, testGroupDescription.members().size())
+
+          // Describing a share group using describeConsumerGroups reports it 
as a DEAD consumer group
+          // in the same way as a non-existent group
+          val describeConsumerGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava,

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.STABLE).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.EMPTY).asJava)
+            val matching = 
client.listShareGroups(options).all.get.asScala.filter(

Review Comment:
   Ditto; this can be simplified to:
   ```
               client.listShareGroups(options).all.get.stream().filter(
                 _.groupId == testGroupId).count() == 0
   ```



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,166 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+
+      def createProperties(): Properties = {
+        val newConsumerConfig = new Properties(consumerConfig)
+        newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+        newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+        newConsumerConfig
+      }
+
+      val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+      val topicSet = Set(testTopicName)
+
+      val latch = new CountDownLatch(consumerSet.size)
+      try {
+        def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], 
topic: String): Thread = {
+          new Thread {
+            override def run : Unit = {
+              consumer.subscribe(Collections.singleton(topic))
+              try {
+                while (true) {
+                  consumer.poll(JDuration.ofSeconds(5))
+                  if (latch.getCount > 0L)
+                    latch.countDown()
+                  consumer.commitSync()
+                }
+              } catch {
+                case _: InterruptException => // Suppress the output to stderr
+              }
+            }
+          }
+        }
+
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listShareGroups.all.get.asScala.filter(group 
=>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE)
+            matching.size == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Set(ShareGroupState.STABLE).asJava)

Review Comment:
   Can we use `Collections.singleton` here instead of converting a Scala `Set` to Java with `.asJava`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to