This is an automated email from the ASF dual-hosted git repository.
clolov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 949f1a74c1d MINOR: Cleanups in Tools module (#21823)
949f1a74c1d is described below
commit 949f1a74c1d8eabbba9774d91de2b9afa0dd26fb
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Thu Mar 26 18:41:02 2026 +0530
MINOR: Cleanups in Tools module (#21823)
The PR does some very minor cleanups in the Tools module predominantly
converting some inner classes to records as and when applicable.
Additionally some places where we could use smart lambdas.
Reviewers: Chia-Ping Tsai <[email protected]>, Ken Huang
<[email protected]>, Christo Lolov <[email protected]>
---
.../apache/kafka/tools/MetadataQuorumCommand.java | 13 +++-------
.../apache/kafka/tools/reassign/PartitionMove.java | 30 +++-------------------
.../tools/reassign/ReassignPartitionsCommand.java | 12 ++++-----
.../apache/kafka/tools/DumpLogSegmentsTest.java | 16 ++++--------
.../kafka/tools/LeaderElectionCommandTest.java | 12 ++++-----
.../apache/kafka/tools/ResetIntegrationTest.java | 8 ++----
.../consumer/group/ConsumerGroupServiceTest.java | 4 +--
.../tools/streams/StreamsGroupCommandTest.java | 2 +-
8 files changed, 29 insertions(+), 68 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index 996be169315..bb1097fcfc0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -300,15 +300,10 @@ public class MetadataQuorumCommand {
return node == null ? new ArrayList<>() : node.endpoints();
}
- private static class Node {
- private final int id;
- private final Uuid directoryId;
- private final List<RaftVoterEndpoint> endpoints;
-
- private Node(int id, Uuid directoryId, List<RaftVoterEndpoint>
endpoints) {
- this.id = id;
- this.directoryId = Objects.requireNonNull(directoryId);
- this.endpoints = Objects.requireNonNull(endpoints);
+ private record Node(int id, Uuid directoryId, List<RaftVoterEndpoint>
endpoints) {
+ private Node {
+ Objects.requireNonNull(directoryId);
+ Objects.requireNonNull(endpoints);
}
@Override
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java
index 571c21f9858..a20dff10be9 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/PartitionMove.java
@@ -17,36 +17,12 @@
package org.apache.kafka.tools.reassign;
-import java.util.Objects;
import java.util.Set;
/**
* A partition movement. The source and destination brokers may overlap.
+ * @param sources The source brokers.
+ * @param destinations The destination brokers.
*/
-final class PartitionMove {
- public final Set<Integer> sources;
-
- public final Set<Integer> destinations;
-
- /**
- * @param sources The source brokers.
- * @param destinations The destination brokers.
- */
- public PartitionMove(Set<Integer> sources, Set<Integer> destinations) {
- this.sources = sources;
- this.destinations = destinations;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- PartitionMove that = (PartitionMove) o;
- return Objects.equals(sources, that.sources) &&
Objects.equals(destinations, that.destinations);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(sources, destinations);
- }
+record PartitionMove(Set<Integer> sources, Set<Integer> destinations) {
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 1933e4e3526..8105ba957d6 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -1061,7 +1061,7 @@ public class ReassignPartitionsCommand {
if (partMoves.containsKey(part.partition())) {
PartitionMove move = partMoves.get(part.partition());
- sources.addAll(move.sources);
+ sources.addAll(move.sources());
} else if (currentParts.containsKey(part))
sources.addAll(currentParts.get(part));
else
@@ -1086,7 +1086,7 @@ public class ReassignPartitionsCommand {
moveMap.forEach((topicName, partMoveMap) -> {
Set<String> components = new TreeSet<>();
partMoveMap.forEach((partId, move) ->
- move.sources.forEach(source ->
components.add(String.format("%d:%d", partId, source))));
+ move.sources().forEach(source ->
components.add(String.format("%d:%d", partId, source))));
results.put(topicName, String.join(",", components));
});
return results;
@@ -1103,8 +1103,8 @@ public class ReassignPartitionsCommand {
moveMap.forEach((topicName, partMoveMap) -> {
Set<String> components = new TreeSet<>();
partMoveMap.forEach((partId, move) ->
- move.destinations.forEach(destination -> {
- if (!move.sources.contains(destination)) {
+ move.destinations().forEach(destination -> {
+ if (!move.sources().contains(destination)) {
components.add(String.format("%d:%d", partId,
destination));
}
})
@@ -1124,8 +1124,8 @@ public class ReassignPartitionsCommand {
static Set<Integer> calculateReassigningBrokers(Map<String, Map<Integer,
PartitionMove>> moveMap) {
Set<Integer> reassigningBrokers = new TreeSet<>();
moveMap.values().forEach(partMoveMap ->
partMoveMap.values().forEach(partMove -> {
- reassigningBrokers.addAll(partMove.sources);
- reassigningBrokers.addAll(partMove.destinations);
+ reassigningBrokers.addAll(partMove.sources());
+ reassigningBrokers.addAll(partMove.destinations());
}));
return reassigningBrokers;
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
index 9221f02df5a..66565fc5e38 100644
--- a/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/DumpLogSegmentsTest.java
@@ -138,17 +138,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class DumpLogSegmentsTest {
private static final Pattern SIZE_PATTERN =
Pattern.compile(".+?size:\\s(\\d+).+");
- private static final class BatchInfo {
- private final List<SimpleRecord> records;
- private final boolean hasKeys;
- private final boolean hasValues;
-
- private BatchInfo(List<SimpleRecord> records, boolean hasKeys, boolean
hasValues) {
- this.records = records;
- this.hasKeys = hasKeys;
- this.hasValues = hasValues;
- }
- }
+ private record BatchInfo(
+ List<SimpleRecord> records,
+ boolean hasKeys,
+ boolean hasValues
+ ) { }
private final File tmpDir = TestUtils.tempDir();
private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index 7894e34d9fd..e9fbc28cead 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -111,13 +111,13 @@ public class LeaderElectionCommandTest {
Path adminConfigPath = tempAdminConfig(defaultApiTimeoutMs,
requestTimeoutMs);
try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class)) {
- String output = ToolsTestUtils.captureStandardOut(() -> {
+ String output = ToolsTestUtils.captureStandardOut(() ->
LeaderElectionCommand.mainNoExit(
"--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "unclean", "--all-topic-partitions",
"--admin.config", adminConfigPath.toString()
- );
- });
+ )
+ );
assertTrue(output.contains("Option --admin.config has been
deprecated and will be removed in a future version. Use --command-config
instead."));
ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
@@ -161,14 +161,14 @@ public class LeaderElectionCommandTest {
// Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
- String output = ToolsTestUtils.captureStandardErr(() -> {
+ String output = ToolsTestUtils.captureStandardErr(() ->
LeaderElectionCommand.mainNoExit(
"--bootstrap-server", "localhost:9092",
"--election-type", "unclean", "--all-topic-partitions",
"--admin.config", adminConfigPath.toString(),
"--command-config", adminConfigPath.toString()
- );
- });
+ )
+ );
assertTrue(output.contains(String.format("Option \"%s\" can't be
used with option \"%s\"",
"[admin.config]", "[command-config]")));
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
index 9b881fc8ece..f11578e2314 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
@@ -257,9 +257,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
};
try (final MockedStatic<Admin> mockedAdmin =
Mockito.mockStatic(Admin.class, Mockito.CALLS_REAL_METHODS)) {
- String output = ToolsTestUtils.captureStandardOut(() -> {
- new StreamsResetter().execute(parameters);
- });
+ String output = ToolsTestUtils.captureStandardOut(() -> new
StreamsResetter().execute(parameters));
assertTrue(output.contains("Option --config-file has been
deprecated and will be removed in a future version. Use --command-config
instead."));
ArgumentCaptor<Properties> argumentCaptor =
ArgumentCaptor.forClass(Properties.class);
@@ -308,9 +306,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// Mock Exit because CommandLineUtils.checkInvalidArgs calls exit
Exit.setExitProcedure(new ToolsTestUtils.MockExitProcedure());
- String output = ToolsTestUtils.captureStandardErr(() -> {
- new StreamsResetter().execute(parameters);
- });
+ String output = ToolsTestUtils.captureStandardErr(() -> new
StreamsResetter().execute(parameters));
assertTrue(output.contains(String.format("Option \"%s\" can't be
used with option \"%s\"",
"[config-file]", "[command-config]")));
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index 0a009f4dc09..b40af8d7606 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -184,12 +184,12 @@ public class ConsumerGroupServiceTest {
Map<TopicPartition, Optional<Long>> returnedOffsets =
assignments.map(results ->
results.stream().collect(Collectors.toMap(
assignment -> new TopicPartition(assignment.topic().get(),
assignment.partition().get()),
- assignment -> assignment.offset()))
+ PartitionAssignmentState::offset))
).orElse(Map.of());
Map<TopicPartition, Optional<Integer>> returnedLeaderEpoch =
assignments.map(results ->
results.stream().collect(Collectors.toMap(
assignment -> new TopicPartition(assignment.topic().get(),
assignment.partition().get()),
- assignment -> assignment.leaderEpoch()))
+ PartitionAssignmentState::leaderEpoch))
).orElse(Map.of());
Map<TopicPartition, Optional<Long>> expectedOffsets = Map.of(
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index 6f087f13f09..20f6ed26e60 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -473,7 +473,7 @@ public class StreamsGroupCommandTest {
when(adminClient.listStreamsGroupOffsets(anyMap(),
any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args.toArray(new String[0]), adminClient);
- assertThrows(UnknownTopicOrPartitionException.class, () ->
service.resetOffsets());
+ assertThrows(UnknownTopicOrPartitionException.class,
service::resetOffsets);
service.close();
}