This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0da9cacffab MINOR: Cleanups in Tools Module (3/n) (#20332)
0da9cacffab is described below
commit 0da9cacffaba276e1bc6c1301aed449aaff788b9
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Tue Aug 19 19:54:13 2025 +0530
MINOR: Cleanups in Tools Module (3/n) (#20332)
This PR aims to clean up the tools module further by removing boilerplate
code that can be replaced by Java `record` types.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../java/org/apache/kafka/tools/AclCommand.java | 168 ++++++++++-----------
.../org/apache/kafka/tools/ConnectPluginPath.java | 35 +----
.../apache/kafka/tools/MetadataQuorumCommand.java | 3 +-
.../apache/kafka/tools/OAuthCompatibilityTool.java | 9 +-
.../java/org/apache/kafka/tools/OffsetsUtils.java | 11 +-
.../kafka/tools/PushHttpMetricsReporter.java | 85 ++---------
.../kafka/tools/ReplicaVerificationTool.java | 13 +-
.../apache/kafka/tools/TransactionsCommand.java | 31 ++--
.../tools/consumer/group/ConsumerGroupCommand.java | 96 ++++++------
.../tools/consumer/group/GroupInformation.java | 28 +---
.../consumer/group/MemberAssignmentState.java | 42 +-----
.../consumer/group/PartitionAssignmentState.java | 42 +-----
.../tools/consumer/group/ShareGroupCommand.java | 22 +--
.../kafka/tools/reassign/ActiveMoveState.java | 37 +----
.../kafka/tools/reassign/CancelledMoveState.java | 32 +---
.../kafka/tools/reassign/CompletedMoveState.java | 27 +---
.../tools/reassign/MissingLogDirMoveState.java | 27 +---
.../tools/reassign/MissingReplicaMoveState.java | 27 +---
.../tools/reassign/ReassignPartitionsCommand.java | 10 +-
.../apache/kafka/tools/ConnectPluginPathTest.java | 27 +---
.../consumer/group/ConsumerGroupServiceTest.java | 8 +-
.../consumer/group/DeleteConsumerGroupsTest.java | 2 +-
.../consumer/group/DescribeConsumerGroupTest.java | 92 +++++------
.../group/ResetConsumerGroupOffsetTest.java | 4 +-
.../reassign/ReassignPartitionsCommandTest.java | 11 +-
25 files changed, 242 insertions(+), 647 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
index 791397b2405..fd62e8f3b56 100644
--- a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -73,14 +73,13 @@ public class AclCommand {
public static void main(String[] args) {
AclCommandOptions opts = new AclCommandOptions(args);
- AdminClientService aclCommandService = new AdminClientService(opts);
try (Admin admin = Admin.create(adminConfigs(opts))) {
if (opts.options.has(opts.addOpt)) {
- aclCommandService.addAcls(admin);
+ addAcls(admin, opts);
} else if (opts.options.has(opts.removeOpt)) {
- aclCommandService.removeAcls(admin);
+ removeAcls(admin, opts);
} else if (opts.options.has(opts.listOpt)) {
- aclCommandService.listAcls(admin);
+ listAcls(admin, opts);
}
} catch (Throwable e) {
System.out.println("Error while executing ACL command: " +
e.getMessage());
@@ -102,106 +101,97 @@ public class AclCommand {
return props;
}
- private static class AdminClientService {
-
- private final AclCommandOptions opts;
-
- AdminClientService(AclCommandOptions opts) {
- this.opts = opts;
- }
-
- void addAcls(Admin admin) throws ExecutionException,
InterruptedException {
- Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl =
getResourceToAcls(opts);
- for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry :
resourceToAcl.entrySet()) {
- ResourcePattern resource = entry.getKey();
- Set<AccessControlEntry> acls = entry.getValue();
- System.out.println("Adding ACLs for resource `" + resource +
"`: " + NL + " " + acls.stream().map(a -> "\t" +
a).collect(Collectors.joining(NL)) + NL);
- Collection<AclBinding> aclBindings = acls.stream().map(acl ->
new AclBinding(resource, acl)).collect(Collectors.toList());
- admin.createAcls(aclBindings).all().get();
- }
+ private static void addAcls(Admin admin, AclCommandOptions opts) throws
ExecutionException, InterruptedException {
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcl =
getResourceToAcls(opts);
+ for (Map.Entry<ResourcePattern, Set<AccessControlEntry>> entry :
resourceToAcl.entrySet()) {
+ ResourcePattern resource = entry.getKey();
+ Set<AccessControlEntry> acls = entry.getValue();
+ System.out.println("Adding ACLs for resource `" + resource + "`: "
+ NL + " " + acls.stream().map(a -> "\t" + a).collect(Collectors.joining(NL)) +
NL);
+ Collection<AclBinding> aclBindings = acls.stream().map(acl -> new
AclBinding(resource, acl)).collect(Collectors.toList());
+ admin.createAcls(aclBindings).all().get();
}
+ }
- void removeAcls(Admin admin) throws ExecutionException,
InterruptedException {
- Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl =
getResourceFilterToAcls(opts);
- for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>>
entry : filterToAcl.entrySet()) {
- ResourcePatternFilter filter = entry.getKey();
- Set<AccessControlEntry> acls = entry.getValue();
- if (acls.isEmpty()) {
- if (confirmAction(opts, "Are you sure you want to delete
all ACLs for resource filter `" + filter + "`? (y/n)")) {
- removeAcls(admin, acls, filter);
- }
- } else {
- String msg = "Are you sure you want to remove ACLs: " + NL
+
- " " + acls.stream().map(a -> "\t" +
a).collect(Collectors.joining(NL)) + NL +
- " from resource filter `" + filter + "`? (y/n)";
- if (confirmAction(opts, msg)) {
- removeAcls(admin, acls, filter);
- }
+ private static void removeAcls(Admin admin, AclCommandOptions opts) throws
ExecutionException, InterruptedException {
+ Map<ResourcePatternFilter, Set<AccessControlEntry>> filterToAcl =
getResourceFilterToAcls(opts);
+ for (Map.Entry<ResourcePatternFilter, Set<AccessControlEntry>> entry :
filterToAcl.entrySet()) {
+ ResourcePatternFilter filter = entry.getKey();
+ Set<AccessControlEntry> acls = entry.getValue();
+ if (acls.isEmpty()) {
+ if (confirmAction(opts, "Are you sure you want to delete all
ACLs for resource filter `" + filter + "`? (y/n)")) {
+ removeAcls(admin, acls, filter);
+ }
+ } else {
+ String msg = "Are you sure you want to remove ACLs: " + NL +
+ " " + acls.stream().map(a -> "\t" +
a).collect(Collectors.joining(NL)) + NL +
+ " from resource filter `" + filter + "`? (y/n)";
+ if (confirmAction(opts, msg)) {
+ removeAcls(admin, acls, filter);
}
}
}
+ }
- private void listAcls(Admin admin) throws ExecutionException,
InterruptedException {
- Set<ResourcePatternFilter> filters = getResourceFilter(opts,
false);
- Set<KafkaPrincipal> listPrincipals = getPrincipals(opts,
opts.listPrincipalsOpt);
- Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls =
getAcls(admin, filters);
+ private static void listAcls(Admin admin, AclCommandOptions opts) throws
ExecutionException, InterruptedException {
+ Set<ResourcePatternFilter> filters = getResourceFilter(opts, false);
+ Set<KafkaPrincipal> listPrincipals = getPrincipals(opts,
opts.listPrincipalsOpt);
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls =
getAcls(admin, filters);
- if (listPrincipals.isEmpty()) {
- printResourceAcls(resourceToAcls);
- } else {
- listPrincipals.forEach(principal -> {
- System.out.println("ACLs for principal `" + principal +
"`");
- Map<ResourcePattern, Set<AccessControlEntry>>
filteredResourceToAcls = resourceToAcls.entrySet().stream()
- .map(entry -> {
- ResourcePattern resource = entry.getKey();
- Set<AccessControlEntry> acls =
entry.getValue().stream()
- .filter(acl ->
principal.toString().equals(acl.principal()))
- .collect(Collectors.toSet());
- return new AbstractMap.SimpleEntry<>(resource,
acls);
- })
- .filter(entry -> !entry.getValue().isEmpty())
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
- printResourceAcls(filteredResourceToAcls);
- });
- }
+ if (listPrincipals.isEmpty()) {
+ printResourceAcls(resourceToAcls);
+ } else {
+ listPrincipals.forEach(principal -> {
+ System.out.println("ACLs for principal `" + principal + "`");
+ Map<ResourcePattern, Set<AccessControlEntry>>
filteredResourceToAcls = resourceToAcls.entrySet().stream()
+ .map(entry -> {
+ ResourcePattern resource = entry.getKey();
+ Set<AccessControlEntry> acls =
entry.getValue().stream()
+ .filter(acl ->
principal.toString().equals(acl.principal()))
+ .collect(Collectors.toSet());
+ return new AbstractMap.SimpleEntry<>(resource,
acls);
+ })
+ .filter(entry -> !entry.getValue().isEmpty())
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ printResourceAcls(filteredResourceToAcls);
+ });
}
+ }
- private static void printResourceAcls(Map<ResourcePattern,
Set<AccessControlEntry>> resourceToAcls) {
- resourceToAcls.forEach((resource, acls) ->
- System.out.println("Current ACLs for resource `" + resource +
"`:" + NL +
- acls.stream().map(acl -> "\t" +
acl).collect(Collectors.joining(NL)) + NL)
- );
- }
+ private static void printResourceAcls(Map<ResourcePattern,
Set<AccessControlEntry>> resourceToAcls) {
+ resourceToAcls.forEach((resource, acls) ->
+ System.out.println("Current ACLs for resource `" + resource + "`:"
+ NL +
+ acls.stream().map(acl -> "\t" +
acl).collect(Collectors.joining(NL)) + NL)
+ );
+ }
- private static void removeAcls(Admin adminClient,
Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws
ExecutionException, InterruptedException {
- if (acls.isEmpty()) {
- adminClient.deleteAcls(List.of(new AclBindingFilter(filter,
AccessControlEntryFilter.ANY))).all().get();
- } else {
- List<AclBindingFilter> aclBindingFilters =
acls.stream().map(acl -> new AclBindingFilter(filter,
acl.toFilter())).collect(Collectors.toList());
- adminClient.deleteAcls(aclBindingFilters).all().get();
- }
+ private static void removeAcls(Admin adminClient, Set<AccessControlEntry>
acls, ResourcePatternFilter filter) throws ExecutionException,
InterruptedException {
+ if (acls.isEmpty()) {
+ adminClient.deleteAcls(List.of(new AclBindingFilter(filter,
AccessControlEntryFilter.ANY))).all().get();
+ } else {
+ List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl
-> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
+ adminClient.deleteAcls(aclBindingFilters).all().get();
}
+ }
- private Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin
adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException,
InterruptedException {
- Collection<AclBinding> aclBindings;
- if (filters.isEmpty()) {
- aclBindings =
adminClient.describeAcls(AclBindingFilter.ANY).values().get();
- } else {
- aclBindings = new ArrayList<>();
- for (ResourcePatternFilter filter : filters) {
- aclBindings.addAll(adminClient.describeAcls(new
AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
- }
+ private static Map<ResourcePattern, Set<AccessControlEntry>> getAcls(Admin
adminClient, Set<ResourcePatternFilter> filters) throws ExecutionException,
InterruptedException {
+ Collection<AclBinding> aclBindings;
+ if (filters.isEmpty()) {
+ aclBindings =
adminClient.describeAcls(AclBindingFilter.ANY).values().get();
+ } else {
+ aclBindings = new ArrayList<>();
+ for (ResourcePatternFilter filter : filters) {
+ aclBindings.addAll(adminClient.describeAcls(new
AclBindingFilter(filter, AccessControlEntryFilter.ANY)).values().get());
}
+ }
- Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new
HashMap<>();
- for (AclBinding aclBinding : aclBindings) {
- ResourcePattern resource = aclBinding.pattern();
- Set<AccessControlEntry> acls =
resourceToAcls.getOrDefault(resource, new HashSet<>());
- acls.add(aclBinding.entry());
- resourceToAcls.put(resource, acls);
- }
- return resourceToAcls;
+ Map<ResourcePattern, Set<AccessControlEntry>> resourceToAcls = new
HashMap<>();
+ for (AclBinding aclBinding : aclBindings) {
+ ResourcePattern resource = aclBinding.pattern();
+ Set<AccessControlEntry> acls =
resourceToAcls.getOrDefault(resource, new HashSet<>());
+ acls.add(aclBinding.entry());
+ resourceToAcls.put(resource, acls);
}
+ return resourceToAcls;
}
private static Map<ResourcePattern, Set<AccessControlEntry>>
getResourceToAcls(AclCommandOptions opts) {
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
index c86638e846a..a3da1d3a9c6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
@@ -196,22 +196,8 @@ public class ConnectPluginPath {
LIST, SYNC_MANIFESTS
}
- private static class Config {
- private final Command command;
- private final Set<Path> locations;
- private final boolean dryRun;
- private final boolean keepNotFound;
- private final PrintStream out;
- private final PrintStream err;
-
- private Config(Command command, Set<Path> locations, boolean dryRun,
boolean keepNotFound, PrintStream out, PrintStream err) {
- this.command = command;
- this.locations = locations;
- this.dryRun = dryRun;
- this.keepNotFound = keepNotFound;
- this.out = out;
- this.err = err;
- }
+ private record Config(Command command, Set<Path> locations, boolean
dryRun, boolean keepNotFound, PrintStream out,
+ PrintStream err) {
@Override
public String toString() {
@@ -262,16 +248,9 @@ public class ConnectPluginPath {
* <p>This is unique to the (source, class, type) tuple, and contains
additional pre-computed information
* that pertains to this specific plugin.
*/
- private static class Row {
- private final ManifestWorkspace.SourceWorkspace<?> workspace;
- private final String className;
- private final PluginType type;
- private final String version;
- private final List<String> aliases;
- private final boolean loadable;
- private final boolean hasManifest;
-
- public Row(ManifestWorkspace.SourceWorkspace<?> workspace, String
className, PluginType type, String version, List<String> aliases, boolean
loadable, boolean hasManifest) {
+ private record Row(ManifestWorkspace.SourceWorkspace<?> workspace, String
className, PluginType type,
+ String version, List<String> aliases, boolean loadable,
boolean hasManifest) {
+ private Row(ManifestWorkspace.SourceWorkspace<?> workspace, String
className, PluginType type, String version, List<String> aliases, boolean
loadable, boolean hasManifest) {
this.workspace = Objects.requireNonNull(workspace, "workspace must
be non-null");
this.className = Objects.requireNonNull(className, "className must
be non-null");
this.version = Objects.requireNonNull(version, "version must be
non-null");
@@ -281,10 +260,6 @@ public class ConnectPluginPath {
this.hasManifest = hasManifest;
}
- private boolean loadable() {
- return loadable;
- }
-
private boolean compatible() {
return loadable && hasManifest;
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index b65655a8c89..4abe443b82b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -65,7 +65,6 @@ import java.util.stream.Stream;
import static java.lang.String.format;
import static java.lang.String.valueOf;
-import static java.util.Arrays.asList;
/**
* A tool for describing quorum status
@@ -206,7 +205,7 @@ public class MetadataQuorumCommand {
rows.addAll(quorumInfoToRows(leader, quorumInfo.observers().stream(),
"Observer", humanReadable));
ToolsUtils.prettyPrintTable(
- asList("NodeId", "DirectoryId", "LogEndOffset", "Lag",
"LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
+ List.of("NodeId", "DirectoryId", "LogEndOffset", "Lag",
"LastFetchTimestamp", "LastCaughtUpTimestamp", "Status"),
rows,
System.out
);
diff --git
a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
index 40f3100054b..59dbb47daec 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OAuthCompatibilityTool.java
@@ -292,14 +292,7 @@ public class OAuthCompatibilityTool {
}
- private static class ConfigHandler {
-
- private final Namespace namespace;
-
-
- private ConfigHandler(Namespace namespace) {
- this.namespace = namespace;
- }
+ private record ConfigHandler(Namespace namespace) {
private Map<String, ?> getConfigs() {
Map<String, Object> m = new HashMap<>();
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index e37bf2804d7..269cd53875d 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -500,16 +500,7 @@ public class OffsetsUtils {
public interface LogOffsetResult { }
- public static class LogOffset implements LogOffsetResult {
- final long value;
-
- public LogOffset(long value) {
- this.value = value;
- }
-
- public long value() {
- return value;
- }
+ public record LogOffset(long value) implements LogOffsetResult {
}
public static class Unknown implements LogOffsetResult { }
diff --git
a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index 86ea19f0623..9b423b78947 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -240,86 +240,19 @@ public class PushHttpMetricsReporter implements
MetricsReporter {
}
}
- private static class MetricsReport {
- private final MetricClientInfo client;
- private final Collection<MetricValue> metrics;
-
- MetricsReport(MetricClientInfo client, Collection<MetricValue>
metrics) {
- this.client = client;
- this.metrics = metrics;
- }
-
- @JsonProperty
- public MetricClientInfo client() {
- return client;
- }
-
- @JsonProperty
- public Collection<MetricValue> metrics() {
- return metrics;
- }
+ private record MetricsReport(@JsonProperty("client") MetricClientInfo
client,
+ @JsonProperty("metrics")
Collection<MetricValue> metrics) {
}
- private static class MetricClientInfo {
- private final String host;
- private final String clientId;
- private final long time;
-
- MetricClientInfo(String host, String clientId, long time) {
- this.host = host;
- this.clientId = clientId;
- this.time = time;
- }
-
- @JsonProperty
- public String host() {
- return host;
- }
-
- @JsonProperty("client_id")
- public String clientId() {
- return clientId;
- }
-
- @JsonProperty
- public long time() {
- return time;
- }
+ private record MetricClientInfo(@JsonProperty("host") String host,
+ @JsonProperty("client_id") String clientId,
+ @JsonProperty("time") long time) {
}
- private static class MetricValue {
-
- private final String name;
- private final String group;
- private final Map<String, String> tags;
- private final Object value;
-
- MetricValue(String name, String group, Map<String, String> tags,
Object value) {
- this.name = name;
- this.group = group;
- this.tags = tags;
- this.value = value;
- }
-
- @JsonProperty
- public String name() {
- return name;
- }
-
- @JsonProperty
- public String group() {
- return group;
- }
-
- @JsonProperty
- public Map<String, String> tags() {
- return tags;
- }
-
- @JsonProperty
- public Object value() {
- return value;
- }
+ private record MetricValue(@JsonProperty("name") String name,
+ @JsonProperty("group") String group,
+ @JsonProperty("tags") Map<String, String> tags,
+ @JsonProperty("value") Object value) {
}
// The signature for getInt changed from returning int to Integer so to
remain compatible with 0.8.2.2 jars
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index c5f1ddcc2d4..937e1cc5955 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -348,18 +348,7 @@ public class ReplicaVerificationTool {
}
}
- private static class MessageInfo {
- final int replicaId;
- final long offset;
- final long nextOffset;
- final long checksum;
-
- MessageInfo(int replicaId, long offset, long nextOffset, long
checksum) {
- this.replicaId = replicaId;
- this.offset = offset;
- this.nextOffset = nextOffset;
- this.checksum = checksum;
- }
+ private record MessageInfo(int replicaId, long offset, long nextOffset,
long checksum) {
}
protected static class ReplicaBuffer {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index 9c5323104fe..5b59fbee4a1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -63,7 +63,6 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static java.util.Arrays.asList;
import static net.sourceforge.argparse4j.impl.Arguments.store;
public abstract class TransactionsCommand {
@@ -288,7 +287,7 @@ public abstract class TransactionsCommand {
}
static class DescribeProducersCommand extends TransactionsCommand {
- static final List<String> HEADERS = asList(
+ static final List<String> HEADERS = List.of(
"ProducerId",
"ProducerEpoch",
"LatestCoordinatorEpoch",
@@ -360,7 +359,7 @@ public abstract class TransactionsCommand {
String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) :
"None";
- return asList(
+ return List.of(
String.valueOf(producerState.producerId()),
String.valueOf(producerState.producerEpoch()),
String.valueOf(producerState.coordinatorEpoch().orElse(-1)),
@@ -375,7 +374,7 @@ public abstract class TransactionsCommand {
}
static class DescribeTransactionsCommand extends TransactionsCommand {
- static final List<String> HEADERS = asList(
+ static final List<String> HEADERS = List.of(
"CoordinatorId",
"TransactionalId",
"ProducerId",
@@ -436,7 +435,7 @@ public abstract class TransactionsCommand {
transactionDurationMsColumnValue = "None";
}
- List<String> row = asList(
+ List<String> row = List.of(
String.valueOf(result.coordinatorId()),
transactionalId,
String.valueOf(result.producerId()),
@@ -453,7 +452,7 @@ public abstract class TransactionsCommand {
}
static class ListTransactionsCommand extends TransactionsCommand {
- static final List<String> HEADERS = asList(
+ static final List<String> HEADERS = List.of(
"TransactionalId",
"Coordinator",
"ProducerId",
@@ -510,7 +509,7 @@ public abstract class TransactionsCommand {
Collection<TransactionListing> listings =
brokerListingsEntry.getValue();
for (TransactionListing listing : listings) {
- rows.add(asList(
+ rows.add(List.of(
listing.transactionalId(),
coordinatorIdString,
String.valueOf(listing.producerId()),
@@ -526,7 +525,7 @@ public abstract class TransactionsCommand {
static class FindHangingTransactionsCommand extends TransactionsCommand {
private static final int MAX_BATCH_SIZE = 500;
- static final List<String> HEADERS = asList(
+ static final List<String> HEADERS = List.of(
"Topic",
"Partition",
"ProducerId",
@@ -709,7 +708,7 @@ public abstract class TransactionsCommand {
long transactionDurationMinutes =
TimeUnit.MILLISECONDS.toMinutes(
currentTimeMs - transaction.producerState.lastTimestamp());
- rows.add(asList(
+ rows.add(List.of(
transaction.topicPartition.topic(),
String.valueOf(transaction.topicPartition.partition()),
String.valueOf(transaction.producerState.producerId()),
@@ -848,17 +847,7 @@ public abstract class TransactionsCommand {
return candidateTransactions;
}
- private static class OpenTransaction {
- private final TopicPartition topicPartition;
- private final ProducerState producerState;
-
- private OpenTransaction(
- TopicPartition topicPartition,
- ProducerState producerState
- ) {
- this.topicPartition = topicPartition;
- this.producerState = producerState;
- }
+ private record OpenTransaction(TopicPartition topicPartition,
ProducerState producerState) {
}
private void collectCandidateOpenTransactions(
@@ -1024,7 +1013,7 @@ public abstract class TransactionsCommand {
PrintStream out,
Time time
) throws Exception {
- List<TransactionsCommand> commands = asList(
+ List<TransactionsCommand> commands = List.of(
new ListTransactionsCommand(time),
new DescribeTransactionsCommand(time),
new DescribeProducersCommand(time),
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 1f29cdd8156..cfdef8f7518 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -347,20 +347,20 @@ public class ConsumerGroupCommand {
for (PartitionAssignmentState consumerAssignment :
consumerAssignments) {
if (verbose) {
System.out.printf(format,
- consumerAssignment.group,
-
consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE),
consumerAssignment.partition.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
consumerAssignment.leaderEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
consumerAssignment.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.logEndOffset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
consumerAssignment.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE),
-
consumerAssignment.host.orElse(MISSING_COLUMN_VALUE),
consumerAssignment.clientId.orElse(MISSING_COLUMN_VALUE)
+ consumerAssignment.group(),
+
consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE),
consumerAssignment.partition().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
consumerAssignment.leaderEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
consumerAssignment.offset().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.logEndOffset().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
consumerAssignment.lag().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE),
+
consumerAssignment.host().orElse(MISSING_COLUMN_VALUE),
consumerAssignment.clientId().orElse(MISSING_COLUMN_VALUE)
);
} else {
System.out.printf(format,
- consumerAssignment.group,
-
consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE),
consumerAssignment.partition.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
consumerAssignment.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.logEndOffset.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
-
consumerAssignment.lag.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE),
-
consumerAssignment.host.orElse(MISSING_COLUMN_VALUE),
consumerAssignment.clientId.orElse(MISSING_COLUMN_VALUE)
+ consumerAssignment.group(),
+
consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE),
consumerAssignment.partition().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
consumerAssignment.offset().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.logEndOffset().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
+
consumerAssignment.lag().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE),
+
consumerAssignment.host().orElse(MISSING_COLUMN_VALUE),
consumerAssignment.clientId().orElse(MISSING_COLUMN_VALUE)
);
}
}
@@ -379,10 +379,10 @@ public class ConsumerGroupCommand {
if (assignments.isPresent()) {
Collection<PartitionAssignmentState> consumerAssignments =
assignments.get();
for (PartitionAssignmentState consumerAssignment :
consumerAssignments) {
- maxGroupLen = Math.max(maxGroupLen,
consumerAssignment.group.length());
- maxTopicLen = Math.max(maxTopicLen,
consumerAssignment.topic.orElse(MISSING_COLUMN_VALUE).length());
- maxConsumerIdLen = Math.max(maxConsumerIdLen,
consumerAssignment.consumerId.orElse(MISSING_COLUMN_VALUE).length());
- maxHostLen = Math.max(maxHostLen,
consumerAssignment.host.orElse(MISSING_COLUMN_VALUE).length());
+ maxGroupLen = Math.max(maxGroupLen,
consumerAssignment.group().length());
+ maxTopicLen = Math.max(maxTopicLen,
consumerAssignment.topic().orElse(MISSING_COLUMN_VALUE).length());
+ maxConsumerIdLen = Math.max(maxConsumerIdLen,
consumerAssignment.consumerId().orElse(MISSING_COLUMN_VALUE).length());
+ maxHostLen = Math.max(maxHostLen,
consumerAssignment.host().orElse(MISSING_COLUMN_VALUE).length());
}
}
@@ -408,20 +408,20 @@ public class ConsumerGroupCommand {
// find proper columns width
if (assignments.isPresent()) {
for (MemberAssignmentState memberAssignment :
assignments.get()) {
- maxGroupLen = Math.max(maxGroupLen,
memberAssignment.group.length());
- maxConsumerIdLen = Math.max(maxConsumerIdLen,
memberAssignment.consumerId.length());
- maxGroupInstanceIdLen =
Math.max(maxGroupInstanceIdLen, memberAssignment.groupInstanceId.length());
- maxHostLen = Math.max(maxHostLen,
memberAssignment.host.length());
- maxClientIdLen = Math.max(maxClientIdLen,
memberAssignment.clientId.length());
- includeGroupInstanceId = includeGroupInstanceId ||
!memberAssignment.groupInstanceId.isEmpty();
- String currentAssignment =
memberAssignment.assignment.isEmpty() ?
- MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
- String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
- MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ maxGroupLen = Math.max(maxGroupLen,
memberAssignment.group().length());
+ maxConsumerIdLen = Math.max(maxConsumerIdLen,
memberAssignment.consumerId().length());
+ maxGroupInstanceIdLen =
Math.max(maxGroupInstanceIdLen, memberAssignment.groupInstanceId().length());
+ maxHostLen = Math.max(maxHostLen,
memberAssignment.host().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
memberAssignment.clientId().length());
+ includeGroupInstanceId = includeGroupInstanceId ||
!memberAssignment.groupInstanceId().isEmpty();
+ String currentAssignment =
memberAssignment.assignment().isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment());
+ String targetAssignment =
memberAssignment.targetAssignment().isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment());
maxCurrentAssignment =
Math.max(maxCurrentAssignment, currentAssignment.length());
maxTargetAssignment =
Math.max(maxTargetAssignment, targetAssignment.length());
- hasClassicMember = hasClassicMember ||
(memberAssignment.upgraded.isPresent() && !memberAssignment.upgraded.get());
- hasConsumerMember = hasConsumerMember ||
(memberAssignment.upgraded.isPresent() && memberAssignment.upgraded.get());
+ hasClassicMember = hasClassicMember ||
(memberAssignment.upgraded().isPresent() && !memberAssignment.upgraded().get());
+ hasConsumerMember = hasConsumerMember ||
(memberAssignment.upgraded().isPresent() && memberAssignment.upgraded().get());
}
}
}
@@ -465,23 +465,23 @@ public class ConsumerGroupCommand {
) {
for (MemberAssignmentState memberAssignment : memberAssignments) {
if (includeGroupInstanceId) {
- System.out.printf(formatWithGroupInstanceId,
memberAssignment.group, memberAssignment.consumerId,
- memberAssignment.groupInstanceId,
memberAssignment.host, memberAssignment.clientId,
- memberAssignment.numPartitions);
+ System.out.printf(formatWithGroupInstanceId,
memberAssignment.group(), memberAssignment.consumerId(),
+ memberAssignment.groupInstanceId(),
memberAssignment.host(), memberAssignment.clientId(),
+ memberAssignment.numPartitions());
} else {
- System.out.printf(formatWithoutGroupInstanceId,
memberAssignment.group, memberAssignment.consumerId,
- memberAssignment.host, memberAssignment.clientId,
memberAssignment.numPartitions);
+ System.out.printf(formatWithoutGroupInstanceId,
memberAssignment.group(), memberAssignment.consumerId(),
+ memberAssignment.host(), memberAssignment.clientId(),
memberAssignment.numPartitions());
}
if (verbose) {
- String currentEpoch =
memberAssignment.currentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
- String currentAssignment =
memberAssignment.assignment.isEmpty() ?
- MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment);
- String targetEpoch =
memberAssignment.targetEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE);
- String targetAssignment =
memberAssignment.targetAssignment.isEmpty() ?
- MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment);
+ String currentEpoch =
memberAssignment.currentEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String currentAssignment =
memberAssignment.assignment().isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.assignment());
+ String targetEpoch =
memberAssignment.targetEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE);
+ String targetAssignment =
memberAssignment.targetAssignment().isEmpty() ?
+ MISSING_COLUMN_VALUE :
getAssignmentString(memberAssignment.targetAssignment());
if (hasMigrationMember) {
System.out.printf(formatWithUpgrade, currentEpoch,
currentAssignment, targetEpoch, targetAssignment,
-
memberAssignment.upgraded.map(Object::toString).orElse(MISSING_COLUMN_VALUE));
+
memberAssignment.upgraded().map(Object::toString).orElse(MISSING_COLUMN_VALUE));
} else {
System.out.printf(formatWithoutUpgrade, currentEpoch,
currentAssignment, targetEpoch, targetAssignment);
}
@@ -511,23 +511,23 @@ public class ConsumerGroupCommand {
private void printStates(Map<String, GroupInformation> states, boolean
verbose) {
states.forEach((groupId, state) -> {
- if (shouldPrintMemberState(groupId,
Optional.of(state.groupState), Optional.of(1))) {
- String coordinator = state.coordinator.host() + ":" +
state.coordinator.port() + " (" + state.coordinator.idString() + ")";
+ if (shouldPrintMemberState(groupId,
Optional.of(state.groupState()), Optional.of(1))) {
+ String coordinator = state.coordinator().host() + ":" +
state.coordinator().port() + " (" + state.coordinator().idString() + ")";
int coordinatorColLen = Math.max(25, coordinator.length());
- int groupColLen = Math.max(15, state.group.length());
+ int groupColLen = Math.max(15, state.group().length());
- String assignmentStrategy =
state.assignmentStrategy.isEmpty() ? MISSING_COLUMN_VALUE :
state.assignmentStrategy;
+ String assignmentStrategy =
state.assignmentStrategy().isEmpty() ? MISSING_COLUMN_VALUE :
state.assignmentStrategy();
if (verbose) {
String format = "\n%" + -groupColLen + "s %" +
-coordinatorColLen + "s %-20s %-20s %-15s %-25s %s";
System.out.printf(format, "GROUP", "COORDINATOR (ID)",
"ASSIGNMENT-STRATEGY", "STATE",
"GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH",
"#MEMBERS");
- System.out.printf(format, state.group, coordinator,
assignmentStrategy, state.groupState,
-
state.groupEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
state.targetAssignmentEpoch.map(Object::toString).orElse(MISSING_COLUMN_VALUE),
state.numMembers);
+ System.out.printf(format, state.group(), coordinator,
assignmentStrategy, state.groupState(),
+
state.groupEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
state.targetAssignmentEpoch().map(Object::toString).orElse(MISSING_COLUMN_VALUE),
state.numMembers());
} else {
String format = "\n%" + -groupColLen + "s %" +
-coordinatorColLen + "s %-20s %-20s %s";
System.out.printf(format, "GROUP", "COORDINATOR (ID)",
"ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
- System.out.printf(format, state.group, coordinator,
assignmentStrategy, state.groupState, state.numMembers);
+ System.out.printf(format, state.group(), coordinator,
assignmentStrategy, state.groupState(), state.numMembers());
}
System.out.println();
}
@@ -623,8 +623,8 @@ public class ConsumerGroupCommand {
// concat the data and then sort them
return Stream.concat(existLeaderAssignments.stream(),
noneLeaderAssignments.stream())
.sorted(Comparator.<PartitionAssignmentState,
String>comparing(
- state -> state.topic.orElse(""), String::compareTo)
- .thenComparingInt(state ->
state.partition.orElse(-1)))
+ state -> state.topic().orElse(""),
String::compareTo)
+ .thenComparingInt(state ->
state.partition().orElse(-1)))
.collect(Collectors.toList());
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
index 9fde352cdda..d313390b6a9 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/GroupInformation.java
@@ -21,30 +21,6 @@ import org.apache.kafka.common.Node;
import java.util.Optional;
-class GroupInformation {
- final String group;
- final Node coordinator;
- final String assignmentStrategy;
- final GroupState groupState;
- final int numMembers;
- final Optional<Integer> groupEpoch;
- final Optional<Integer> targetAssignmentEpoch;
-
- GroupInformation(
- String group,
- Node coordinator,
- String assignmentStrategy,
- GroupState groupState,
- int numMembers,
- Optional<Integer> groupEpoch,
- Optional<Integer> targetAssignmentEpoch
- ) {
- this.group = group;
- this.coordinator = coordinator;
- this.assignmentStrategy = assignmentStrategy;
- this.groupState = groupState;
- this.numMembers = numMembers;
- this.groupEpoch = groupEpoch;
- this.targetAssignmentEpoch = targetAssignmentEpoch;
- }
+record GroupInformation(String group, Node coordinator, String
assignmentStrategy, GroupState groupState,
+ int numMembers, Optional<Integer> groupEpoch,
Optional<Integer> targetAssignmentEpoch) {
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
index 420e640419e..56feb8b029f 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/MemberAssignmentState.java
@@ -21,42 +21,8 @@ import org.apache.kafka.common.TopicPartition;
import java.util.List;
import java.util.Optional;
-class MemberAssignmentState {
- final String group;
- final String consumerId;
- final String host;
- final String clientId;
- final String groupInstanceId;
- final int numPartitions;
- final List<TopicPartition> assignment;
- final List<TopicPartition> targetAssignment;
- final Optional<Integer> currentEpoch;
- final Optional<Integer> targetEpoch;
- final Optional<Boolean> upgraded;
-
- MemberAssignmentState(
- String group,
- String consumerId,
- String host,
- String clientId,
- String groupInstanceId,
- int numPartitions,
- List<TopicPartition> assignment,
- List<TopicPartition> targetAssignment,
- Optional<Integer> currentEpoch,
- Optional<Integer> targetEpoch,
- Optional<Boolean> upgraded
- ) {
- this.group = group;
- this.consumerId = consumerId;
- this.host = host;
- this.clientId = clientId;
- this.groupInstanceId = groupInstanceId;
- this.numPartitions = numPartitions;
- this.assignment = assignment;
- this.targetAssignment = targetAssignment;
- this.currentEpoch = currentEpoch;
- this.targetEpoch = targetEpoch;
- this.upgraded = upgraded;
- }
+record MemberAssignmentState(String group, String consumerId, String host,
String clientId, String groupInstanceId,
+ int numPartitions, List<TopicPartition>
assignment, List<TopicPartition> targetAssignment,
+ Optional<Integer> currentEpoch, Optional<Integer>
targetEpoch,
+ Optional<Boolean> upgraded) {
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
index 53f5c391100..d42377ca029 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/PartitionAssignmentState.java
@@ -20,42 +20,8 @@ import org.apache.kafka.common.Node;
import java.util.Optional;
-class PartitionAssignmentState {
- final String group;
- final Optional<Node> coordinator;
- final Optional<String> topic;
- final Optional<Integer> partition;
- final Optional<Long> offset;
- final Optional<Long> lag;
- final Optional<String> consumerId;
- final Optional<String> host;
- final Optional<String> clientId;
- final Optional<Long> logEndOffset;
- final Optional<Integer> leaderEpoch;
-
- PartitionAssignmentState(
- String group,
- Optional<Node> coordinator,
- Optional<String> topic,
- Optional<Integer> partition,
- Optional<Long> offset,
- Optional<Long> lag,
- Optional<String> consumerId,
- Optional<String> host,
- Optional<String> clientId,
- Optional<Long> logEndOffset,
- Optional<Integer> leaderEpoch
- ) {
- this.group = group;
- this.coordinator = coordinator;
- this.topic = topic;
- this.partition = partition;
- this.offset = offset;
- this.lag = lag;
- this.consumerId = consumerId;
- this.host = host;
- this.clientId = clientId;
- this.logEndOffset = logEndOffset;
- this.leaderEpoch = leaderEpoch;
- }
+record PartitionAssignmentState(String group, Optional<Node> coordinator,
Optional<String> topic,
+ Optional<Integer> partition, Optional<Long>
offset, Optional<Long> lag,
+ Optional<String> consumerId, Optional<String>
host, Optional<String> clientId,
+ Optional<Long> logEndOffset, Optional<Integer>
leaderEpoch) {
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 95723665032..df4353c467a 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -656,25 +656,7 @@ public class ShareGroupCommand {
}
}
- static class SharePartitionOffsetInformation {
- final String group;
- final String topic;
- final int partition;
- final Optional<Long> offset;
- final Optional<Integer> leaderEpoch;
-
- SharePartitionOffsetInformation(
- String group,
- String topic,
- int partition,
- Optional<Long> offset,
- Optional<Integer> leaderEpoch
- ) {
- this.group = group;
- this.topic = topic;
- this.partition = partition;
- this.offset = offset;
- this.leaderEpoch = leaderEpoch;
- }
+ record SharePartitionOffsetInformation(String group, String topic, int
partition, Optional<Long> offset,
+ Optional<Integer> leaderEpoch) {
}
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
index 842d46ec587..58e2cf9b80f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ActiveMoveState.java
@@ -17,44 +17,15 @@
package org.apache.kafka.tools.reassign;
-import java.util.Objects;
-
/**
* A replica log directory move state where the move is in progress.
+ * @param currentLogDir The current log directory.
+ * @param futureLogDir The log directory that the replica is moving to.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
*/
-final class ActiveMoveState implements LogDirMoveState {
- public final String currentLogDir;
-
- public final String targetLogDir;
-
- public final String futureLogDir;
-
- /**
- * @param currentLogDir The current log directory.
- * @param futureLogDir The log directory that the replica is moving to.
- * @param targetLogDir The log directory that we wanted the replica to move to.
- */
- public ActiveMoveState(String currentLogDir, String targetLogDir, String
futureLogDir) {
- this.currentLogDir = currentLogDir;
- this.targetLogDir = targetLogDir;
- this.futureLogDir = futureLogDir;
- }
-
+record ActiveMoveState(String currentLogDir, String targetLogDir, String
futureLogDir) implements LogDirMoveState {
@Override
public boolean done() {
return false;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- ActiveMoveState that = (ActiveMoveState) o;
- return Objects.equals(currentLogDir, that.currentLogDir) &&
Objects.equals(targetLogDir, that.targetLogDir) && Objects.equals(futureLogDir,
that.futureLogDir);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(currentLogDir, targetLogDir, futureLogDir);
- }
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
index f405eedd403..c7fa61af3d9 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/CancelledMoveState.java
@@ -17,41 +17,15 @@
package org.apache.kafka.tools.reassign;
-import java.util.Objects;
-
/**
* A replica log directory move state where there is no move in progress, but we did not
* reach the target log directory.
+ * @param currentLogDir The current log directory.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
*/
-final class CancelledMoveState implements LogDirMoveState {
- public final String currentLogDir;
-
- public final String targetLogDir;
-
- /**
- * @param currentLogDir The current log directory.
- * @param targetLogDir The log directory that we wanted the replica to move to.
- */
- public CancelledMoveState(String currentLogDir, String targetLogDir) {
- this.currentLogDir = currentLogDir;
- this.targetLogDir = targetLogDir;
- }
-
+record CancelledMoveState(String currentLogDir, String targetLogDir)
implements LogDirMoveState {
@Override
public boolean done() {
return true;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CancelledMoveState that = (CancelledMoveState) o;
- return Objects.equals(currentLogDir, that.currentLogDir) &&
Objects.equals(targetLogDir, that.targetLogDir);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(currentLogDir, targetLogDir);
- }
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
index df5fb890148..28b017b7cdc 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/CompletedMoveState.java
@@ -17,36 +17,13 @@
package org.apache.kafka.tools.reassign;
-import java.util.Objects;
-
/**
* The completed replica log directory move state.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
*/
-final class CompletedMoveState implements LogDirMoveState {
- public final String targetLogDir;
-
- /**
- * @param targetLogDir The log directory that we wanted the replica to move to.
- */
- public CompletedMoveState(String targetLogDir) {
- this.targetLogDir = targetLogDir;
- }
-
+record CompletedMoveState(String targetLogDir) implements LogDirMoveState {
@Override
public boolean done() {
return true;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CompletedMoveState that = (CompletedMoveState) o;
- return Objects.equals(targetLogDir, that.targetLogDir);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(targetLogDir);
- }
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
index eb3f592841c..44982a8b712 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingLogDirMoveState.java
@@ -17,36 +17,13 @@
package org.apache.kafka.tools.reassign;
-import java.util.Objects;
-
/**
* A replica log directory move state where the source replica is missing.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
*/
-final class MissingLogDirMoveState implements LogDirMoveState {
- public final String targetLogDir;
-
- /**
- * @param targetLogDir The log directory that we wanted the replica to move to.
- */
- public MissingLogDirMoveState(String targetLogDir) {
- this.targetLogDir = targetLogDir;
- }
-
+record MissingLogDirMoveState(String targetLogDir) implements LogDirMoveState {
@Override
public boolean done() {
return false;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- MissingLogDirMoveState that = (MissingLogDirMoveState) o;
- return Objects.equals(targetLogDir, that.targetLogDir);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(targetLogDir);
- }
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
index eda9c22b829..b802275ceeb 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/MissingReplicaMoveState.java
@@ -17,36 +17,13 @@
package org.apache.kafka.tools.reassign;
-import java.util.Objects;
-
/**
* A replica log directory move state where the source log directory is missing.
+ * @param targetLogDir The log directory that we wanted the replica to move to.
*/
-final class MissingReplicaMoveState implements LogDirMoveState {
- public final String targetLogDir;
-
- /**
- * @param targetLogDir The log directory that we wanted the replica to move to.
- */
- public MissingReplicaMoveState(String targetLogDir) {
- this.targetLogDir = targetLogDir;
- }
-
+record MissingReplicaMoveState(String targetLogDir) implements LogDirMoveState
{
@Override
public boolean done() {
return false;
}
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- MissingReplicaMoveState that = (MissingReplicaMoveState) o;
- return Objects.equals(targetLogDir, that.targetLogDir);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(targetLogDir);
- }
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 591c127ae2f..4628c34fe18 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -467,8 +467,8 @@ public class ReassignPartitionsCommand {
bld.add("Partition " + replica.topic() + "-" +
replica.partition() + " cannot be found " +
"in any live log directory on broker " +
replica.brokerId() + ".");
} else if (state instanceof ActiveMoveState) {
- String targetLogDir = ((ActiveMoveState) state).targetLogDir;
- String futureLogDir = ((ActiveMoveState) state).futureLogDir;
+ String targetLogDir = ((ActiveMoveState) state).targetLogDir();
+ String futureLogDir = ((ActiveMoveState) state).futureLogDir();
if (targetLogDir.equals(futureLogDir)) {
bld.add("Reassignment of replica " + replica + " is still
in progress.");
} else {
@@ -477,8 +477,8 @@ public class ReassignPartitionsCommand {
"instead of " + targetLogDir + ".");
}
} else if (state instanceof CancelledMoveState) {
- String targetLogDir = ((CancelledMoveState)
state).targetLogDir;
- String currentLogDir = ((CancelledMoveState)
state).currentLogDir;
+ String targetLogDir = ((CancelledMoveState)
state).targetLogDir();
+ String currentLogDir = ((CancelledMoveState)
state).currentLogDir();
bld.add("Partition " + replica.topic() + "-" +
replica.partition() + " on broker " +
replica.brokerId() + " is not being moved from log dir " +
currentLogDir + " to " +
targetLogDir + ".");
@@ -1292,7 +1292,7 @@ public class ReassignPartitionsCommand {
Map<TopicPartitionReplica, String> curMovingParts = new HashMap<>();
findLogDirMoveStates(adminClient, targetReplicas).forEach((part,
moveState) -> {
if (moveState instanceof ActiveMoveState)
- curMovingParts.put(part, ((ActiveMoveState)
moveState).currentLogDir);
+ curMovingParts.put(part, ((ActiveMoveState)
moveState).currentLogDir());
});
if (curMovingParts.isEmpty()) {
System.out.print("None of the specified partition moves are
active.");
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
index 2b8666d57bc..d10cf7b2e45 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java
@@ -444,13 +444,7 @@ public class ConnectPluginPathTest {
MULTI_JAR
}
- private static class PluginLocation {
- private final Path path;
-
- private PluginLocation(Path path) {
- this.path = path;
- }
-
+ private record PluginLocation(Path path) {
@Override
public String toString() {
return path.toString();
@@ -504,15 +498,7 @@ public class ConnectPluginPathTest {
}
}
- private static class PluginPathElement {
- private final Path root;
- private final List<PluginLocation> locations;
-
- private PluginPathElement(Path root, List<PluginLocation> locations) {
- this.root = root;
- this.locations = locations;
- }
-
+ private record PluginPathElement(Path root, List<PluginLocation>
locations) {
@Override
public String toString() {
return root.toString();
@@ -535,14 +521,7 @@ public class ConnectPluginPathTest {
return new PluginPathElement(path, locations);
}
- private static class WorkerConfig {
- private final Path configFile;
- private final List<PluginPathElement> pluginPathElements;
-
- private WorkerConfig(Path configFile, List<PluginPathElement>
pluginPathElements) {
- this.configFile = configFile;
- this.pluginPathElements = pluginPathElements;
- }
+ private record WorkerConfig(Path configFile, List<PluginPathElement>
pluginPathElements) {
@Override
public String toString() {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index d5c1b035a44..236d1ce51cc 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -183,13 +183,13 @@ public class ConsumerGroupServiceTest {
Map<TopicPartition, Optional<Long>> returnedOffsets =
assignments.map(results ->
results.stream().collect(Collectors.toMap(
- assignment -> new TopicPartition(assignment.topic.get(),
assignment.partition.get()),
- assignment -> assignment.offset))
+ assignment -> new TopicPartition(assignment.topic().get(),
assignment.partition().get()),
+ assignment -> assignment.offset()))
).orElse(Map.of());
Map<TopicPartition, Optional<Integer>> returnedLeaderEpoch =
assignments.map(results ->
results.stream().collect(Collectors.toMap(
- assignment -> new TopicPartition(assignment.topic.get(),
assignment.partition.get()),
- assignment -> assignment.leaderEpoch))
+ assignment -> new TopicPartition(assignment.topic().get(),
assignment.partition().get()),
+ assignment -> assignment.leaderEpoch()))
).orElse(Map.of());
Map<TopicPartition, Optional<Long>> expectedOffsets = Map.of(
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
index d30c8081440..2866027e232 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
@@ -322,7 +322,7 @@ public class DeleteConsumerGroupsTest {
}
private boolean checkGroupState(ConsumerGroupCommand.ConsumerGroupService
service, String groupId, GroupState state) throws Exception {
- return Objects.equals(service.collectGroupState(groupId).groupState,
state);
+ return Objects.equals(service.collectGroupState(groupId).groupState(),
state);
}
private ConsumerGroupCommand.ConsumerGroupService
getConsumerGroupService(String[] args) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index b15b4fe9d45..9e5072576f4 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -462,7 +462,7 @@ public class DescribeConsumerGroupTest {
Optional<GroupState> state = groupOffsets.getKey();
Optional<Collection<PartitionAssignmentState>> assignments
= groupOffsets.getValue();
- Predicate<PartitionAssignmentState> isGrp = s ->
Objects.equals(s.group, group);
+ Predicate<PartitionAssignmentState> isGrp = s ->
Objects.equals(s.group(), group);
boolean res = state.map(s ->
s.equals(GroupState.STABLE)).orElse(false) &&
assignments.isPresent() &&
@@ -477,9 +477,9 @@ public class DescribeConsumerGroupTest {
PartitionAssignmentState partitionState =
maybePartitionState.get();
- return !partitionState.consumerId.map(s0 ->
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
- !partitionState.clientId.map(s0 ->
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
- !partitionState.host.map(h ->
h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
+ return !partitionState.consumerId().map(s0 ->
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+ !partitionState.clientId().map(s0 ->
s0.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+ !partitionState.host().map(h ->
h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
}, "Expected a 'Stable' group status, rows and valid values
for consumer id / client id / host columns in describe results for group " +
group + ".");
}
}
@@ -506,7 +506,7 @@ public class DescribeConsumerGroupTest {
Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>> res =
service.collectGroupMembers(group);
assertTrue(res.getValue().isPresent());
- assertTrue(res.getValue().get().size() == 1 &&
res.getValue().get().iterator().next().assignment.size() == 1,
+ assertTrue(res.getValue().get().size() == 1 &&
res.getValue().get().iterator().next().assignment().size() == 1,
"Expected a topic partition assigned to the single
group member for group " + group);
}
}
@@ -526,10 +526,10 @@ public class DescribeConsumerGroupTest {
) {
TestUtils.waitForCondition(() -> {
GroupInformation state = service.collectGroupState(group);
- return Objects.equals(state.groupState, GroupState.STABLE)
&&
- state.numMembers == 1 &&
- state.coordinator != null &&
-
clusterInstance.brokerIds().contains(state.coordinator.id());
+ return Objects.equals(state.groupState(),
GroupState.STABLE) &&
+ state.numMembers() == 1 &&
+ state.coordinator() != null &&
+
clusterInstance.brokerIds().contains(state.coordinator().id());
}, "Expected a 'Stable' group status, with one member for
group " + group + ".");
}
}
@@ -558,11 +558,11 @@ public class DescribeConsumerGroupTest {
try (ConsumerGroupCommand.ConsumerGroupService service =
consumerGroupService(new String[]{"--bootstrap-server",
clusterInstance.bootstrapServers(), "--describe", "--group", group})) {
TestUtils.waitForCondition(() -> {
GroupInformation state =
service.collectGroupState(group);
- return Objects.equals(state.groupState,
GroupState.STABLE) &&
- state.numMembers == 1 &&
- Objects.equals(state.assignmentStrategy,
expectedName) &&
- state.coordinator != null &&
-
clusterInstance.brokerIds().contains(state.coordinator.id());
+ return Objects.equals(state.groupState(),
GroupState.STABLE) &&
+ state.numMembers() == 1 &&
+ Objects.equals(state.assignmentStrategy(),
expectedName) &&
+ state.coordinator() != null &&
+
clusterInstance.brokerIds().contains(state.coordinator().id());
}, "Expected a 'Stable' group status, with one member and
" + expectedName + " assignment strategy for group " + group + ".");
}
} finally {
@@ -619,7 +619,7 @@ public class DescribeConsumerGroupTest {
TestUtils.waitForCondition(() -> {
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> res =
service.collectGroupOffsets(group);
return res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false)
- && res.getValue().map(c ->
c.stream().anyMatch(assignment -> Objects.equals(assignment.group, group) &&
assignment.offset.isPresent())).orElse(false);
+ && res.getValue().map(c ->
c.stream().anyMatch(assignment -> Objects.equals(assignment.group(), group) &&
assignment.offset().isPresent())).orElse(false);
}, "Expected the group to initially become stable, and to find
group in assignments after initial offset commit.");
// stop the consumer so the group has no active member anymore
@@ -629,13 +629,13 @@ public class DescribeConsumerGroupTest {
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> offsets =
service.collectGroupOffsets(group);
Optional<GroupState> state = offsets.getKey();
Optional<Collection<PartitionAssignmentState>> assignments
= offsets.getValue();
- List<PartitionAssignmentState> testGroupAssignments =
assignments.get().stream().filter(a -> Objects.equals(a.group, group)).toList();
+ List<PartitionAssignmentState> testGroupAssignments =
assignments.get().stream().filter(a -> Objects.equals(a.group(),
group)).toList();
PartitionAssignmentState assignment =
testGroupAssignments.get(0);
return state.map(s ->
s.equals(GroupState.EMPTY)).orElse(false) &&
testGroupAssignments.size() == 1 &&
- assignment.consumerId.map(c ->
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && //
the member should be gone
- assignment.clientId.map(c ->
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
- assignment.host.map(c ->
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
+ assignment.consumerId().map(c ->
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) && //
the member should be gone
+ assignment.clientId().map(c ->
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+ assignment.host().map(c ->
c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
}, "failed to collect group offsets");
}
}
@@ -656,7 +656,7 @@ public class DescribeConsumerGroupTest {
TestUtils.waitForCondition(() -> {
Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>> res =
service.collectGroupMembers(group);
return res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false)
- && res.getValue().map(c -> c.stream().anyMatch(m
-> Objects.equals(m.group, group))).orElse(false);
+ && res.getValue().map(c -> c.stream().anyMatch(m
-> Objects.equals(m.group(), group))).orElse(false);
}, "Expected the group to initially become stable, and to find
group in assignments after initial offset commit.");
// stop the consumer so the group has no active member anymore
@@ -684,10 +684,10 @@ public class DescribeConsumerGroupTest {
) {
TestUtils.waitForCondition(() -> {
GroupInformation state = service.collectGroupState(group);
- return Objects.equals(state.groupState, GroupState.STABLE)
&&
- state.numMembers == 1 &&
- state.coordinator != null &&
-
clusterInstance.brokerIds().contains(state.coordinator.id());
+ return Objects.equals(state.groupState(),
GroupState.STABLE) &&
+ state.numMembers() == 1 &&
+ state.coordinator() != null &&
+
clusterInstance.brokerIds().contains(state.coordinator().id());
}, "Expected the group to initially become stable, and have a
single member.");
// stop the consumer so the group has no active member anymore
@@ -695,7 +695,7 @@ public class DescribeConsumerGroupTest {
TestUtils.waitForCondition(() -> {
GroupInformation state = service.collectGroupState(group);
- return Objects.equals(state.groupState, GroupState.EMPTY)
&& state.numMembers == 0;
+ return Objects.equals(state.groupState(),
GroupState.EMPTY) && state.numMembers() == 0;
}, "Expected the group to become empty after the only member
leaving.");
}
}
@@ -744,8 +744,8 @@ public class DescribeConsumerGroupTest {
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> res =
service.collectGroupOffsets(group);
return res.getKey().map(s ->
s.equals(GroupState.STABLE)).isPresent() &&
res.getValue().isPresent() &&
- res.getValue().get().stream().filter(s ->
Objects.equals(s.group, group)).count() == 1 &&
- res.getValue().get().stream().filter(x ->
Objects.equals(x.group, group) && x.partition.isPresent()).count() == 1;
+ res.getValue().get().stream().filter(s ->
Objects.equals(s.group(), group)).count() == 1 &&
+ res.getValue().get().stream().filter(x ->
Objects.equals(x.group(), group) && x.partition().isPresent()).count() == 1;
}, "Expected rows for consumers with no assigned partitions in
describe group results");
}
}
@@ -767,15 +767,15 @@ public class DescribeConsumerGroupTest {
Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>> res =
service.collectGroupMembers(group);
return res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
- res.getValue().get().stream().filter(s ->
Objects.equals(s.group, group)).count() == 2 &&
- res.getValue().get().stream().filter(x ->
Objects.equals(x.group, group) && x.numPartitions == 1).count() == 1 &&
- res.getValue().get().stream().filter(x ->
Objects.equals(x.group, group) && x.numPartitions == 0).count() == 1 &&
- res.getValue().get().stream().anyMatch(s ->
!s.assignment.isEmpty());
+ res.getValue().get().stream().filter(s ->
Objects.equals(s.group(), group)).count() == 2 &&
+ res.getValue().get().stream().filter(x ->
Objects.equals(x.group(), group) && x.numPartitions() == 1).count() == 1 &&
+ res.getValue().get().stream().filter(x ->
Objects.equals(x.group(), group) && x.numPartitions() == 0).count() == 1 &&
+ res.getValue().get().stream().anyMatch(s ->
!s.assignment().isEmpty());
}, "Expected rows for consumers with no assigned partitions in
describe group results");
Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>> res =
service.collectGroupMembers(group);
assertTrue(res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false)
- && res.getValue().map(c ->
c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false),
+ && res.getValue().map(c ->
c.stream().anyMatch(s -> !s.assignment().isEmpty())).orElse(false),
"Expected additional columns in verbose version of
describe members");
}
}
@@ -795,7 +795,7 @@ public class DescribeConsumerGroupTest {
) {
TestUtils.waitForCondition(() -> {
GroupInformation state = service.collectGroupState(group);
- return Objects.equals(state.groupState, GroupState.STABLE)
&& state.numMembers == 2;
+ return Objects.equals(state.groupState(),
GroupState.STABLE) && state.numMembers() == 2;
}, "Expected two consumers in describe group results");
}
}
@@ -844,9 +844,9 @@ public class DescribeConsumerGroupTest {
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> res =
service.collectGroupOffsets(group);
return res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
- res.getValue().get().stream().filter(s ->
Objects.equals(s.group, group)).count() == 2 &&
- res.getValue().get().stream().filter(x ->
Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2 &&
- res.getValue().get().stream().noneMatch(x ->
Objects.equals(x.group, group) && x.partition.isEmpty());
+ res.getValue().get().stream().filter(s ->
Objects.equals(s.group(), group)).count() == 2 &&
+ res.getValue().get().stream().filter(x ->
Objects.equals(x.group(), group) && x.partition().isPresent()).count() == 2 &&
+ res.getValue().get().stream().noneMatch(x ->
Objects.equals(x.group(), group) && x.partition().isEmpty());
}, "Expected two rows (one row per consumer) in describe group
results.");
}
}
@@ -868,13 +868,13 @@ public class DescribeConsumerGroupTest {
Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>> res =
service.collectGroupMembers(group);
return res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false) &&
res.getValue().isPresent() &&
- res.getValue().get().stream().filter(s ->
Objects.equals(s.group, group)).count() == 2 &&
- res.getValue().get().stream().filter(x ->
Objects.equals(x.group, group) && x.numPartitions == 1).count() == 2 &&
- res.getValue().get().stream().noneMatch(x ->
Objects.equals(x.group, group) && x.numPartitions == 0);
+ res.getValue().get().stream().filter(s ->
Objects.equals(s.group(), group)).count() == 2 &&
+ res.getValue().get().stream().filter(x ->
Objects.equals(x.group(), group) && x.numPartitions() == 1).count() == 2 &&
+ res.getValue().get().stream().noneMatch(x ->
Objects.equals(x.group(), group) && x.numPartitions() == 0);
}, "Expected two rows (one row per consumer) in describe group
members results.");
Entry<Optional<GroupState>,
Optional<Collection<MemberAssignmentState>>> res =
service.collectGroupMembers(group);
- assertTrue(res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s ->
s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0,
+ assertTrue(res.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false) && res.getValue().map(s ->
s.stream().filter(x -> x.assignment().isEmpty()).count()).orElse(0L) == 0,
"Expected additional columns in verbose version of
describe members");
}
}
@@ -894,7 +894,7 @@ public class DescribeConsumerGroupTest {
) {
TestUtils.waitForCondition(() -> {
GroupInformation state = service.collectGroupState(group);
- return Objects.equals(state.groupState, GroupState.STABLE)
&& Objects.equals(state.group, group) && state.numMembers == 2;
+ return Objects.equals(state.groupState(),
GroupState.STABLE) && Objects.equals(state.group(), group) &&
state.numMembers() == 2;
}, "Expected a stable group with two members in describe group
state result.");
}
}
@@ -915,7 +915,7 @@ public class DescribeConsumerGroupTest {
TestUtils.waitForCondition(() -> {
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> res =
service.collectGroupOffsets(group);
return res.getKey().map(s ->
s.equals(GroupState.EMPTY)).orElse(false)
- && res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group,
group)).count() == 2;
+ && res.getValue().isPresent() &&
res.getValue().get().stream().filter(s -> Objects.equals(s.group(),
group)).count() == 2;
}, "Expected a stable group with two members in describe group
state result.");
}
}
@@ -1030,7 +1030,7 @@ public class DescribeConsumerGroupTest {
TestUtils.waitForCondition(() -> {
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> groupOffsets =
service.collectGroupOffsets(group);
- Predicate<PartitionAssignmentState> isGrp = s ->
Objects.equals(s.group, group);
+ Predicate<PartitionAssignmentState> isGrp = s ->
Objects.equals(s.group(), group);
boolean res = groupOffsets.getKey().map(s ->
s.equals(GroupState.STABLE)).orElse(false) &&
groupOffsets.getValue().isPresent() &&
@@ -1045,9 +1045,9 @@ public class DescribeConsumerGroupTest {
PartitionAssignmentState assignmentState =
maybeAssignmentState.get();
- return assignmentState.consumerId.map(c ->
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
- assignmentState.clientId.map(c ->
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
- assignmentState.host.map(h ->
!h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
+ return assignmentState.consumerId().map(c ->
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+ assignmentState.clientId().map(c ->
!c.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false) &&
+ assignmentState.host().map(h ->
!h.trim().equals(ConsumerGroupCommand.MISSING_COLUMN_VALUE)).orElse(false);
}, "Expected a 'Stable' group status, rows and valid values
for consumer id / client id / host columns in describe results for
non-offset-committing group " + group + ".");
}
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index 1c9ab9cf98b..bbcfb6e35c1 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -896,10 +896,10 @@ public class ResetConsumerGroupOffsetTest {
private void
awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService service,
String group) throws Exception {
TestUtils.waitForCondition(() -> {
- GroupState state = service.collectGroupState(group).groupState;
+ GroupState state = service.collectGroupState(group).groupState();
return Objects.equals(state, GroupState.EMPTY) ||
Objects.equals(state, GroupState.DEAD);
}, "Expected that consumer group is inactive. Actual state: " +
- service.collectGroupState(group).groupState);
+ service.collectGroupState(group).groupState());
}
private void resetAndAssertOffsetsCommitted(ClusterInstance cluster,
diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index f1726721bfd..a8d830b4bd4 100644
--- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -646,16 +646,7 @@ public class ReassignPartitionsCommandTest {
}));
}
- static class LogDirReassignment {
- final String json;
- final String currentDir;
- final String targetDir;
-
- public LogDirReassignment(String json, String currentDir, String
targetDir) {
- this.json = json;
- this.currentDir = currentDir;
- this.targetDir = targetDir;
- }
+ record LogDirReassignment(String json, String currentDir, String
targetDir) {
}
private LogDirReassignment buildLogDirReassignment(TopicPartition
topicPartition,