dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1484046178


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2811,6 +2811,72 @@ public void testListConsumerGroupsWithStates() throws 
Exception {
         }
     }
 
+    @Test
+    public void testListConsumerGroupsWithTypes() throws Exception {
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            // Test with a specific state filter but no type filter in list 
consumer group options.
+            
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), 
Errors.NONE));
+
+            env.kafkaClient().prepareResponseFrom(
+                request -> request instanceof ListGroupsRequest &&
+                    !((ListGroupsRequest) 
request).data().statesFilter().isEmpty() &&
+                    ((ListGroupsRequest) 
request).data().typesFilter().isEmpty(),

Review Comment:
   nit: It would be better to be precise here and validate if the states are 
actually the correct one. This also applies to the other matchers. 
`expectCreateTopicsRequestWithTopics` may be a good inspiration for the 
structure.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -18,74 +18,196 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    createOffsetsTopic()
+
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
 
     val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
     val service = getConsumerGroupService(cgcArgs)
 
-    val expectedGroups = Set(group, simpleGroup)
+    val expectedGroups = Set(protocolGroup, group, simpleGroup)
     var foundGroups = Set.empty[String]
     TestUtils.waitUntilTrue(() => {
       foundGroups = service.listConsumerGroups().toSet
       expectedGroups == foundGroups
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
+  @Test
   def testListWithUnrecognizedNewConsumerOption(): Unit = {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
     assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): 
Unit = {
     val simpleGroup = "simple-group"
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
 
     val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
     val service = getConsumerGroupService(cgcArgs)
 
+    var expectedListing = Set(
+      new ConsumerGroupListing(
+        simpleGroup,
+        true,
+        Optional.of(ConsumerGroupState.EMPTY),
+        Optional.of(GroupType.CLASSIC)
+      ),
+      new ConsumerGroupListing(
+        group,
+        false,
+        Optional.of(ConsumerGroupState.STABLE),
+        Optional.of(GroupType.CLASSIC)
+      )
+    )
+
+    testWaitUntilTrue(Set.empty, ConsumerGroupState.values.toSet, 
expectedListing, service)

Review Comment:
   I like it but the name could be better. How about `assertGroupListing` or 
something like this?



##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging {
     }
 
     def listGroups(): Unit = {
-      if (opts.options.has(opts.stateOpt)) {
-        val stateValue = opts.options.valueOf(opts.stateOpt)
-        val states = if (stateValue == null || stateValue.isEmpty)
-          Set[ConsumerGroupState]()
-        else
-          consumerGroupStatesFromString(stateValue)
-        val listings = listConsumerGroupsWithState(states)
-        printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
-      } else
+      val includeType = opts.options.has(opts.typeOpt)
+      val includeState = opts.options.has(opts.stateOpt)
+
+      if (includeType || includeState) {
+        val types = typeValues()
+        val states = stateValues()
+        val listings = listConsumerGroupsWithFilters(types, states)
+
+        printGroupInfo(listings, includeType, includeState)
+
+      } else {
         listConsumerGroups().foreach(println(_))
+      }
+    }
+
+    private def stateValues(): Set[ConsumerGroupState] = {
+      val stateValue = opts.options.valueOf(opts.stateOpt)
+      if (stateValue == null || stateValue.isEmpty)
+        Set[ConsumerGroupState]()
+      else
+        consumerGroupStatesFromString(stateValue)
+    }
+
+    private def typeValues(): Set[GroupType] = {
+      val typeValue = opts.options.valueOf(opts.typeOpt)
+      if (typeValue == null || typeValue.isEmpty)
+        Set[GroupType]()
+      else
+        consumerGroupTypesFromString(typeValue)
+    }
+
+    private def printGroupInfo(groups: List[ConsumerGroupListing], 
includeType: Boolean, includeState: Boolean): Unit = {
+      def groupId(groupListing: ConsumerGroupListing): String = 
groupListing.groupId
+      def groupType(groupListing: ConsumerGroupListing): String = 
groupListing.`type`().orElse(GroupType.UNKNOWN).toString
+      def groupState(groupListing: ConsumerGroupListing): String = 
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
+
+      val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) => 
Math.max(maxLen, groupId(groupListing).length)) + 10

Review Comment:
   Makes sense.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +233,138 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @Test
+  def testConsumerGroupTypesFromString(): Unit = {
+    var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+    assertEquals(Set(GroupType.CONSUMER), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, 
classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, 
Classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("  bad, generic"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("   ,   ,"))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
     var out = ""
 
     var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+      out.contains("STATE") && !out.contains("TYPE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && !out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "Stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "Classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup  and the 
header, but found $out")
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testListGroupCommandConsumerProtocol(quorum: String, groupProtocol: 
String): Unit = {
+    val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    createOffsetsTopic()
+
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
+    var out = ""
+
+    var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("STATE") && !out.contains("TYPE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Consumer") && 
!out.contains("STATE") && out.contains(protocolGroup) && 
out.contains(simpleGroup) && out.contains(group)
+    }, s"Expected to find $protocolGroup and the header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "consumer")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Consumer") && 
!out.contains("STATE") && out.contains(protocolGroup)
+    }, s"Expected to find $protocolGroup and the header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "consumer", "--state", "Stable")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Consumer") && 
out.contains("STATE") && out.contains("Stable") && out.contains(protocolGroup)
+    }, s"Expected to find $protocolGroup and the header, but found $out")
   }
 
+  def testWaitUntilTrue(
+    typeFilterSet: Set[GroupType],
+    stateFilterSet: Set[ConsumerGroupState],
+    expectedListing: Set[ConsumerGroupListing],
+    service:  ConsumerGroupCommand.ConsumerGroupService

Review Comment:
   nit: There is an extra space after `:`. I also suggest to put `service` 
first.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -18,74 +18,196 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    createOffsetsTopic()

Review Comment:
   While we are here, let's add it as the first line in all the tests using a 
cluster.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -18,44 +18,47 @@ package kafka.admin
 
 import joptsimple.OptionException
 import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, GroupType}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
 
 class ListConsumerGroupTest extends ConsumerGroupCommandTest {
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroups(quorum: String): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
 
     val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
     val service = getConsumerGroupService(cgcArgs)
 
-    val expectedGroups = Set(group, simpleGroup)
+    val expectedGroups = Set(protocolGroup, group, simpleGroup)
     var foundGroups = Set.empty[String]
     TestUtils.waitUntilTrue(() => {
       foundGroups = service.listConsumerGroups().toSet
       expectedGroups == foundGroups
     }, s"Expected --list to show groups $expectedGroups, but found 
$foundGroups.")
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListWithUnrecognizedNewConsumerOption(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol: 
String): Unit = {
     val cgcArgs = Array("--new-consumer", "--bootstrap-server", 
bootstrapServers(), "--list")
     assertThrows(classOf[OptionException], () => 
getConsumerGroupService(cgcArgs))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListConsumerGroupsWithStates(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String): 
Unit = {
     val simpleGroup = "simple-group"
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)

Review Comment:
   My point was that we have parameterized the test to run with all the 
coordinators and all the consumers but we don't use the latter at all. So we 
either need to only run with classic or we should make the test using the 
other. A simple way would be to pass the groupProtocol to the existing 
addConsumerGroupExecutor.



##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +233,138 @@ class ListConsumerGroupTest extends 
ConsumerGroupCommandTest {
     assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupStatesFromString("   ,   ,"))
   }
 
-  @ParameterizedTest
-  @ValueSource(strings = Array("zk", "kraft"))
-  def testListGroupCommand(quorum: String): Unit = {
+  @Test
+  def testConsumerGroupTypesFromString(): Unit = {
+    var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+    assertEquals(Set(GroupType.CONSUMER), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer, 
classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer, 
Classic")
+    assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("  bad, generic"))
+
+    assertThrows(classOf[IllegalArgumentException], () => 
ConsumerGroupCommand.consumerGroupTypesFromString("   ,   ,"))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testListGroupCommandClassicProtocol(quorum: String, groupProtocol: 
String): Unit = {
     val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
     addSimpleGroupExecutor(group = simpleGroup)
     addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
     var out = ""
 
     var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
-    }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+      out.contains("STATE") && !out.contains("TYPE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && !out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("STATE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "Stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
 
     cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state", "stable")
     TestUtils.waitUntilTrue(() => {
       out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
-      out.contains("STATE") && out.contains(group) && out.contains("Stable")
-    }, s"Expected to find $group in state Stable and the header, but found 
$out")
+      out.contains("STATE") && out.contains(group) && out.contains("Stable") 
&& out.contains(protocolGroup)
+    }, s"Expected to find $group, $protocolGroup in state Stable and the 
header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "Classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "classic")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Classic") && 
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) && 
out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup  and the 
header, but found $out")
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testListGroupCommandConsumerProtocol(quorum: String, groupProtocol: 
String): Unit = {
+    val simpleGroup = "simple-group"
+    val protocolGroup = "protocol-group"
+
+    createOffsetsTopic()
+
+    addSimpleGroupExecutor(group = simpleGroup)
+    addConsumerGroupExecutor(numConsumers = 1)
+    addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup, 
groupProtocol = groupProtocol)
+    var out = ""
+
+    var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      !out.contains("STATE") && out.contains(simpleGroup) && 
out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--state")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("STATE") && !out.contains("TYPE") && 
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+    }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header, 
but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Consumer") && 
!out.contains("STATE") && out.contains(protocolGroup) && 
out.contains(simpleGroup) && out.contains(group)
+    }, s"Expected to find $protocolGroup and the header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "consumer")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Consumer") && 
!out.contains("STATE") && out.contains(protocolGroup)
+    }, s"Expected to find $protocolGroup and the header, but found $out")
+
+    cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list", 
"--type", "consumer", "--state", "Stable")
+    TestUtils.waitUntilTrue(() => {
+      out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+      out.contains("TYPE") && out.contains("Consumer") && 
out.contains("STATE") && out.contains("Stable") && out.contains(protocolGroup)
+    }, s"Expected to find $protocolGroup and the header, but found $out")
   }
 
+  def testWaitUntilTrue(

Review Comment:
   nit: We could make it private.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to