dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1483062130
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2811,6 +2811,71 @@ public void testListConsumerGroupsWithStates() throws
Exception {
}
}
+ @Test
+ public void testListConsumerGroupsWithTypes() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Test with no specific list consumer group options.
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-1")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState("Stable")
+ .setGroupType(GroupType.CLASSIC.toString()),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-2")
+ .setGroupState("Empty")
+ .setGroupType(GroupType.CLASSIC.toString())))),
+ env.cluster().nodeById(0));
+
+ final ListConsumerGroupsOptions options = new
ListConsumerGroupsOptions();
+ final ListConsumerGroupsResult result =
env.adminClient().listConsumerGroups(options);
+ Collection<ConsumerGroupListing> listings = result.valid().get();
+
+ assertEquals(2, listings.size());
+ List<ConsumerGroupListing> expected = new ArrayList<>();
+ expected.add(new ConsumerGroupListing("group-2", true,
Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CLASSIC)));
+ expected.add(new ConsumerGroupListing("group-1", false,
Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC)));
+ assertEquals(expected, listings);
+ assertEquals(0, result.errors().get().size());
+
+ // Test with list consumer group options.
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.kafkaClient().prepareResponseFrom(
Review Comment:
It may be worth validating that the request also contains the expected
filters. For this, you can pass a request matcher as the first argument here.
There are examples in this file. We could also add it to the previous case.
##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging {
}
def listGroups(): Unit = {
- if (opts.options.has(opts.stateOpt)) {
- val stateValue = opts.options.valueOf(opts.stateOpt)
- val states = if (stateValue == null || stateValue.isEmpty)
- Set[ConsumerGroupState]()
- else
- consumerGroupStatesFromString(stateValue)
- val listings = listConsumerGroupsWithState(states)
- printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
- } else
+ val includeType = opts.options.has(opts.typeOpt)
+ val includeState = opts.options.has(opts.stateOpt)
+
+ if (includeType || includeState) {
+ val types = typeValues()
+ val states = stateValues()
+ val listings = listConsumerGroupsWithFilters(types, states)
+
+ printGroupInfo(listings, includeType, includeState)
+
+ } else {
listConsumerGroups().foreach(println(_))
+ }
+ }
+
+ private def stateValues(): Set[ConsumerGroupState] = {
+ val stateValue = opts.options.valueOf(opts.stateOpt)
+ if (stateValue == null || stateValue.isEmpty)
+ Set[ConsumerGroupState]()
+ else
+ consumerGroupStatesFromString(stateValue)
+ }
+
+ private def typeValues(): Set[GroupType] = {
+ val typeValue = opts.options.valueOf(opts.typeOpt)
+ if (typeValue == null || typeValue.isEmpty)
+ Set[GroupType]()
+ else
+ consumerGroupTypesFromString(typeValue)
+ }
+
+ private def printGroupInfo(groups: List[ConsumerGroupListing],
includeType: Boolean, includeState: Boolean): Unit = {
+ def groupId(groupListing: ConsumerGroupListing): String =
groupListing.groupId
+ def groupType(groupListing: ConsumerGroupListing): String =
groupListing.`type`().orElse(GroupType.UNKNOWN).toString
+ def groupState(groupListing: ConsumerGroupListing): String =
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
+
+ val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) =>
Math.max(maxLen, groupId(groupListing).length)) + 10
+ var format = s"%-${maxGroupLen}s"
+ var header = List("Group")
Review Comment:
nit: `GROUP` as it was before.
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2849,6 +2914,44 @@ public void
testListConsumerGroupsWithStatesOlderBrokerVersion() throws Exceptio
}
}
+ @Test
+ public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws
Exception {
+ ApiVersion listGroupV4 = new ApiVersion()
+ .setApiKey(ApiKeys.LIST_GROUPS.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 4);
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ // Check if we can list groups with older broker if we don't
specify types.
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Collections.singletonList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-1")
+
.setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)))),
+ env.cluster().nodeById(0));
+ ListConsumerGroupsOptions options = new
ListConsumerGroupsOptions();
+ ListConsumerGroupsResult result =
env.adminClient().listConsumerGroups(options);
+ Collection<ConsumerGroupListing> listing = result.all().get();
+ assertEquals(1, listing.size());
+ List<ConsumerGroupListing> expected =
Collections.singletonList(new ConsumerGroupListing("group-1", false));
+ assertEquals(expected, listing);
Review Comment:
I wonder if this is really necessary. We already test this in the other test.
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +263,124 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupStatesFromString(" , ,"))
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListGroupCommand(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String):
Unit = {
+ var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+ assertEquals(Set(GroupType.CONSUMER), result)
+
+ result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer,
classic")
+ assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+ result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer,
Classic")
+ assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic"))
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString(" , ,"))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testListGroupCommandClassicProtocol(quorum: String, groupProtocol:
String): Unit = {
val simpleGroup = "simple-group"
+ val protocolGroup = "protocol-group"
+
addSimpleGroupExecutor(group = simpleGroup)
addConsumerGroupExecutor(numConsumers = 1)
+ addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup,
groupProtocol = groupProtocol)
var out = ""
var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- !out.contains("STATE") && out.contains(simpleGroup) &&
out.contains(group)
- }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+ !out.contains("STATE") && out.contains(simpleGroup) &&
out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header,
but found $out")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
- }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+ out.contains("STATE") && !out.contains("TYPE") &&
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && !out.contains("STATE") &&
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state", "--type")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && out.contains("STATE") &&
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state", "Stable")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- out.contains("STATE") && out.contains(group) && out.contains("Stable")
- }, s"Expected to find $group in state Stable and the header, but found
$out")
+ out.contains("STATE") && out.contains(group) && out.contains("Stable")
&& out.contains(protocolGroup)
+ }, s"Expected to find $group, $protocolGroup in state Stable and the
header, but found $out")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state", "stable")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- out.contains("STATE") && out.contains(group) && out.contains("Stable")
- }, s"Expected to find $group in state Stable and the header, but found
$out")
+ out.contains("STATE") && out.contains(group) && out.contains("Stable")
&& out.contains(protocolGroup)
+ }, s"Expected to find $group, $protocolGroup in state Stable and the
header, but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type", "Classic")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && out.contains("Classic") &&
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) &&
out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type", "classic")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && out.contains("Classic") &&
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) &&
out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the
header, but found $out")
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @CsvSource(Array("kraft+kip848,consumer"))
+ def testListGroupCommandConsumerProtocol(quorum: String, groupProtocol:
String): Unit = {
+ val simpleGroup = "simple-group"
Review Comment:
btw, I was wondering why tests fail in this suite but not in
DescribeConsumerGroupTest. I have noticed that there we call
`createOffsetsTopic()` in each test. We could also try this here.
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ group,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ )
var foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
ConsumerGroupState.values.toSet).toSet
expectedListing == foundListing
}, s"Expected to show groups $expectedListing, but found $foundListing")
- val expectedListingStable = Set(
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
expectedListingStable == foundListing
}, s"Expected to show groups $expectedListingStable, but found
$foundListing")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testConsumerGroupStatesFromString(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testListConsumerGroupsWithTypesClassicProtocol(quorum: String,
groupProtocol: String): Unit = {
+ val simpleGroup = "simple-group"
+ val protocolGroup = "protocol-group"
+
+ addSimpleGroupExecutor(group = simpleGroup)
+ addConsumerGroupExecutor(numConsumers = 1)
+ addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup,
groupProtocol = groupProtocol)
+
+ val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type")
+ val service = getConsumerGroupService(cgcArgs)
+
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+ val expectedListing = Set(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ group,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ protocolGroup,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ )
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ var foundListing = Set.empty[ConsumerGroupListing]
+ TestUtils.waitUntilTrue(() => {
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
Set.empty).toSet
+ expectedListing == foundListing
+ }, s"Expected to show groups $expectedListing, but found $foundListing")
Review Comment:
nit: This code is repeated many times. I wonder if it may be worth
extracting it into a helper method. It would make the test cases smaller.
##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging {
}
def listGroups(): Unit = {
- if (opts.options.has(opts.stateOpt)) {
- val stateValue = opts.options.valueOf(opts.stateOpt)
- val states = if (stateValue == null || stateValue.isEmpty)
- Set[ConsumerGroupState]()
- else
- consumerGroupStatesFromString(stateValue)
- val listings = listConsumerGroupsWithState(states)
- printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
- } else
+ val includeType = opts.options.has(opts.typeOpt)
+ val includeState = opts.options.has(opts.stateOpt)
+
+ if (includeType || includeState) {
+ val types = typeValues()
+ val states = stateValues()
+ val listings = listConsumerGroupsWithFilters(types, states)
+
+ printGroupInfo(listings, includeType, includeState)
+
+ } else {
listConsumerGroups().foreach(println(_))
+ }
+ }
+
+ private def stateValues(): Set[ConsumerGroupState] = {
+ val stateValue = opts.options.valueOf(opts.stateOpt)
+ if (stateValue == null || stateValue.isEmpty)
+ Set[ConsumerGroupState]()
+ else
+ consumerGroupStatesFromString(stateValue)
+ }
+
+ private def typeValues(): Set[GroupType] = {
+ val typeValue = opts.options.valueOf(opts.typeOpt)
+ if (typeValue == null || typeValue.isEmpty)
+ Set[GroupType]()
+ else
+ consumerGroupTypesFromString(typeValue)
+ }
+
+ private def printGroupInfo(groups: List[ConsumerGroupListing],
includeType: Boolean, includeState: Boolean): Unit = {
+ def groupId(groupListing: ConsumerGroupListing): String =
groupListing.groupId
+ def groupType(groupListing: ConsumerGroupListing): String =
groupListing.`type`().orElse(GroupType.UNKNOWN).toString
+ def groupState(groupListing: ConsumerGroupListing): String =
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
+
+ val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) =>
Math.max(maxLen, groupId(groupListing).length)) + 10
Review Comment:
nit: It looks like `+ 10` was not there before. Why do we need it?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ group,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ )
var foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
ConsumerGroupState.values.toSet).toSet
expectedListing == foundListing
}, s"Expected to show groups $expectedListing, but found $foundListing")
- val expectedListingStable = Set(
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
expectedListingStable == foundListing
}, s"Expected to show groups $expectedListingStable, but found
$foundListing")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testConsumerGroupStatesFromString(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testListConsumerGroupsWithTypesClassicProtocol(quorum: String,
groupProtocol: String): Unit = {
+ val simpleGroup = "simple-group"
+ val protocolGroup = "protocol-group"
+
+ addSimpleGroupExecutor(group = simpleGroup)
+ addConsumerGroupExecutor(numConsumers = 1)
+ addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup,
groupProtocol = groupProtocol)
Review Comment:
nit: We probably don't need this one as we only test with the classic
protocol in this test.
##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -51,6 +53,27 @@ public ConsumerGroupListing(String groupId, boolean
isSimpleConsumerGroup, Optio
this.groupId = groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.state = Objects.requireNonNull(state);
+ this.type = Optional.empty();
Review Comment:
nit: Let's call the other constructor here.
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +263,124 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupStatesFromString(" , ,"))
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListGroupCommand(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
Review Comment:
nit: `@Test`?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -111,37 +263,124 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupStatesFromString(" , ,"))
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListGroupCommand(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String):
Unit = {
+ var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+ assertEquals(Set(GroupType.CONSUMER), result)
+
+ result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer,
classic")
+ assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+ result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer,
Classic")
+ assertEquals(Set(GroupType.CONSUMER, GroupType.CLASSIC), result)
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic"))
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString(" , ,"))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testListGroupCommandClassicProtocol(quorum: String, groupProtocol:
String): Unit = {
val simpleGroup = "simple-group"
+ val protocolGroup = "protocol-group"
+
addSimpleGroupExecutor(group = simpleGroup)
addConsumerGroupExecutor(numConsumers = 1)
+ addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup,
groupProtocol = groupProtocol)
var out = ""
var cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- !out.contains("STATE") && out.contains(simpleGroup) &&
out.contains(group)
- }, s"Expected to find $simpleGroup, $group and no header, but found $out")
+ !out.contains("STATE") && out.contains(simpleGroup) &&
out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and no header,
but found $out")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- out.contains("STATE") && out.contains(simpleGroup) && out.contains(group)
- }, s"Expected to find $simpleGroup, $group and the header, but found $out")
+ out.contains("STATE") && !out.contains("TYPE") &&
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && !out.contains("STATE") &&
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state", "--type")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && out.contains("STATE") &&
out.contains(simpleGroup) && out.contains(group) && out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state", "Stable")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- out.contains("STATE") && out.contains(group) && out.contains("Stable")
- }, s"Expected to find $group in state Stable and the header, but found
$out")
+ out.contains("STATE") && out.contains(group) && out.contains("Stable")
&& out.contains(protocolGroup)
+ }, s"Expected to find $group, $protocolGroup in state Stable and the
header, but found $out")
cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--state", "stable")
TestUtils.waitUntilTrue(() => {
out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
- out.contains("STATE") && out.contains(group) && out.contains("Stable")
- }, s"Expected to find $group in state Stable and the header, but found
$out")
+ out.contains("STATE") && out.contains(group) && out.contains("Stable")
&& out.contains(protocolGroup)
+ }, s"Expected to find $group, $protocolGroup in state Stable and the
header, but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type", "Classic")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && out.contains("Classic") &&
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) &&
out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the header,
but found $out")
+
+ cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type", "classic")
+ TestUtils.waitUntilTrue(() => {
+ out = TestUtils.grabConsoleOutput(ConsumerGroupCommand.main(cgcArgs))
+ out.contains("TYPE") && out.contains("Classic") &&
!out.contains("STATE") && out.contains(simpleGroup) && out.contains(group) &&
out.contains(protocolGroup)
+ }, s"Expected to find $simpleGroup, $group, $protocolGroup and the
header, but found $out")
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @CsvSource(Array("kraft+kip848,consumer"))
Review Comment:
It would be better to also have a method as the source (like the others) for
this one. We can add one if it does not exist yet.
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ group,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ )
var foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
ConsumerGroupState.values.toSet).toSet
expectedListing == foundListing
}, s"Expected to show groups $expectedListing, but found $foundListing")
- val expectedListingStable = Set(
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
expectedListingStable == foundListing
}, s"Expected to show groups $expectedListingStable, but found
$foundListing")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testConsumerGroupStatesFromString(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testListConsumerGroupsWithTypesClassicProtocol(quorum: String,
groupProtocol: String): Unit = {
+ val simpleGroup = "simple-group"
+ val protocolGroup = "protocol-group"
+
+ addSimpleGroupExecutor(group = simpleGroup)
+ addConsumerGroupExecutor(numConsumers = 1)
+ addConsumerGroupExecutor(numConsumers = 1, group = protocolGroup,
groupProtocol = groupProtocol)
+
+ val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type")
+ val service = getConsumerGroupService(cgcArgs)
+
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+ val expectedListing = Set(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ group,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ protocolGroup,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ )
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ var foundListing = Set.empty[ConsumerGroupListing]
+ TestUtils.waitUntilTrue(() => {
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
Set.empty).toSet
+ expectedListing == foundListing
+ }, s"Expected to show groups $expectedListing, but found $foundListing")
+
+ // When group type is mentioned:
+ // Old Group Coordinator returns empty listings if the type is not Classic.
+ // New Group Coordinator returns groups according to the filter.
+ foundListing = Set.empty[ConsumerGroupListing]
+ TestUtils.waitUntilTrue(() => {
+ foundListing =
service.listConsumerGroupsWithFilters(Set(GroupType.CONSUMER), Set.empty).toSet
+ expectedListingStable == foundListing
Review Comment:
nit: Should we remove `expectedListingStable` and directly use
`Set.empty[ConsumerGroupListing]` here?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +67,177 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ group,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ )
var foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
ConsumerGroupState.values.toSet).toSet
expectedListing == foundListing
}, s"Expected to show groups $expectedListing, but found $foundListing")
- val expectedListingStable = Set(
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
Set(ConsumerGroupState.PREPARING_REBALANCE)).toSet
Review Comment:
Why are we changing this? If you really want to test with a state that does
not exist, it may be better to keep this one as it was and to add the new case after it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]