dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1457193686
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroup
private Set<ConsumerGroupState> states = Collections.emptySet();
+ private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
Review Comment:
nit: `types`?
##########
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java:
##########
@@ -21,95 +21,111 @@
import java.util.Optional;
Review Comment:
This class is part of our public API therefore we cannot change the existing
(e.g. removing constructors). Could you please revert all the unnecessary
changes here and only add the new field and its related changes?
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroup
private Set<ConsumerGroupState> states = Collections.emptySet();
+ private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
/**
- * If states is set, only groups in these states will be returned by
listConsumerGroups()
+ * If states is set, only groups in these states will be returned by
listConsumerGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
- this.states = (states == null) ? Collections.emptySet() : new
HashSet<>(states);
+ this.states = (states == null || states.isEmpty()) ?
Collections.emptySet() : states;
+ return this;
+ }
+
+ /**
+ * If groupTypes is set, only groups of these groupTypes will be returned
by listConsumerGroups().
+ * Otherwise, all groups are returned.
+ *
+ */
+ public ListConsumerGroupsOptions inTypes(Set<ConsumerGroupType>
groupTypes) {
+ this.groupTypes = (groupTypes == null || groupTypes.isEmpty()) ?
Collections.emptySet() : groupTypes;
Review Comment:
Should we make a copy of the types?
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroup
private Set<ConsumerGroupState> states = Collections.emptySet();
+ private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
/**
- * If states is set, only groups in these states will be returned by
listConsumerGroups()
+ * If states is set, only groups in these states will be returned by
listConsumerGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
- this.states = (states == null) ? Collections.emptySet() : new
HashSet<>(states);
+ this.states = (states == null || states.isEmpty()) ?
Collections.emptySet() : states;
+ return this;
+ }
+
+ /**
+ * If groupTypes is set, only groups of these groupTypes will be returned
by listConsumerGroups().
+ * Otherwise, all groups are returned.
+ *
Review Comment:
nit: This empty line could be removed.
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(simpleGroup, true)
Review Comment:
Do we need all the changes in this test? It may be better to keep it as it
was.
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
}, s"Expected --list to show groups $expectedGroups, but found
$foundGroups.")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListWithUnrecognizedNewConsumerOption(): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol:
String): Unit = {
Review Comment:
Does it bring any value to test all the combinations in this case?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(simpleGroup, true)
+ .setState(Optional.of(ConsumerGroupState.EMPTY))
+ .setType(if (quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+ new ConsumerGroupListing(group, false)
+ .setState(Optional.of(ConsumerGroupState.STABLE))
+ .setType(if (quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+ )
var foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+ foundListing =
service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet,
Set.empty).toSet
expectedListing == foundListing
}, s"Expected to show groups $expectedListing, but found $foundListing")
- val expectedListingStable = Set(
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+ foundListing =
service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE),
Set.empty).toSet
expectedListingStable == foundListing
}, s"Expected to show groups $expectedListingStable, but found
$foundListing")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testConsumerGroupStatesFromString(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String):
Unit = {
+ val simpleGroup = "simple-group"
+ addSimpleGroupExecutor(group = simpleGroup)
Review Comment:
It would be great if we could also have a new consumer in this test.
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2229,7 +2229,7 @@ void handleResponse(AbstractResponse abstractResponse) {
String topicName = cluster.topicName(topicId);
if (topicName == null) {
- future.completeExceptionally(new
UnknownTopicIdException("TopicId " + topicId + " not found."));
+ future.completeExceptionally(new
InvalidTopicException("TopicId " + topicId + " not found."));
Review Comment:
Why are we changing this? It does not seem related.
##########
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupType {
Review Comment:
I would rather call it `GroupType`.
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroup
private Set<ConsumerGroupState> states = Collections.emptySet();
+ private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
/**
- * If states is set, only groups in these states will be returned by
listConsumerGroups()
+ * If states is set, only groups in these states will be returned by
listConsumerGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
- this.states = (states == null) ? Collections.emptySet() : new
HashSet<>(states);
+ this.states = (states == null || states.isEmpty()) ?
Collections.emptySet() : states;
+ return this;
+ }
+
+ /**
+ * If groupTypes is set, only groups of these groupTypes will be returned
by listConsumerGroups().
+ * Otherwise, all groups are returned.
+ *
+ */
+ public ListConsumerGroupsOptions inTypes(Set<ConsumerGroupType>
groupTypes) {
Review Comment:
This KIP says `withTypes` here.
##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -102,6 +103,15 @@ object ConsumerGroupCommand extends Logging {
parsedStates
}
+ def consumerGroupTypesFromString(input: String): Set[ConsumerGroupType] = {
+ val parsedStates = input.split(',').map(s =>
ConsumerGroupType.parse(s.trim)).toSet
Review Comment:
nit: `parsedTypes`?
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroup
private Set<ConsumerGroupState> states = Collections.emptySet();
+ private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
/**
- * If states is set, only groups in these states will be returned by
listConsumerGroups()
+ * If states is set, only groups in these states will be returned by
listConsumerGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
- this.states = (states == null) ? Collections.emptySet() : new
HashSet<>(states);
+ this.states = (states == null || states.isEmpty()) ?
Collections.emptySet() : states;
+ return this;
+ }
+
+ /**
+ * If groupTypes is set, only groups of these groupTypes will be returned
by listConsumerGroups().
Review Comment:
nit: `groupTypes` -> `types`? We could change it everywhere in this file.
##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
}
def listGroups(): Unit = {
- if (opts.options.has(opts.stateOpt)) {
- val stateValue = opts.options.valueOf(opts.stateOpt)
- val states = if (stateValue == null || stateValue.isEmpty)
- Set[ConsumerGroupState]()
- else
- consumerGroupStatesFromString(stateValue)
- val listings = listConsumerGroupsWithState(states)
- printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
- } else
+ val includeState = opts.options.has(opts.stateOpt)
+ val includeType = opts.options.has(opts.typeOpt)
+
+ val groupInfoMap = mutable.Map[String, (String, String)]()
+
+ if (includeType || includeState) {
+ val states = getStateValues()
+ val types = getTypeValues()
+ val listings = {
+ listConsumerGroupsWithFilters(states, types)
+ }
+
+ listings.foreach { listing =>
+ val groupId = listing.groupId
+ val groupType =
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+ val state =
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+ groupInfoMap.update(groupId, (state, groupType))
+ }
+
+ val groupInfoList = groupInfoMap.toList.map { case (groupId, (state,
groupType)) => (groupId, state, groupType) }
+ printGroupInfo(groupInfoList, includeState, includeType)
+
+ } else {
listConsumerGroups().foreach(println(_))
+ }
+ }
+
+ private def getStateValues(): Set[ConsumerGroupState] = {
+ val stateValue = opts.options.valueOf(opts.stateOpt)
+ if (stateValue == null || stateValue.isEmpty)
+ Set[ConsumerGroupState]()
+ else
+ consumerGroupStatesFromString(stateValue)
+ }
+
+ private def getTypeValues(): Set[ConsumerGroupType] = {
+ val typeValue = opts.options.valueOf(opts.typeOpt)
+ if (typeValue == null || typeValue.isEmpty)
+ Set[ConsumerGroupType]()
+ else
+ consumerGroupTypesFromString(typeValue)
+ }
+
+ private def printGroupInfo(groupsAndInfo: List[(String, String, String)],
includeState: Boolean, includeType: Boolean): Unit = {
+ val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) =>
Math.max(maxLen, group._1.length))
+ var header = "GROUP"
Review Comment:
It may be better to use a List[String].
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2500,12 +2500,12 @@ public void testDescribeTopicsByIds() {
try {
Review Comment:
Should we add tests with group types?
##########
clients/src/main/java/org/apache/kafka/common/ConsumerGroupType.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum ConsumerGroupType {
+ UNKNOWN("unknown"),
+ CONSUMER("consumer"),
+ CLASSIC("classic");
+
+ private final static Map<String, ConsumerGroupType> NAME_TO_ENUM =
Arrays.stream(values())
+ .collect(Collectors.toMap(type -> type.name, Function.identity()));
+
+ private final String name;
+
+ ConsumerGroupType(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Parse a string into a consumer group type.
+ */
+ public static ConsumerGroupType parse(String name) {
Review Comment:
Should we make this case insensitive?
##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
}
def listGroups(): Unit = {
- if (opts.options.has(opts.stateOpt)) {
- val stateValue = opts.options.valueOf(opts.stateOpt)
- val states = if (stateValue == null || stateValue.isEmpty)
- Set[ConsumerGroupState]()
- else
- consumerGroupStatesFromString(stateValue)
- val listings = listConsumerGroupsWithState(states)
- printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
- } else
+ val includeState = opts.options.has(opts.stateOpt)
+ val includeType = opts.options.has(opts.typeOpt)
+
+ val groupInfoMap = mutable.Map[String, (String, String)]()
+
+ if (includeType || includeState) {
+ val states = getStateValues()
+ val types = getTypeValues()
+ val listings = {
+ listConsumerGroupsWithFilters(states, types)
+ }
+
+ listings.foreach { listing =>
+ val groupId = listing.groupId
+ val groupType =
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+ val state =
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+ groupInfoMap.update(groupId, (state, groupType))
+ }
+
+ val groupInfoList = groupInfoMap.toList.map { case (groupId, (state,
groupType)) => (groupId, state, groupType) }
+ printGroupInfo(groupInfoList, includeState, includeType)
+
+ } else {
listConsumerGroups().foreach(println(_))
+ }
+ }
+
+ private def getStateValues(): Set[ConsumerGroupState] = {
+ val stateValue = opts.options.valueOf(opts.stateOpt)
+ if (stateValue == null || stateValue.isEmpty)
+ Set[ConsumerGroupState]()
+ else
+ consumerGroupStatesFromString(stateValue)
+ }
+
+ private def getTypeValues(): Set[ConsumerGroupType] = {
+ val typeValue = opts.options.valueOf(opts.typeOpt)
+ if (typeValue == null || typeValue.isEmpty)
+ Set[ConsumerGroupType]()
+ else
+ consumerGroupTypesFromString(typeValue)
+ }
+
+ private def printGroupInfo(groupsAndInfo: List[(String, String, String)],
includeState: Boolean, includeType: Boolean): Unit = {
+ val maxGroupLen: Int = groupsAndInfo.foldLeft(15)((maxLen, group) =>
Math.max(maxLen, group._1.length))
+ var header = "GROUP"
+ var format = s"%-${maxGroupLen}s"
+
+ if (includeState) {
+ header += " STATE"
+ format += " %-20s"
+ }
+ if (includeType) {
+ header += " TYPE"
+ format += " %-20s"
+ }
Review Comment:
nit: I would print the type before the state.
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -34,13 +34,25 @@ public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroup
private Set<ConsumerGroupState> states = Collections.emptySet();
+ private Set<ConsumerGroupType> groupTypes = Collections.emptySet();
+
/**
- * If states is set, only groups in these states will be returned by
listConsumerGroups()
+ * If states is set, only groups in these states will be returned by
listConsumerGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
- this.states = (states == null) ? Collections.emptySet() : new
HashSet<>(states);
+ this.states = (states == null || states.isEmpty()) ?
Collections.emptySet() : states;
Review Comment:
Why are we changing this? Making a copy of `states` seems to be the right
thing to do here.
##########
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java:
##########
@@ -50,4 +62,11 @@ public ListConsumerGroupsOptions
inStates(Set<ConsumerGroupState> states) {
public Set<ConsumerGroupState> states() {
return states;
}
+
+ /**
+ * Returns the list of types that are requested or empty if no groupTypes
have been specified
+ */
+ public Set<ConsumerGroupType> groupTypes() {
Review Comment:
nit: `types`?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
}, s"Expected --list to show groups $expectedGroups, but found
$foundGroups.")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListWithUnrecognizedNewConsumerOption(): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol:
String): Unit = {
Review Comment:
Does it bring any value to test all the combinations in this case?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(simpleGroup, true)
+ .setState(Optional.of(ConsumerGroupState.EMPTY))
+ .setType(if (quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+ new ConsumerGroupListing(group, false)
+ .setState(Optional.of(ConsumerGroupState.STABLE))
+ .setType(if (quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+ )
var foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+ foundListing =
service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet,
Set.empty).toSet
expectedListing == foundListing
}, s"Expected to show groups $expectedListing, but found $foundListing")
- val expectedListingStable = Set(
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+ foundListing =
service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE),
Set.empty).toSet
expectedListingStable == foundListing
}, s"Expected to show groups $expectedListingStable, but found
$foundListing")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testConsumerGroupStatesFromString(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String):
Unit = {
+ val simpleGroup = "simple-group"
+ addSimpleGroupExecutor(group = simpleGroup)
+ addConsumerGroupExecutor(numConsumers = 1)
+
+ val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type")
+ val service = getConsumerGroupService(cgcArgs)
+
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+ val expectedListing = Set(
+ new ConsumerGroupListing(simpleGroup, true)
+ .setState(Optional.of(ConsumerGroupState.EMPTY))
+ .setType(if(quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
Review Comment:
Why are we doing this? The type should work in all combinations, no?
##########
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##########
@@ -187,16 +197,69 @@ object ConsumerGroupCommand extends Logging {
}
def listGroups(): Unit = {
- if (opts.options.has(opts.stateOpt)) {
- val stateValue = opts.options.valueOf(opts.stateOpt)
- val states = if (stateValue == null || stateValue.isEmpty)
- Set[ConsumerGroupState]()
- else
- consumerGroupStatesFromString(stateValue)
- val listings = listConsumerGroupsWithState(states)
- printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
- } else
+ val includeState = opts.options.has(opts.stateOpt)
+ val includeType = opts.options.has(opts.typeOpt)
+
+ val groupInfoMap = mutable.Map[String, (String, String)]()
+
+ if (includeType || includeState) {
+ val states = getStateValues()
+ val types = getTypeValues()
+ val listings = {
+ listConsumerGroupsWithFilters(states, types)
+ }
+
+ listings.foreach { listing =>
+ val groupId = listing.groupId
+ val groupType =
listing.groupType().orElse(ConsumerGroupType.UNKNOWN).toString
+ val state =
listing.state().orElse(ConsumerGroupState.UNKNOWN).toString
+ groupInfoMap.update(groupId, (state, groupType))
+ }
+
+ val groupInfoList = groupInfoMap.toList.map { case (groupId, (state,
groupType)) => (groupId, state, groupType) }
Review Comment:
I wonder if this conversion is really necessary. Passing the Map to
`printGroupInfo` seems reasonable too.
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -64,28 +64,89 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
val service = getConsumerGroupService(cgcArgs)
val expectedListing = Set(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ new ConsumerGroupListing(simpleGroup, true)
+ .setState(Optional.of(ConsumerGroupState.EMPTY))
+ .setType(if (quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+ new ConsumerGroupListing(group, false)
+ .setState(Optional.of(ConsumerGroupState.STABLE))
+ .setType(if (quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+ )
var foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(ConsumerGroupState.values.toSet).toSet
+ foundListing =
service.listConsumerGroupsWithFilters(ConsumerGroupState.values.toSet,
Set.empty).toSet
expectedListing == foundListing
}, s"Expected to show groups $expectedListing, but found $foundListing")
- val expectedListingStable = Set(
- new ConsumerGroupListing(group, false,
Optional.of(ConsumerGroupState.STABLE)))
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
foundListing = Set.empty[ConsumerGroupListing]
TestUtils.waitUntilTrue(() => {
- foundListing =
service.listConsumerGroupsWithState(Set(ConsumerGroupState.STABLE)).toSet
+ foundListing =
service.listConsumerGroupsWithFilters(Set(ConsumerGroupState.PREPARING_REBALANCE),
Set.empty).toSet
expectedListingStable == foundListing
}, s"Expected to show groups $expectedListingStable, but found
$foundListing")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testConsumerGroupStatesFromString(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListConsumerGroupsWithTypes(quorum: String, groupProtocol: String):
Unit = {
+ val simpleGroup = "simple-group"
+ addSimpleGroupExecutor(group = simpleGroup)
+ addConsumerGroupExecutor(numConsumers = 1)
+
+ val cgcArgs = Array("--bootstrap-server", bootstrapServers(), "--list",
"--type")
+ val service = getConsumerGroupService(cgcArgs)
+
+ val expectedListingStable = Set.empty[ConsumerGroupListing]
+
+ val expectedListing = Set(
+ new ConsumerGroupListing(simpleGroup, true)
+ .setState(Optional.of(ConsumerGroupState.EMPTY))
+ .setType(if(quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty()),
+ new ConsumerGroupListing(group, false)
+ .setState(Optional.of(ConsumerGroupState.STABLE))
+ .setType(if(quorum.contains("kip848"))
Optional.of(ConsumerGroupType.CLASSIC) else Optional.empty())
+ )
+
+ var foundListing = Set.empty[ConsumerGroupListing]
+ TestUtils.waitUntilTrue(() => {
+ foundListing = service.listConsumerGroupsWithFilters(Set.empty,
Set.empty).toSet
+ expectedListing == foundListing
+ }, s"Expected to show groups $expectedListing, but found $foundListing")
+
+ // When group type is mentioned:
Review Comment:
This test is a bit confusing. I wonder if it would be better to split it in
multiple tests. For instance, we could have one testing the old coordinator in
ZK or KRaft mode. Then on testing the new coordinator in KRaft mode. What do
you think?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -18,19 +18,19 @@ package kafka.admin
import joptsimple.OptionException
import org.junit.jupiter.api.Assertions._
-import kafka.utils.TestUtils
-import org.apache.kafka.common.ConsumerGroupState
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.ConsumerGroupListing
-import java.util.Optional
-
+import org.apache.kafka.common.{ConsumerGroupState, ConsumerGroupType}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util.Optional
class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListConsumerGroups(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListConsumerGroupsWithoutFilters(quorum: String, groupProtocol:
String): Unit = {
Review Comment:
Does `groupProtocol` have any effect on the consumer created in this test?
##########
core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala:
##########
@@ -116,6 +123,18 @@ class ConsumerGroupCommandTest extends
KafkaServerTestHarness {
}
object ConsumerGroupCommandTest {
+ // We want to test the following combinations:
+ // * ZooKeeper and the classic group protocol.
+ // * KRaft and the classic group protocol.
+ // * KRaft with the new group coordinator enabled and the classic group
protocol.
+ // * KRaft with the new group coordinator enabled and the consumer group
protocol.
+ def getTestQuorumAndGroupProtocolParametersAll:
java.util.stream.Stream[Arguments] = {
Review Comment:
We already have the same one defined in BaseConsumerTest. I would if we
could reuse it. Could we?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -46,16 +46,16 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
}, s"Expected --list to show groups $expectedGroups, but found
$foundGroups.")
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListWithUnrecognizedNewConsumerOption(): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListWithUnrecognizedNewConsumerOption(quorum: String, groupProtocol:
String): Unit = {
val cgcArgs = Array("--new-consumer", "--bootstrap-server",
bootstrapServers(), "--list")
assertThrows(classOf[OptionException], () =>
getConsumerGroupService(cgcArgs))
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListConsumerGroupsWithStates(): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testListConsumerGroupsWithStates(quorum: String, groupProtocol: String):
Unit = {
Review Comment:
Same question. Is it necessary? Or would using `@ValueSource(strings =
Array("zk", "kraft", "kraft+kip-848"))` be enough?
##########
core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala:
##########
@@ -104,9 +165,27 @@ class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupStatesFromString(" , ,"))
}
- @ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
- def testListGroupCommand(quorum: String): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testConsumerGroupTypesFromString(quorum: String, groupProtocol: String):
Unit = {
+ var result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer")
+ assertEquals(Set(ConsumerGroupType.CONSUMER), result)
+
+ result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer,
classic")
+ assertEquals(Set(ConsumerGroupType.CONSUMER, ConsumerGroupType.CLASSIC),
result)
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"))
+
+ assertThrows(classOf[IllegalArgumentException], () =>
ConsumerGroupCommand.consumerGroupTypesFromString("Consumer"))
Review Comment:
hum... I expect this to actually work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]