This is an automated email from the ASF dual-hosted git repository.
FrankYang0529 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7b7fae7a636 MINOR: Convert GroupOffsetsResetterOptions to record (#22251)
7b7fae7a636 is described below
commit 7b7fae7a636f23edf0ce1148ba9bba8d1e8f2bf9
Author: majialong <[email protected]>
AuthorDate: Mon May 11 16:16:57 2026 +0800
MINOR: Convert GroupOffsetsResetterOptions to record (#22251)
Convert `GroupOffsetsResetterOptions` to record.
Reviewers: Ken Huang <[email protected]>, Maros Orsak <[email protected]>, PoAn Yang <[email protected]>
---
.../apache/kafka/tools/GroupOffsetsResetter.java | 59 +++++++---------------
1 file changed, 19 insertions(+), 40 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
b/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
index c5f983fb0c0..16725f9fc83 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
@@ -94,9 +94,9 @@ public class GroupOffsetsResetter {
}
public Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>>
resetPlanFromFile() {
- if (opts.resetFromFileOpt != null && !opts.resetFromFileOpt.isEmpty())
{
+ if (opts.resetFromFileOpt() != null &&
!opts.resetFromFileOpt().isEmpty()) {
try {
- String resetPlanPath = opts.resetFromFileOpt.get(0);
+ String resetPlanPath = opts.resetFromFileOpt().get(0);
String resetPlanCsv = Utils.readFileAsString(resetPlanPath);
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetPlan
= parseResetPlan(resetPlanCsv);
return Optional.of(resetPlan);
@@ -109,7 +109,7 @@ public class GroupOffsetsResetter {
private Map<String, Map<TopicPartition, OffsetAndMetadata>>
parseResetPlan(String resetPlanCsv) {
ObjectReader csvReader =
CsvUtils.readerFor(CsvUtils.CsvRecordNoGroup.class);
String[] lines = resetPlanCsv.split("\n");
- boolean isSingleGroupQuery = opts.groupOpt.size() == 1;
+ boolean isSingleGroupQuery = opts.groupOpt().size() == 1;
boolean isOldCsvFormat = false;
try {
if (lines.length > 0) {
@@ -126,7 +126,7 @@ public class GroupOffsetsResetter {
try {
// Single group CSV format: "topic,partition,offset"
if (isSingleGroupQuery && isOldCsvFormat) {
- String group = opts.groupOpt.get(0);
+ String group = opts.groupOpt().get(0);
for (String line : lines) {
CsvUtils.CsvRecordNoGroup rec = csvReader.readValue(line,
CsvUtils.CsvRecordNoGroup.class);
dataMap.computeIfAbsent(group, k -> new HashMap<>())
@@ -298,8 +298,8 @@ public class GroupOffsetsResetter {
}
public Map<TopicPartition, OffsetAndMetadata>
resetToOffset(Collection<TopicPartition> partitionsToReset) {
- long offset = opts.resetToOffsetOpt != null &&
!opts.resetToOffsetOpt.isEmpty()
- ? opts.resetToOffsetOpt.get(0)
+ long offset = opts.resetToOffsetOpt() != null &&
!opts.resetToOffsetOpt().isEmpty()
+ ? opts.resetToOffsetOpt().get(0)
: 0L;
return
checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
tp -> offset)))
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> new OffsetAndMetadata(e.getValue())));
@@ -336,7 +336,7 @@ public class GroupOffsetsResetter {
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets) {
Map<TopicPartition, Long> requestedOffsets =
partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
topicPartition -> {
- long shiftBy = opts.resetShiftByOpt;
+ long shiftBy = opts.resetShiftByOpt();
OffsetAndMetadata currentOffset =
currentCommittedOffsets.get(topicPartition);
if (currentOffset == null) {
@@ -351,7 +351,7 @@ public class GroupOffsetsResetter {
public Map<TopicPartition, OffsetAndMetadata>
resetToDateTime(Collection<TopicPartition> partitionsToReset) {
try {
- long timestamp = Utils.getDateTime(opts.resetToDatetimeOpt.get(0));
+ long timestamp =
Utils.getDateTime(opts.resetToDatetimeOpt().get(0));
Map<TopicPartition, LogOffsetResult> logTimestampOffsets =
getLogTimestampOffsets(partitionsToReset, timestamp);
return
partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
topicPartition -> {
@@ -367,7 +367,7 @@ public class GroupOffsetsResetter {
}
public Map<TopicPartition, OffsetAndMetadata>
resetByDuration(Collection<TopicPartition> partitionsToReset) {
- String duration = opts.resetByDurationOpt;
+ String duration = opts.resetByDurationOpt();
Duration durationParsed = Duration.parse(duration);
Instant now = Instant.now();
durationParsed.negated().addTo(now);
@@ -489,7 +489,7 @@ public class GroupOffsetsResetter {
}
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
- int t = (int) opts.timeoutMsOpt;
+ int t = (int) opts.timeoutMsOpt();
return options.timeoutMs(t);
}
@@ -508,41 +508,20 @@ public class GroupOffsetsResetter {
public static class Ignore implements LogOffsetResult { }
- public static class GroupOffsetsResetterOptions {
- List<String> groupOpt;
- List<Long> resetToOffsetOpt;
- List<String> resetFromFileOpt;
- List<String> resetToDatetimeOpt;
- String resetByDurationOpt;
- Long resetShiftByOpt;
- long timeoutMsOpt;
+ public record GroupOffsetsResetterOptions(
+ List<String> groupOpt,
+ List<Long> resetToOffsetOpt,
+ List<String> resetFromFileOpt,
+ List<String> resetToDatetimeOpt,
+ String resetByDurationOpt,
+ Long resetShiftByOpt,
+ long timeoutMsOpt) {
public GroupOffsetsResetterOptions(
List<String> groupOpt,
- List<Long> resetToOffsetOpt,
- List<String> resetFromFileOpt,
List<String> resetToDatetimeOpt,
- String resetByDurationOpt,
- Long resetShiftByOpt,
long timeoutMsOpt) {
-
- this.groupOpt = groupOpt;
- this.resetToOffsetOpt = resetToOffsetOpt;
- this.resetFromFileOpt = resetFromFileOpt;
- this.resetToDatetimeOpt = resetToDatetimeOpt;
- this.resetByDurationOpt = resetByDurationOpt;
- this.resetShiftByOpt = resetShiftByOpt;
- this.timeoutMsOpt = timeoutMsOpt;
- }
-
- public GroupOffsetsResetterOptions(
- List<String> groupOpt,
- List<String> resetToDatetimeOpt,
- long timeoutMsOpt) {
-
- this.groupOpt = groupOpt;
- this.resetToDatetimeOpt = resetToDatetimeOpt;
- this.timeoutMsOpt = timeoutMsOpt;
+ this(groupOpt, null, null, resetToDatetimeOpt, null, null,
timeoutMsOpt);
}
}
}