chia7712 commented on code in PR #15779:
URL: https://github.com/apache/kafka/pull/15779#discussion_r1606053454
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java:
##########
@@ -62,506 +86,764 @@
* - scope=topics+partitions, scenario=to-earliest
* - export/import
*/
-public class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
- private String[] basicArgs() {
+@ExtendWith(value = ClusterTestExtensions.class)
+public class ResetConsumerGroupOffsetTest {
+
+ private static final String TOPIC_PREFIX = "foo-";
+ private static final String GROUP_PREFIX = "test.group-";
+
+ private static void generator(ClusterGenerator clusterGenerator) {
+ ConsumerGroupCommandTestUtils.generator(clusterGenerator);
+ }
+
+ private String[] basicArgs(ClusterInstance cluster) {
return new String[]{"--reset-offsets",
- "--bootstrap-server", bootstrapServers(listenerName()),
+ "--bootstrap-server", cluster.bootstrapServers(),
"--timeout", Long.toString(DEFAULT_MAX_WAIT_MS)};
}
- private String[] buildArgsForGroups(List<String> groups, String...args) {
- List<String> res = new ArrayList<>(Arrays.asList(basicArgs()));
+ private String[] buildArgsForGroups(ClusterInstance cluster, List<String>
groups, String... args) {
+ List<String> res = new ArrayList<>(asList(basicArgs(cluster)));
for (String group : groups) {
res.add("--group");
res.add(group);
}
- res.addAll(Arrays.asList(args));
+ res.addAll(asList(args));
return res.toArray(new String[0]);
}
- private String[] buildArgsForGroup(String group, String...args) {
- return buildArgsForGroups(Collections.singletonList(group), args);
+ private String[] buildArgsForGroup(ClusterInstance cluster, String group,
String... args) {
+ return buildArgsForGroups(cluster, singletonList(group), args);
}
- private String[] buildArgsForAllGroups(String...args) {
- List<String> res = new ArrayList<>(Arrays.asList(basicArgs()));
+ private String[] buildArgsForAllGroups(ClusterInstance cluster, String...
args) {
+ List<String> res = new ArrayList<>(asList(basicArgs(cluster)));
res.add("--all-groups");
- res.addAll(Arrays.asList(args));
+ res.addAll(asList(args));
return res.toArray(new String[0]);
}
- @Test
- public void testResetOffsetsNotExistingGroup() throws Exception {
+ @ClusterTemplate("generator")
+ public void testResetOffsetsNotExistingGroup(ClusterInstance cluster)
throws Exception {
+ String topic = generateRandomTopic();
String group = "missing.group";
- String[] args = buildArgsForGroup(group, "--all-topics",
"--to-current", "--execute");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand =
getConsumerGroupService(args);
- // Make sure we got a coordinator
- TestUtils.waitForCondition(
- () ->
Objects.equals(consumerGroupCommand.collectGroupState(group).coordinator.host(),
"localhost"),
- "Can't find a coordinator");
- Map<TopicPartition, OffsetAndMetadata> resetOffsets =
consumerGroupCommand.resetOffsets().get(group);
- assertTrue(resetOffsets.isEmpty());
- assertTrue(committedOffsets(TOPIC, group).isEmpty());
- }
-
- @Test
- public void testResetOffsetsExistingTopic() {
- String group = "new.group";
- String[] args = buildArgsForGroup(group, "--topic", TOPIC,
"--to-offset", "50");
- produceMessages(TOPIC, 100);
- resetAndAssertOffsets(args, 50, true,
Collections.singletonList(TOPIC));
- resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true,
Collections.singletonList(TOPIC));
- resetAndAssertOffsets(addTo(args, "--execute"), 50, false,
Collections.singletonList(TOPIC));
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--to-current", "--execute");
+
+ try (ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+ // Make sure we got a coordinator
+ TestUtils.waitForCondition(
+ () ->
"localhost".equals(service.collectGroupState(group).coordinator.host()),
+ "Can't find a coordinator");
+ Map<TopicPartition, OffsetAndMetadata> resetOffsets =
service.resetOffsets().get(group);
+ assertTrue(resetOffsets.isEmpty());
+ assertTrue(committedOffsets(cluster, topic, group).isEmpty());
+ }
}
- @Test
- public void testResetOffsetsExistingTopicSelectedGroups() throws Exception
{
- produceMessages(TOPIC, 100);
- List<String> groups = IntStream.rangeClosed(1, 3).mapToObj(id -> GROUP
+ id).collect(Collectors.toList());
- for (String group : groups) {
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1,
TOPIC, group, GroupProtocol.CLASSIC.name);
- awaitConsumerProgress(TOPIC, group, 100L);
- executor.shutdown();
- }
- String[] args = buildArgsForGroups(groups, "--topic", TOPIC,
"--to-offset", "50");
- resetAndAssertOffsets(args, 50, true,
Collections.singletonList(TOPIC));
- resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true,
Collections.singletonList(TOPIC));
- resetAndAssertOffsets(addTo(args, "--execute"), 50, false,
Collections.singletonList(TOPIC));
- }
-
- @Test
- public void testResetOffsetsExistingTopicAllGroups() throws Exception {
- String[] args = buildArgsForAllGroups("--topic", TOPIC, "--to-offset",
"50");
- produceMessages(TOPIC, 100);
- for (int i = 1; i <= 3; i++) {
- String group = GROUP + i;
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1,
TOPIC, group, GroupProtocol.CLASSIC.name);
- awaitConsumerProgress(TOPIC, group, 100L);
- executor.shutdown();
- }
- resetAndAssertOffsets(args, 50, true,
Collections.singletonList(TOPIC));
- resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true,
Collections.singletonList(TOPIC));
- resetAndAssertOffsets(addTo(args, "--execute"), 50, false,
Collections.singletonList(TOPIC));
+ @ClusterTemplate("generator")
+ public void testResetOffsetsExistingTopic(ClusterInstance cluster) {
+ String topic = generateRandomTopic();
+ String group = "new.group";
+ String[] args = buildArgsForGroup(cluster, group, "--topic", topic,
"--to-offset", "50");
+
+ produceMessages(cluster, topic, 100);
+ resetAndAssertOffsets(cluster, args, 50, true, singletonList(topic));
+ resetAndAssertOffsets(cluster, addTo(args, "--dry-run"),
+ 50, true, singletonList(topic));
+ resetAndAssertOffsets(cluster, addTo(args, "--execute"),
+ 50, false, singletonList(topic));
}
- @Test
- public void testResetOffsetsAllTopicsAllGroups() throws Exception {
- String[] args = buildArgsForAllGroups("--all-topics", "--to-offset",
"50");
- List<String> topics = IntStream.rangeClosed(1, 3).mapToObj(i -> TOPIC
+ i).collect(Collectors.toList());
- List<String> groups = IntStream.rangeClosed(1, 3).mapToObj(i -> GROUP
+ i).collect(Collectors.toList());
- topics.forEach(topic -> produceMessages(topic, 100));
+ @ClusterTemplate("generator")
+ public void testResetOffsetsExistingTopicSelectedGroups(ClusterInstance
cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String topic = generateRandomTopic();
- for (String topic : topics) {
+ produceMessages(cluster, topic, 100);
+ List<String> groups = generateIds(topic);
for (String group : groups) {
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(3,
topic, group, GroupProtocol.CLASSIC.name);
- awaitConsumerProgress(topic, group, 100);
- executor.shutdown();
+ try (AutoCloseable consumerGroupCloseable =
+ consumerGroupClosable(cluster, 1, topic, group,
groupProtocol)) {
+ awaitConsumerProgress(cluster, topic, group, 100L);
+ }
}
+
+ String[] args = buildArgsForGroups(cluster, groups, "--topic",
topic, "--to-offset", "50");
+ resetAndAssertOffsets(cluster, args, 50, true,
singletonList(topic));
+ resetAndAssertOffsets(cluster, addTo(args, "--dry-run"),
+ 50, true, singletonList(topic));
+ resetAndAssertOffsets(cluster, addTo(args, "--execute"),
+ 50, false, singletonList(topic));
}
- resetAndAssertOffsets(args, 50, true, topics);
- resetAndAssertOffsets(addTo(args, "--dry-run"), 50, true, topics);
- resetAndAssertOffsets(addTo(args, "--execute"), 50, false, topics);
}
- @Test
- public void testResetOffsetsToLocalDateTime() throws Exception {
- SimpleDateFormat format = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
- Calendar calendar = Calendar.getInstance();
- calendar.add(Calendar.DATE, -1);
+ @ClusterTemplate("generator")
+ public void testResetOffsetsExistingTopicAllGroups(ClusterInstance
cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String topic = generateRandomTopic();
+ String[] args = buildArgsForAllGroups(cluster, "--topic", topic,
"--to-offset", "50");
+
+ produceMessages(cluster, topic, 100);
+ for (int i = 1; i <= 3; i++) {
+ String group = generateRandomGroupId();
+ try (AutoCloseable consumerGroupCloseable =
+ consumerGroupClosable(cluster, 1, topic, group,
groupProtocol)) {
+ awaitConsumerProgress(cluster, topic, group, 100L);
+ }
+ }
+ resetAndAssertOffsets(cluster, args, 50, true,
singletonList(topic));
+ resetAndAssertOffsets(cluster, addTo(args, "--dry-run"),
+ 50, true, singletonList(topic));
+ resetAndAssertOffsets(cluster, addTo(args, "--execute"),
+ 50, false, singletonList(topic));
+ }
+ }
- produceMessages(TOPIC, 100);
+ @ClusterTemplate("generator")
+ public void testResetOffsetsAllTopicsAllGroups(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String groupId = generateRandomGroupId();
+ String topicId = generateRandomTopic();
+
+ String[] args = buildArgsForAllGroups(cluster, "--all-topics",
"--to-offset", "50");
+            List<String> topics = generateIds(topicId);
+            List<String> groups = generateIds(groupId);
+ topics.forEach(topic -> produceMessages(cluster, topic, 100));
+
+ for (String topic : topics) {
+ for (String group : groups) {
+ try (AutoCloseable consumerGroupCloseable =
+ consumerGroupClosable(cluster, 3, topic,
group, groupProtocol)) {
+ awaitConsumerProgress(cluster, topic, group, 100);
+ }
+ }
+ }
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC,
GROUP, GroupProtocol.CLASSIC.name);
- awaitConsumerProgress(TOPIC, GROUP, 100L);
- executor.shutdown();
+ resetAndAssertOffsets(cluster, args, 50, true, topics);
+ resetAndAssertOffsets(cluster, addTo(args, "--dry-run"),
+ 50, true, topics);
+ resetAndAssertOffsets(cluster, addTo(args, "--execute"),
+ 50, false, topics);
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--to-datetime", format.format(calendar.getTime()), "--execute");
- resetAndAssertOffsets(args, 0);
+ try (Admin admin = cluster.createAdminClient()) {
+ admin.deleteConsumerGroups(groups).all().get();
+ }
+ }
}
- @Test
- public void testResetOffsetsToZonedDateTime() throws Exception {
- SimpleDateFormat format = new
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToLocalDateTime(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- produceMessages(TOPIC, 50);
- Date checkpoint = new Date();
- produceMessages(TOPIC, 50);
+ DateTimeFormatter format =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
+ LocalDateTime dateTime = now().minusDays(1);
+ String[] args = buildArgsForGroup(cluster, group,
+ "--all-topics", "--to-datetime",
+ format.format(dateTime), "--execute");
- ConsumerGroupExecutor executor = addConsumerGroupExecutor(1, TOPIC,
GROUP, GroupProtocol.CLASSIC.name);
- awaitConsumerProgress(TOPIC, GROUP, 100L);
- executor.shutdown();
+ produceMessages(cluster, topic, 100);
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--to-datetime", format.format(checkpoint), "--execute");
- resetAndAssertOffsets(args, 50);
- }
+ try (AutoCloseable consumerGroupCloseable =
+ consumerGroupClosable(cluster, 1, topic, group,
groupProtocol)) {
+ awaitConsumerProgress(cluster, topic, group, 100L);
+ }
- @Test
- public void testResetOffsetsByDuration() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--by-duration", "PT1M", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- resetAndAssertOffsets(args, 0);
+ resetAndAssertOffsets(cluster, topic, args, 0);
+ }
}
- @Test
- public void testResetOffsetsByDurationToEarliest() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--by-duration", "PT0.1S", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- resetAndAssertOffsets(args, 100);
- }
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToZonedDateTime(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
+ DateTimeFormatter format =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
- @Test
- public void testResetOffsetsByDurationFallbackToLatestWhenNoRecords() {
- String topic = "foo2";
- String[] args = buildArgsForGroup(GROUP, "--topic", topic,
"--by-duration", "PT1M", "--execute");
- createTopic(topic, 1, 1, new Properties(), listenerName(), new
Properties());
- resetAndAssertOffsets(args, 0, false,
Collections.singletonList("foo2"));
+ produceMessages(cluster, topic, 50);
+ ZonedDateTime checkpoint = now().atZone(ZoneId.systemDefault());
+ produceMessages(cluster, topic, 50);
- adminZkClient().deleteTopic(topic);
- }
+ String[] args = buildArgsForGroup(cluster, group,
+ "--all-topics", "--to-datetime", format.format(checkpoint),
+ "--execute");
- @Test
- public void testResetOffsetsToEarliest() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--to-earliest", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- resetAndAssertOffsets(args, 0);
- }
+ try (AutoCloseable consumerGroupCloseable =
+ consumerGroupClosable(cluster, 1, topic, group,
groupProtocol)) {
+ awaitConsumerProgress(cluster, topic, group, 100L);
+ }
- @Test
- public void testResetOffsetsToLatest() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--to-latest", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- produceMessages(TOPIC, 100);
- resetAndAssertOffsets(args, 200);
+ resetAndAssertOffsets(cluster, topic, args, 50);
+ }
}
- @Test
- public void testResetOffsetsToCurrentOffset() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--to-current", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- produceMessages(TOPIC, 100);
- resetAndAssertOffsets(args, 100);
- }
+ @ClusterTemplate("generator")
+ public void testResetOffsetsByDuration(ClusterInstance cluster) throws
Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- @Test
- public void testResetOffsetsToSpecificOffset() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics",
"--to-offset", "1", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- resetAndAssertOffsets(args, 1);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--by-duration", "PT1M", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ resetAndAssertOffsets(cluster, topic, args, 0);
+ }
}
- @Test
- public void testResetOffsetsShiftPlus() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by",
"50", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- produceMessages(TOPIC, 100);
- resetAndAssertOffsets(args, 150);
- }
+ @ClusterTemplate("generator")
+ public void testResetOffsetsByDurationToEarliest(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- @Test
- public void testResetOffsetsShiftMinus() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by",
"-50", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- produceMessages(TOPIC, 100);
- resetAndAssertOffsets(args, 50);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--by-duration", "PT0.1S", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ resetAndAssertOffsets(cluster, topic, args, 100);
+ }
}
- @Test
- public void testResetOffsetsShiftByLowerThanEarliest() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by",
"-150", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- produceMessages(TOPIC, 100);
- resetAndAssertOffsets(args, 0);
- }
+ @ClusterTemplate("generator")
+ public void
testResetOffsetsByDurationFallbackToLatestWhenNoRecords(ClusterInstance
cluster) throws ExecutionException, InterruptedException {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
+
+ String[] args = buildArgsForGroup(cluster, group, "--topic", topic,
"--by-duration", "PT1M", "--execute");
- @Test
- public void testResetOffsetsShiftByHigherThanLatest() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--all-topics", "--shift-by",
"150", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- produceMessages(TOPIC, 100);
- resetAndAssertOffsets(args, 200);
+ try (Admin admin = cluster.createAdminClient()) {
+ admin.createTopics(singleton(new NewTopic(topic, 1, (short)
1))).all().get();
+ resetAndAssertOffsets(cluster, args, 0, false,
singletonList(topic));
+ admin.deleteTopics(singleton(topic)).all().get();
+ }
}
- @Test
- public void testResetOffsetsToEarliestOnOneTopic() throws Exception {
- String[] args = buildArgsForGroup(GROUP, "--topic", TOPIC,
"--to-earliest", "--execute");
- produceConsumeAndShutdown(TOPIC, GROUP, 100, 1);
- resetAndAssertOffsets(args, 0);
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToEarliest(ClusterInstance cluster) throws
Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
+
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--to-earliest", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ resetAndAssertOffsets(cluster, topic, args, 0);
+ }
}
- @Test
- public void testResetOffsetsToEarliestOnOneTopicAndPartition() throws
Exception {
- String topic = "bar";
- createTopic(topic, 2, 1, new Properties(), listenerName(), new
Properties());
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToLatest(ClusterInstance cluster) throws
Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- String[] args = buildArgsForGroup(GROUP, "--topic", topic + ":1",
"--to-earliest", "--execute");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand =
getConsumerGroupService(args);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--to-latest", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ produceMessages(cluster, topic, 100);
+ resetAndAssertOffsets(cluster, topic, args, 200);
+ }
+ }
- produceConsumeAndShutdown(topic, GROUP, 100, 2);
- Map<TopicPartition, Long> priorCommittedOffsets =
committedOffsets(topic, GROUP);
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToCurrentOffset(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- TopicPartition tp0 = new TopicPartition(topic, 0);
- TopicPartition tp1 = new TopicPartition(topic, 1);
- Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
- expectedOffsets.put(tp0, priorCommittedOffsets.get(tp0));
- expectedOffsets.put(tp1, 0L);
- resetAndAssertOffsetsCommitted(consumerGroupCommand, expectedOffsets,
topic);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--to-current", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ produceMessages(cluster, topic, 100);
+ resetAndAssertOffsets(cluster, topic, args, 100);
+ }
+ }
+
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToSpecificOffset(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- adminZkClient().deleteTopic(topic);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--to-offset", "1", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ resetAndAssertOffsets(cluster, topic, args, 1);
+ }
}
- @Test
- public void testResetOffsetsToEarliestOnTopics() throws Exception {
- String topic1 = "topic1";
- String topic2 = "topic2";
- createTopic(topic1, 1, 1, new Properties(), listenerName(), new
Properties());
- createTopic(topic2, 1, 1, new Properties(), listenerName(), new
Properties());
+ @ClusterTemplate("generator")
+ public void testResetOffsetsShiftPlus(ClusterInstance cluster) throws
Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- String[] args = buildArgsForGroup(GROUP, "--topic", topic1, "--topic",
topic2, "--to-earliest", "--execute");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand =
getConsumerGroupService(args);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--shift-by", "50", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ produceMessages(cluster, topic, 100);
+ resetAndAssertOffsets(cluster, topic, args, 150);
+ }
+ }
- produceConsumeAndShutdown(topic1, GROUP, 100, 1);
- produceConsumeAndShutdown(topic2, GROUP, 100, 1);
+ @ClusterTemplate("generator")
+ public void testResetOffsetsShiftMinus(ClusterInstance cluster) throws
Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- TopicPartition tp1 = new TopicPartition(topic1, 0);
- TopicPartition tp2 = new TopicPartition(topic2, 0);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--shift-by", "-50", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ produceMessages(cluster, topic, 100);
+ resetAndAssertOffsets(cluster, topic, args, 50);
+ }
+ }
- Map<TopicPartition, Long> allResetOffsets =
toOffsetMap(resetOffsets(consumerGroupCommand).get(GROUP));
- Map<TopicPartition, Long> expMap = new HashMap<>();
- expMap.put(tp1, 0L);
- expMap.put(tp2, 0L);
- assertEquals(expMap, allResetOffsets);
- assertEquals(Collections.singletonMap(tp1, 0L),
committedOffsets(topic1, GROUP));
- assertEquals(Collections.singletonMap(tp2, 0L),
committedOffsets(topic2, GROUP));
+ @ClusterTemplate("generator")
+ public void testResetOffsetsShiftByLowerThanEarliest(ClusterInstance
cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- adminZkClient().deleteTopic(topic1);
- adminZkClient().deleteTopic(topic2);
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--shift-by", "-150", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ produceMessages(cluster, topic, 100);
+ resetAndAssertOffsets(cluster, topic, args, 0);
+ }
}
- @Test
- public void testResetOffsetsToEarliestOnTopicsAndPartitions() throws
Exception {
- String topic1 = "topic1";
- String topic2 = "topic2";
+ @ClusterTemplate("generator")
+ public void testResetOffsetsShiftByHigherThanLatest(ClusterInstance
cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- createTopic(topic1, 2, 1, new Properties(), listenerName(), new
Properties());
- createTopic(topic2, 2, 1, new Properties(), listenerName(), new
Properties());
+ String[] args = buildArgsForGroup(cluster, group, "--all-topics",
"--shift-by", "150", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ produceMessages(cluster, topic, 100);
+ resetAndAssertOffsets(cluster, topic, args, 200);
+ }
+ }
- String[] args = buildArgsForGroup(GROUP, "--topic", topic1 + ":1",
"--topic", topic2 + ":1", "--to-earliest", "--execute");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand =
getConsumerGroupService(args);
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToEarliestOnOneTopic(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
- produceConsumeAndShutdown(topic1, GROUP, 100, 2);
- produceConsumeAndShutdown(topic2, GROUP, 100, 2);
+ String[] args = buildArgsForGroup(cluster, group, "--topic",
topic, "--to-earliest", "--execute");
+ produceConsumeAndShutdown(cluster, topic, group, 1, groupProtocol);
+ resetAndAssertOffsets(cluster, topic, args, 0);
+ }
+ }
- Map<TopicPartition, Long> priorCommittedOffsets1 =
committedOffsets(topic1, GROUP);
- Map<TopicPartition, Long> priorCommittedOffsets2 =
committedOffsets(topic2, GROUP);
+ @ClusterTemplate("generator")
+ public void
testResetOffsetsToEarliestOnOneTopicAndPartition(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
+ String[] args = buildArgsForGroup(cluster, group, "--topic", topic
+ ":1",
+ "--to-earliest", "--execute");
+
+ try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+ admin.createTopics(singleton(new NewTopic(topic, 2, (short)
1))).all().get();
+
+ produceConsumeAndShutdown(cluster, topic, group, 2,
groupProtocol);
+ Map<TopicPartition, Long> priorCommittedOffsets =
committedOffsets(cluster, topic, group);
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ TopicPartition tp1 = new TopicPartition(topic, 1);
+ Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
+ expectedOffsets.put(tp0, priorCommittedOffsets.get(tp0));
+ expectedOffsets.put(tp1, 0L);
+ resetAndAssertOffsetsCommitted(cluster, service,
expectedOffsets, topic);
+
+ admin.deleteTopics(singleton(topic)).all().get();
+ }
+ }
+ }
- TopicPartition tp1 = new TopicPartition(topic1, 1);
- TopicPartition tp2 = new TopicPartition(topic2, 1);
- Map<TopicPartition, Long> allResetOffsets =
toOffsetMap(resetOffsets(consumerGroupCommand).get(GROUP));
- Map<TopicPartition, Long> expMap = new HashMap<>();
- expMap.put(tp1, 0L);
- expMap.put(tp2, 0L);
- assertEquals(expMap, allResetOffsets);
- priorCommittedOffsets1.put(tp1, 0L);
- assertEquals(priorCommittedOffsets1, committedOffsets(topic1, GROUP));
- priorCommittedOffsets2.put(tp2, 0L);
- assertEquals(priorCommittedOffsets2, committedOffsets(topic2, GROUP));
+ @ClusterTemplate("generator")
+ public void testResetOffsetsToEarliestOnTopics(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic1 = generateRandomTopic();
+ String topic2 = generateRandomTopic();
+ String[] args = buildArgsForGroup(cluster, group,
+ "--topic", topic1,
+ "--topic", topic2,
+ "--to-earliest", "--execute");
+
+ try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+ admin.createTopics(asList(new NewTopic(topic1, 1, (short) 1),
+ new NewTopic(topic2, 1, (short) 1))).all().get();
+
+ produceConsumeAndShutdown(cluster, topic1, group, 1,
groupProtocol);
+ produceConsumeAndShutdown(cluster, topic2, group, 1,
groupProtocol);
+
+ TopicPartition tp1 = new TopicPartition(topic1, 0);
+ TopicPartition tp2 = new TopicPartition(topic2, 0);
+
+ Map<TopicPartition, Long> allResetOffsets =
toOffsetMap(resetOffsets(service).get(group));
+ Map<TopicPartition, Long> expMap = new HashMap<>();
+ expMap.put(tp1, 0L);
+ expMap.put(tp2, 0L);
+ assertEquals(expMap, allResetOffsets);
+ assertEquals(singletonMap(tp1, 0L), committedOffsets(cluster,
topic1, group));
+ assertEquals(singletonMap(tp2, 0L), committedOffsets(cluster,
topic2, group));
+
+ admin.deleteTopics(asList(topic1, topic2)).all().get();
+ }
+ }
+ }
- adminZkClient().deleteTopic(topic1);
- adminZkClient().deleteTopic(topic2);
+ @ClusterTemplate("generator")
+ public void
testResetOffsetsToEarliestOnTopicsAndPartitions(ClusterInstance cluster) throws
Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic1 = generateRandomTopic();
+ String topic2 = generateRandomTopic();
+ String[] args = buildArgsForGroup(cluster, group,
+ "--topic", topic1 + ":1",
+ "--topic", topic2 + ":1",
+ "--to-earliest", "--execute");
+
+ try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+ admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),
+ new NewTopic(topic2, 2, (short) 1))).all().get();
+
+ produceConsumeAndShutdown(cluster, topic1, group, 2,
groupProtocol);
+ produceConsumeAndShutdown(cluster, topic2, group, 2,
groupProtocol);
+
+ Map<TopicPartition, Long> priorCommittedOffsets1 =
+ committedOffsets(cluster, topic1, group);
+ Map<TopicPartition, Long> priorCommittedOffsets2 =
+ committedOffsets(cluster, topic2, group);
+
+ TopicPartition tp1 = new TopicPartition(topic1, 1);
+ TopicPartition tp2 = new TopicPartition(topic2, 1);
+ Map<TopicPartition, Long> allResetOffsets =
toOffsetMap(resetOffsets(service).get(group));
+ Map<TopicPartition, Long> expMap = new HashMap<>();
+ expMap.put(tp1, 0L);
+ expMap.put(tp2, 0L);
+ assertEquals(expMap, allResetOffsets);
+ priorCommittedOffsets1.put(tp1, 0L);
+ assertEquals(priorCommittedOffsets1, committedOffsets(cluster,
topic1, group));
+ priorCommittedOffsets2.put(tp2, 0L);
+ assertEquals(priorCommittedOffsets2, committedOffsets(cluster,
topic2, group));
+
+ admin.deleteTopics(asList(topic1, topic2)).all().get();
+ }
+ }
}
- @Test
- // This one deals with old CSV export/import format for a single --group
arg: "topic,partition,offset" to support old behavior
- public void testResetOffsetsExportImportPlanSingleGroupArg() throws
Exception {
- String topic = "bar";
- TopicPartition tp0 = new TopicPartition(topic, 0);
- TopicPartition tp1 = new TopicPartition(topic, 1);
- createTopic(topic, 2, 1, new Properties(), listenerName(), new
Properties());
+ @ClusterTemplate("generator")
+ // This one deals with old CSV export/import format for a single --group
arg:
+ // "topic,partition,offset" to support old behavior
+ public void testResetOffsetsExportImportPlanSingleGroupArg(ClusterInstance
cluster) throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group = generateRandomGroupId();
+ String topic = generateRandomTopic();
+
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ TopicPartition tp1 = new TopicPartition(topic, 1);
+ String[] cgcArgs = buildArgsForGroup(cluster, group,
"--all-topics", "--to-offset", "2", "--export");
+ File file = TestUtils.tempFile("reset", ".csv");
+ String[] cgcArgsExec = buildArgsForGroup(cluster, group,
"--all-topics",
+ "--from-file", file.getCanonicalPath(), "--dry-run");
- String[] cgcArgs = buildArgsForGroup(GROUP, "--all-topics",
"--to-offset", "2", "--export");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand =
getConsumerGroupService(cgcArgs);
+ try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(file));
+ ConsumerGroupCommand.ConsumerGroupService serviceExec =
getConsumerGroupService(cgcArgsExec)) {
- produceConsumeAndShutdown(topic, GROUP, 100, 2);
+ admin.createTopics(singleton(new NewTopic(topic, 2, (short)
1))).all().get();
+ produceConsumeAndShutdown(cluster, topic, group, 2,
groupProtocol);
- File file = TestUtils.tempFile("reset", ".csv");
+ Map<String, Map<TopicPartition, OffsetAndMetadata>>
exportedOffsets = service.resetOffsets();
- Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets =
consumerGroupCommand.resetOffsets();
- BufferedWriter bw = new BufferedWriter(new FileWriter(file));
- bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
- bw.close();
+ bw.write(service.exportOffsetsToCsv(exportedOffsets));
+ bw.close();
- Map<TopicPartition, Long> exp1 = new HashMap<>();
- exp1.put(tp0, 2L);
- exp1.put(tp1, 2L);
- assertEquals(exp1, toOffsetMap(exportedOffsets.get(GROUP)));
+ Map<TopicPartition, Long> exp1 = new HashMap<>();
+ exp1.put(tp0, 2L);
+ exp1.put(tp1, 2L);
+ assertEquals(exp1, toOffsetMap(exportedOffsets.get(group)));
- String[] cgcArgsExec = buildArgsForGroup(GROUP, "--all-topics",
"--from-file", file.getCanonicalPath(), "--dry-run");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec =
getConsumerGroupService(cgcArgsExec);
- Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets =
consumerGroupCommandExec.resetOffsets();
- assertEquals(exp1, toOffsetMap(importedOffsets.get(GROUP)));
+ Map<String, Map<TopicPartition, OffsetAndMetadata>>
importedOffsets = serviceExec.resetOffsets();
+ assertEquals(exp1, toOffsetMap(importedOffsets.get(group)));
- adminZkClient().deleteTopic(topic);
+                admin.deleteTopics(singleton(topic)).all().get();
+ }
+ }
}
- @Test
+ @ClusterTemplate("generator")
// This one deals with universal CSV export/import file format
// "group,topic,partition,offset", supporting multiple --group args or --all-groups arg
- public void testResetOffsetsExportImportPlan() throws Exception {
- String group1 = GROUP + "1";
- String group2 = GROUP + "2";
- String topic1 = "bar1";
- String topic2 = "bar2";
- TopicPartition t1p0 = new TopicPartition(topic1, 0);
- TopicPartition t1p1 = new TopicPartition(topic1, 1);
- TopicPartition t2p0 = new TopicPartition(topic2, 0);
- TopicPartition t2p1 = new TopicPartition(topic2, 1);
- createTopic(topic1, 2, 1, new Properties(), listenerName(), new
Properties());
- createTopic(topic2, 2, 1, new Properties(), listenerName(), new
Properties());
-
- String[] cgcArgs = buildArgsForGroups(Arrays.asList(group1, group2),
"--all-topics", "--to-offset", "2", "--export");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand =
getConsumerGroupService(cgcArgs);
-
- produceConsumeAndShutdown(topic1, group1, 100, 1);
- produceConsumeAndShutdown(topic2, group2, 100, 1);
-
- awaitConsumerGroupInactive(consumerGroupCommand, group1);
- awaitConsumerGroupInactive(consumerGroupCommand, group2);
-
- File file = TestUtils.tempFile("reset", ".csv");
-
- Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets =
consumerGroupCommand.resetOffsets();
- BufferedWriter bw = new BufferedWriter(new FileWriter(file));
- bw.write(consumerGroupCommand.exportOffsetsToCsv(exportedOffsets));
- bw.close();
- Map<TopicPartition, Long> exp1 = new HashMap<>();
- exp1.put(t1p0, 2L);
- exp1.put(t1p1, 2L);
- Map<TopicPartition, Long> exp2 = new HashMap<>();
- exp2.put(t2p0, 2L);
- exp2.put(t2p1, 2L);
-
- assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
- assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
-
- // Multiple --group's offset import
- String[] cgcArgsExec = buildArgsForGroups(Arrays.asList(group1,
group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec =
getConsumerGroupService(cgcArgsExec);
- Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets =
consumerGroupCommandExec.resetOffsets();
- assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
- assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
-
- // Single --group offset import using "group,topic,partition,offset"
csv format
- String[] cgcArgsExec2 = buildArgsForGroup(group1, "--all-topics",
"--from-file", file.getCanonicalPath(), "--dry-run");
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommandExec2 =
getConsumerGroupService(cgcArgsExec2);
- Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets2 =
consumerGroupCommandExec2.resetOffsets();
- assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1)));
-
- adminZkClient().deleteTopic(TOPIC);
- }
-
- @Test
- public void testResetWithUnrecognizedNewConsumerOption() {
- String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--reset-offsets",
- "--group", GROUP, "--all-topics", "--to-offset", "2", "--export"};
+ public void testResetOffsetsExportImportPlan(ClusterInstance cluster)
throws Exception {
+ for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
+ String group1 = generateRandomGroupId();
+ String group2 = generateRandomGroupId();
+ String topic1 = generateRandomTopic();
+ String topic2 = generateRandomTopic();
+
+ TopicPartition t1p0 = new TopicPartition(topic1, 0);
+ TopicPartition t1p1 = new TopicPartition(topic1, 1);
+ TopicPartition t2p0 = new TopicPartition(topic2, 0);
+ TopicPartition t2p1 = new TopicPartition(topic2, 1);
+ String[] cgcArgs = buildArgsForGroups(cluster, asList(group1,
group2),
+ "--all-topics", "--to-offset", "2", "--export");
+ File file = TestUtils.tempFile("reset", ".csv");
+ // Multiple --group's offset import
+ String[] cgcArgsExec = buildArgsForGroups(cluster, asList(group1,
group2),
+ "--all-topics",
+ "--from-file", file.getCanonicalPath(), "--dry-run");
+ // Single --group offset import using
"group,topic,partition,offset" csv format
+ String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1,
"--all-topics",
+ "--from-file", file.getCanonicalPath(), "--dry-run");
+
+ try (Admin admin = cluster.createAdminClient();
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(file));
+ ConsumerGroupCommand.ConsumerGroupService serviceExec =
getConsumerGroupService(cgcArgsExec);
+ ConsumerGroupCommand.ConsumerGroupService serviceExec2 =
getConsumerGroupService(cgcArgsExec2)) {
+
+ admin.createTopics(asList(new NewTopic(topic1, 2, (short) 1),
+ new NewTopic(topic2, 2, (short) 1))).all().get();
+
+ produceConsumeAndShutdown(cluster, topic1, group1, 1,
groupProtocol);
+ produceConsumeAndShutdown(cluster, topic2, group2, 1,
groupProtocol);
+
+ awaitConsumerGroupInactive(service, group1);
+ awaitConsumerGroupInactive(service, group2);
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>>
exportedOffsets = service.resetOffsets();
+ bw.write(service.exportOffsetsToCsv(exportedOffsets));
+ bw.close();
+
+ Map<TopicPartition, Long> exp1 = new HashMap<>();
+ exp1.put(t1p0, 2L);
+ exp1.put(t1p1, 2L);
+ Map<TopicPartition, Long> exp2 = new HashMap<>();
+ exp2.put(t2p0, 2L);
+ exp2.put(t2p1, 2L);
+
+ assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
+ assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>>
importedOffsets = serviceExec.resetOffsets();
+ assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
+ assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
+
+ Map<String, Map<TopicPartition, OffsetAndMetadata>>
importedOffsets2 = serviceExec2.resetOffsets();
+ assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1)));
+
+ admin.deleteTopics(asList(topic1, topic2));
+ }
+ }
+ }
+
+ @ClusterTemplate("generator")
+ public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance
cluster) {
+ String group = generateRandomGroupId();
+ String[] cgcArgs = new String[]{"--new-consumer",
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--reset-offsets", "--group", group, "--all-topics",
+ "--to-offset", "2", "--export"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- private void produceMessages(String topic, int numMessages) {
- List<ProducerRecord<byte[], byte[]>> records = IntStream.range(0,
numMessages)
- .mapToObj(i -> new ProducerRecord<byte[], byte[]>(topic, new
byte[100 * 1000]))
- .collect(Collectors.toList());
- kafka.utils.TestUtils.produceMessages(servers(), seq(records), 1);
+ private String generateRandomTopic() {
+ return TOPIC_PREFIX + TestUtils.randomString(10);
}
- private void produceConsumeAndShutdown(String topic, String group, int
totalMessages, int numConsumers) throws Exception {
- produceMessages(topic, totalMessages);
- ConsumerGroupExecutor executor =
addConsumerGroupExecutor(numConsumers, topic, group,
GroupProtocol.CLASSIC.name);
- awaitConsumerProgress(topic, group, totalMessages);
- executor.shutdown();
+ private String generateRandomGroupId() {
+ return GROUP_PREFIX + TestUtils.randomString(10);
}
- private void awaitConsumerProgress(String topic,
- String group,
- long count) throws Exception {
- try (Consumer<String, String> consumer =
createNoAutoCommitConsumer(group)) {
- Set<TopicPartition> partitions =
consumer.partitionsFor(topic).stream()
- .map(partitionInfo -> new
TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
- .collect(Collectors.toSet());
-
- TestUtils.waitForCondition(() -> {
- Collection<OffsetAndMetadata> committed =
consumer.committed(partitions).values();
- long total = committed.stream()
- .mapToLong(offsetAndMetadata ->
Optional.ofNullable(offsetAndMetadata).map(OffsetAndMetadata::offset).orElse(0L))
- .sum();
-
- return total == count;
- }, "Expected that consumer group has consumed all messages from
topic/partition. " +
- "Expected offset: " + count + ". Actual offset: " +
committedOffsets(topic,
group).values().stream().mapToLong(Long::longValue).sum());
+ private Map<TopicPartition, Long> committedOffsets(ClusterInstance cluster,
+ String topic,
+ String group) {
+ try (Admin admin = Admin.create(singletonMap(BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers()))) {
+ return admin.listConsumerGroupOffsets(group)
+ .all().get()
+ .get(group).entrySet()
+ .stream()
+ .filter(e -> e.getKey().topic().equals(topic))
+ .collect(toMap(Map.Entry::getKey, e ->
e.getValue().offset()));
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
}
}
- private void
awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService
consumerGroupService, String group) throws Exception {
- TestUtils.waitForCondition(() -> {
- ConsumerGroupState state =
consumerGroupService.collectGroupState(group).state;
- return Objects.equals(state, ConsumerGroupState.EMPTY) ||
Objects.equals(state, ConsumerGroupState.DEAD);
- }, "Expected that consumer group is inactive. Actual state: " +
consumerGroupService.collectGroupState(group).state);
+ private ConsumerGroupCommand.ConsumerGroupService
getConsumerGroupService(String[] args) {
+ return new ConsumerGroupCommand.ConsumerGroupService(
+ ConsumerGroupCommandOptions.fromArgs(args),
+ singletonMap(RETRIES_CONFIG,
Integer.toString(Integer.MAX_VALUE)));
}
- private void resetAndAssertOffsets(String[] args,
- long expectedOffset) {
- resetAndAssertOffsets(args, expectedOffset, false,
Collections.singletonList(TOPIC));
+ private void produceMessages(ClusterInstance cluster, String topic, int
numMessages) {
+ List<ProducerRecord<byte[], byte[]>> records = IntStream.range(0,
numMessages)
+ .mapToObj(i -> new ProducerRecord<byte[], byte[]>(topic, new
byte[100 * 1000]))
+ .collect(Collectors.toList());
+ produceMessages(cluster, records);
+ }
+
+ private void produceMessages(ClusterInstance cluster,
List<ProducerRecord<byte[], byte[]>> records) {
+ try (Producer<byte[], byte[]> producer = createProducer(cluster)) {
+ records.forEach(producer::send);
+ }
}
- private void resetAndAssertOffsets(String[] args,
- long expectedOffset,
- boolean dryRun,
- List<String> topics) {
- ConsumerGroupCommand.ConsumerGroupService consumerGroupCommand =
getConsumerGroupService(args);
- Map<String, Map<TopicPartition, Long>> expectedOffsets =
topics.stream().collect(Collectors.toMap(
- Function.identity(),
- topic -> Collections.singletonMap(new TopicPartition(topic, 0),
expectedOffset)));
- Map<String, Map<TopicPartition, OffsetAndMetadata>>
resetOffsetsResultByGroup = resetOffsets(consumerGroupCommand);
+ private Producer<byte[], byte[]> createProducer(ClusterInstance cluster) {
+ Properties props = new Properties();
+ props.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ props.put(ACKS_CONFIG, "1");
+ props.put(KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ props.put(VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
+ return new KafkaProducer<>(props);
+ }
- try {
+ private void resetAndAssertOffsets(ClusterInstance cluster,
+ String topic,
+ String[] args,
+ long expectedOffset) {
+ resetAndAssertOffsets(cluster, args, expectedOffset, false,
singletonList(topic));
+ }
+
+ private void resetAndAssertOffsets(ClusterInstance cluster,
+ String[] args,
+ long expectedOffset,
+ boolean dryRun,
+ List<String> topics) {
+ try (ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(args)) {
+ Map<String, Map<TopicPartition, Long>> topicToExpectedOffsets =
getTopicExceptOffsets(topics, expectedOffset);
+ Map<String, Map<TopicPartition, OffsetAndMetadata>>
resetOffsetsResultByGroup =
+ resetOffsets(service);
for (final String topic : topics) {
resetOffsetsResultByGroup.forEach((group, partitionInfo) -> {
- Map<TopicPartition, Long> priorOffsets =
committedOffsets(topic, group);
- assertEquals(expectedOffsets.get(topic),
- partitionInfo.entrySet().stream()
- .filter(entry ->
Objects.equals(entry.getKey().topic(), topic))
- .collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().offset())));
- assertEquals(dryRun ? priorOffsets :
expectedOffsets.get(topic), committedOffsets(topic, group));
+ Map<TopicPartition, Long> priorOffsets =
committedOffsets(cluster, topic, group);
+ assertEquals(topicToExpectedOffsets.get(topic),
partitionToOffsets(topic, partitionInfo));
+ assertEquals(dryRun ? priorOffsets :
topicToExpectedOffsets.get(topic),
+ committedOffsets(cluster, topic, group));
});
}
- } finally {
- consumerGroupCommand.close();
}
}
- private void
resetAndAssertOffsetsCommitted(ConsumerGroupCommand.ConsumerGroupService
consumerGroupService,
- Map<TopicPartition, Long>
expectedOffsets,
- String topic) {
- Map<String, Map<TopicPartition, OffsetAndMetadata>> allResetOffsets =
resetOffsets(consumerGroupService);
-
- allResetOffsets.forEach((group, offsetsInfo) -> {
- offsetsInfo.forEach((tp, offsetMetadata) -> {
- assertEquals(offsetMetadata.offset(), expectedOffsets.get(tp));
- assertEquals(expectedOffsets, committedOffsets(topic, group));
- });
- });
+ private Map<String, Map<TopicPartition, Long>>
getTopicExceptOffsets(List<String> topics,
+ long
expectedOffset) {
+ return topics.stream()
+ .collect(toMap(Function.identity(),
+ topic -> singletonMap(new TopicPartition(topic, 0),
+ expectedOffset)));
}
- private Map<String, Map<TopicPartition, OffsetAndMetadata>>
resetOffsets(ConsumerGroupCommand.ConsumerGroupService consumerGroupService) {
+ private Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets(
+ ConsumerGroupCommand.ConsumerGroupService consumerGroupService) {
return consumerGroupService.resetOffsets();
}
- Map<TopicPartition, Long> toOffsetMap(Map<TopicPartition,
OffsetAndMetadata> map) {
- return
map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().offset()));
+ private Map<TopicPartition, Long> partitionToOffsets(String topic,
+ Map<TopicPartition,
OffsetAndMetadata> partitionInfo) {
+ return partitionInfo.entrySet()
+ .stream()
+ .filter(entry -> Objects.equals(entry.getKey().topic(), topic))
+ .collect(toMap(Map.Entry::getKey, e -> e.getValue().offset()));
+ }
+
+ private static List<String> generateIds(String name) {
+ return IntStream.rangeClosed(1, 2)
+ .mapToObj(id -> name + id)
+ .collect(Collectors.toList());
+ }
+
+ private void produceConsumeAndShutdown(ClusterInstance cluster,
+ String topic,
+ String group,
+ int numConsumers,
+ GroupProtocol groupProtocol) throws
Exception {
+ produceMessages(cluster, topic, 100);
+ try (AutoCloseable consumerGroupCloseable =
+ consumerGroupClosable(cluster, numConsumers, topic,
group, groupProtocol)) {
+ awaitConsumerProgress(cluster, topic, group, 100);
+ }
+ }
+
+ private AutoCloseable consumerGroupClosable(ClusterInstance cluster,
+ int numConsumers,
+ String topic,
+ String group,
+ GroupProtocol groupProtocol) {
+ Map<String, Object> configs = composeConsumerConfigs(cluster, group,
groupProtocol);
+ return ConsumerGroupCommandTestUtils.buildConsumers(
+ numConsumers,
+ false,
+ topic,
+ () -> new KafkaConsumer<String, String>(configs));
+ }
+
+ private Map<String, Object> composeConsumerConfigs(ClusterInstance cluster,
+ String group,
+ GroupProtocol
groupProtocol) {
+ HashMap<String, Object> configs = new HashMap<>();
+ configs.put(BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+ configs.put(GROUP_ID_CONFIG, group);
+ configs.put(GROUP_PROTOCOL_CONFIG, groupProtocol.name);
+ configs.put(KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ configs.put(VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
+ configs.put(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
RangeAssignor.class.getName());
+ return configs;
+ }
+
+ private void awaitConsumerProgress(ClusterInstance cluster,
+ String topic,
+ String group,
+ long count) throws Exception {
+ try (Admin admin = Admin.create(singletonMap(BOOTSTRAP_SERVERS_CONFIG,
cluster.bootstrapServers()))) {
+ TestUtils.waitForCondition(() ->
admin.listConsumerGroupOffsets(group)
Review Comment:
Could you rewrite it using `committedOffsets`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]