dajac commented on code in PR #15150:
URL: https://github.com/apache/kafka/pull/15150#discussion_r1502279590
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,83 +20,258 @@
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroups(String quorum) throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithoutFilters(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup));
+
+ scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new
AtomicReference<>();
+
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups.get() + ".");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
+ @Test
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroupsWithStates() throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithStates(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
- addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<ConsumerGroupListing> expectedListing =
set(Arrays.asList(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- final AtomicReference<scala.collection.Set> foundListing = new
AtomicReference<>();
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet());
- return Objects.equals(expectedListing, foundListing.get());
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ EnumSet.allOf(ConsumerGroupState.class),
+ expectedListing
+ );
- scala.collection.Set<ConsumerGroupListing> expectedListingStable =
set(Collections.singleton(
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ expectedListing = new HashSet<>(Collections.singletonList(
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- foundListing.set(null);
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.STABLE),
+ expectedListing
+ );
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Collections.singleton(ConsumerGroupState.STABLE))).toSet());
- return Objects.equals(expectedListingStable, foundListing.get());
- }, "Expected to show groups " + expectedListingStable + ", but found "
+ foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.PREPARING_REBALANCE),
+ Collections.emptySet()
+ );
+ }
+
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesClassicProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
Review Comment:
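nit: Same suggestion as for `testListConsumerGroupsWithStates`: could `new HashSet<>(Arrays.asList(...))` be replaced with `mkSet(...)` here?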
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,83 +20,258 @@
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroups(String quorum) throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithoutFilters(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup));
+
+ scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new
AtomicReference<>();
+
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups.get() + ".");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
+ @Test
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroupsWithStates() throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithStates(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
- addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<ConsumerGroupListing> expectedListing =
set(Arrays.asList(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- final AtomicReference<scala.collection.Set> foundListing = new
AtomicReference<>();
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet());
- return Objects.equals(expectedListing, foundListing.get());
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ EnumSet.allOf(ConsumerGroupState.class),
+ expectedListing
+ );
- scala.collection.Set<ConsumerGroupListing> expectedListingStable =
set(Collections.singleton(
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ expectedListing = new HashSet<>(Collections.singletonList(
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- foundListing.set(null);
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.STABLE),
+ expectedListing
+ );
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Collections.singleton(ConsumerGroupState.STABLE))).toSet());
- return Objects.equals(expectedListingStable, foundListing.get());
- }, "Expected to show groups " + expectedListingStable + ", but found "
+ foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.PREPARING_REBALANCE),
+ Collections.emptySet()
+ );
+ }
+
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesClassicProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ ));
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ // When group type is mentioned:
+ // Old Group Coordinator returns empty listings if the type is not
Classic.
+ // New Group Coordinator returns groups according to the filter.
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CONSUMER),
+ Collections.emptySet(),
+ Collections.emptySet()
+ );
+
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CLASSIC),
+ Collections.emptySet(),
+ expectedListing
+ );
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testConsumerGroupStatesFromString(String quorum) {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
Review Comment:
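nit: Same suggestion as for `testListConsumerGroupsWithStates`: could `new HashSet<>(Arrays.asList(...))` be replaced with `mkSet(...)` here?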
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,83 +20,258 @@
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroups(String quorum) throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithoutFilters(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup));
+
+ scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new
AtomicReference<>();
+
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups.get() + ".");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
+ @Test
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroupsWithStates() throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithStates(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
- addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<ConsumerGroupListing> expectedListing =
set(Arrays.asList(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- final AtomicReference<scala.collection.Set> foundListing = new
AtomicReference<>();
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet());
- return Objects.equals(expectedListing, foundListing.get());
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ EnumSet.allOf(ConsumerGroupState.class),
+ expectedListing
+ );
- scala.collection.Set<ConsumerGroupListing> expectedListingStable =
set(Collections.singleton(
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ expectedListing = new HashSet<>(Collections.singletonList(
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- foundListing.set(null);
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.STABLE),
+ expectedListing
+ );
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Collections.singleton(ConsumerGroupState.STABLE))).toSet());
- return Objects.equals(expectedListingStable, foundListing.get());
- }, "Expected to show groups " + expectedListingStable + ", but found "
+ foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.PREPARING_REBALANCE),
+ Collections.emptySet()
+ );
+ }
+
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesClassicProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ ));
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ // When group type is mentioned:
+ // Old Group Coordinator returns empty listings if the type is not
Classic.
+ // New Group Coordinator returns groups according to the filter.
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CONSUMER),
+ Collections.emptySet(),
+ Collections.emptySet()
+ );
+
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CLASSIC),
+ Collections.emptySet(),
+ expectedListing
+ );
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testConsumerGroupStatesFromString(String quorum) {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ PROTOCOL_GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CONSUMER)
+ )
+ ));
+
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ // When group type is mentioned:
+ // New Group Coordinator returns groups according to the filter.
+ expectedListing = new HashSet<>(Collections.singletonList(
Review Comment:
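nit: Same suggestion as for `testListConsumerGroupsWithStates`: could `new HashSet<>(Collections.singletonList(...))` be replaced with `mkSet(...)` here?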
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,83 +20,258 @@
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroups(String quorum) throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithoutFilters(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup));
+
+ scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new
AtomicReference<>();
+
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups.get() + ".");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
+ @Test
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroupsWithStates() throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithStates(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
- addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<ConsumerGroupListing> expectedListing =
set(Arrays.asList(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
Review Comment:
nit: Could we replace `new HashSet<>(Arrays.asList(...))` with `mkSet(...)`? `mkSet` is already statically imported in this file.
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,83 +20,258 @@
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroups(String quorum) throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithoutFilters(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup));
+
+ scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new
AtomicReference<>();
+
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups.get() + ".");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
+ @Test
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroupsWithStates() throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithStates(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
- addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<ConsumerGroupListing> expectedListing =
set(Arrays.asList(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- final AtomicReference<scala.collection.Set> foundListing = new
AtomicReference<>();
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet());
- return Objects.equals(expectedListing, foundListing.get());
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ EnumSet.allOf(ConsumerGroupState.class),
+ expectedListing
+ );
- scala.collection.Set<ConsumerGroupListing> expectedListingStable =
set(Collections.singleton(
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ expectedListing = new HashSet<>(Collections.singletonList(
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- foundListing.set(null);
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.STABLE),
+ expectedListing
+ );
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Collections.singleton(ConsumerGroupState.STABLE))).toSet());
- return Objects.equals(expectedListingStable, foundListing.get());
- }, "Expected to show groups " + expectedListingStable + ", but found "
+ foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.PREPARING_REBALANCE),
+ Collections.emptySet()
+ );
+ }
+
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesClassicProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ ));
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ // When group type is mentioned:
+ // Old Group Coordinator returns empty listings if the type is not
Classic.
+ // New Group Coordinator returns groups according to the filter.
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CONSUMER),
+ Collections.emptySet(),
+ Collections.emptySet()
+ );
+
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CLASSIC),
+ Collections.emptySet(),
+ expectedListing
+ );
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testConsumerGroupStatesFromString(String quorum) {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ PROTOCOL_GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CONSUMER)
+ )
+ ));
+
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ // When group type is mentioned:
+ // New Group Coordinator returns groups according to the filter.
+ expectedListing = new HashSet<>(Collections.singletonList(
+ new ConsumerGroupListing(
+ PROTOCOL_GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CONSUMER)
+ )
+ ));
+
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CONSUMER),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ expectedListing = new HashSet<>(Arrays.asList(
Review Comment:
ditto.
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java:
##########
@@ -20,83 +20,258 @@
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroups(String quorum) throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithoutFilters(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup));
+
+ scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new
AtomicReference<>();
+
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups.get() + ".");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
+ @Test
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroupsWithStates() throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithStates(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
- addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<ConsumerGroupListing> expectedListing =
set(Arrays.asList(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ Set<ConsumerGroupListing> expectedListing = new
HashSet<>(Arrays.asList(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ ));
- final AtomicReference<scala.collection.Set> foundListing = new
AtomicReference<>();
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet());
- return Objects.equals(expectedListing, foundListing.get());
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ EnumSet.allOf(ConsumerGroupState.class),
+ expectedListing
+ );
- scala.collection.Set<ConsumerGroupListing> expectedListingStable =
set(Collections.singleton(
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ expectedListing = new HashSet<>(Collections.singletonList(
Review Comment:
ditto.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]