This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4048aa19faf MINOR: Replace Collections factory methods with Java 11+
equivalents in clients (#22060)
4048aa19faf is described below
commit 4048aa19faf5159cca08a6754fafdc3be1c8bf08
Author: Maros Orsak <[email protected]>
AuthorDate: Sat Jun 6 17:38:59 2026 +0200
MINOR: Replace Collections factory methods with Java 11+ equivalents in
clients (#22060)
This is the 4th part of improving replace collections factory methods
with its Java 11 equivalents in the clients module.
Reviewers: nileshkumar3 <[email protected]>, Ken Huang
<[email protected]>, Christo Lolov <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../main/java/org/apache/kafka/common/Cluster.java | 18 ++---
.../kafka/common/metrics/MetricsReporter.java | 3 +-
.../kafka/common/network/ChannelBuilders.java | 3 +-
.../kafka/common/network/SaslChannelBuilder.java | 9 +--
.../org/apache/kafka/common/network/Selector.java | 4 +-
.../kafka/common/quota/ClientQuotaFilter.java | 4 +-
.../security/kerberos/KerberosShortNamer.java | 3 +-
.../security/oauthbearer/BrokerJwtValidator.java | 5 +-
.../security/oauthbearer/ClientJwtValidator.java | 5 +-
.../oauthbearer/OAuthBearerLoginModule.java | 3 +-
.../OAuthBearerSaslClientCallbackHandler.java | 3 +-
.../internals/secured/JaasOptionsUtils.java | 3 +-
.../unsecured/OAuthBearerUnsecuredJws.java | 4 +-
.../OAuthBearerUnsecuredLoginCallbackHandler.java | 3 +-
...uthBearerUnsecuredValidatorCallbackHandler.java | 6 +-
.../security/scram/ScramExtensionsCallback.java | 3 +-
.../common/security/scram/ScramLoginModule.java | 3 +-
.../security/scram/internals/ScramExtensions.java | 3 +-
.../kafka/common/security/ssl/SslFactory.java | 4 +-
.../common/telemetry/ClientTelemetryState.java | 92 +++++++---------------
.../internals/ClientTelemetryReporter.java | 5 +-
.../telemetry/internals/ClientTelemetryUtils.java | 3 +-
.../common/telemetry/internals/MetricKey.java | 3 +-
.../common/telemetry/internals/MetricsEmitter.java | 3 +-
.../common/utils/internals/CopyOnWriteMap.java | 4 +-
.../ImplicitLinkedHashMultiCollection.java | 3 +-
26 files changed, 72 insertions(+), 130 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 93f2f4225bc..b8466dd0193 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -59,7 +59,7 @@ public final class Cluster {
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics,
Collections.emptySet(), internalTopics, null, Collections.emptyMap());
+ this(clusterId, false, nodes, partitions, unauthorizedTopics,
Set.of(), internalTopics, null, Map.of());
}
/**
@@ -73,7 +73,7 @@ public final class Cluster {
Set<String> unauthorizedTopics,
Set<String> internalTopics,
Node controller) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics,
Collections.emptySet(), internalTopics, controller, Collections.emptyMap());
+ this(clusterId, false, nodes, partitions, unauthorizedTopics,
Set.of(), internalTopics, controller, Map.of());
}
/**
@@ -88,7 +88,7 @@ public final class Cluster {
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller) {
- this(clusterId, false, nodes, partitions, unauthorizedTopics,
invalidTopics, internalTopics, controller, Collections.emptyMap());
+ this(clusterId, false, nodes, partitions, unauthorizedTopics,
invalidTopics, internalTopics, controller, Map.of());
}
/**
@@ -199,8 +199,8 @@ public final class Cluster {
* Create an empty cluster instance with no nodes and no topic-partitions.
*/
public static Cluster empty() {
- return new Cluster(null, new ArrayList<>(0), new ArrayList<>(0),
Collections.emptySet(),
- Collections.emptySet(), null);
+ return new Cluster(null, new ArrayList<>(0), new ArrayList<>(0),
Set.of(),
+ Set.of(), null);
}
/**
@@ -214,7 +214,7 @@ public final class Cluster {
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(),
address.getPort()));
return new Cluster(null, true, nodes, new ArrayList<>(0),
- Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), null, Collections.emptyMap());
+ Set.of(), Set.of(), Set.of(), null, Map.of());
}
/**
@@ -292,7 +292,7 @@ public final class Cluster {
* @return A list of partitions
*/
public List<PartitionInfo> partitionsForTopic(String topic) {
- return partitionsByTopic.getOrDefault(topic, Collections.emptyList());
+ return partitionsByTopic.getOrDefault(topic, List.of());
}
/**
@@ -311,7 +311,7 @@ public final class Cluster {
* @return A list of partitions
*/
public List<PartitionInfo> availablePartitionsForTopic(String topic) {
- return availablePartitionsByTopic.getOrDefault(topic,
Collections.emptyList());
+ return availablePartitionsByTopic.getOrDefault(topic, List.of());
}
/**
@@ -320,7 +320,7 @@ public final class Cluster {
* @return A list of partitions
*/
public List<PartitionInfo> partitionsForNode(int nodeId) {
- return partitionsByNode.getOrDefault(nodeId, Collections.emptyList());
+ return partitionsByNode.getOrDefault(nodeId, List.of());
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index 0b6e47ee2f5..a46105def92 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.metrics;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -56,7 +55,7 @@ public interface MetricsReporter extends Reconfigurable,
AutoCloseable {
// default methods for backwards compatibility with reporters that only
implement Configurable
default Set<String> reconfigurableConfigs() {
- return Collections.emptySet();
+ return Set.of();
}
default void validateReconfiguration(Map<String, ?> configs) throws
ConfigException {
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index dd557a1b25c..a028b345764 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -37,7 +37,6 @@ import org.apache.kafka.common.utils.internals.LogContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -163,7 +162,7 @@ public class ChannelBuilders {
// Use server context for inter-broker client connections
and client context for other clients
JaasContext jaasContext = contextType ==
JaasContext.Type.CLIENT ? JaasContext.loadClientContext(configs) :
JaasContext.loadServerContext(listenerName,
clientSaslMechanism, configs);
- jaasContexts =
Collections.singletonMap(clientSaslMechanism, jaasContext);
+ jaasContexts = Map.of(clientSaslMechanism, jaasContext);
}
channelBuilder = new SaslChannelBuilder(connectionMode,
jaasContexts,
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 3401f593e91..52f798ef7e5 100644
---
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -65,7 +65,6 @@ import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -187,7 +186,7 @@ public class SaslChannelBuilder implements ChannelBuilder,
ListenerReconfigurabl
@Override
public Set<String> reconfigurableConfigs() {
- return securityProtocol == SecurityProtocol.SASL_SSL ?
SslConfigs.RECONFIGURABLE_CONFIGS : Collections.emptySet();
+ return securityProtocol == SecurityProtocol.SASL_SSL ?
SslConfigs.RECONFIGURABLE_CONFIGS : Set.of();
}
@Override
@@ -223,11 +222,11 @@ public class SaslChannelBuilder implements
ChannelBuilder, ListenerReconfigurabl
Supplier<Authenticator> authenticatorCreator;
if (connectionMode == ConnectionMode.SERVER) {
authenticatorCreator = () -> buildServerAuthenticator(configs,
- Collections.unmodifiableMap(saslCallbackHandlers),
+ Map.copyOf(saslCallbackHandlers),
id,
finalTransportLayer,
- Collections.unmodifiableMap(subjects),
-
Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism),
+ Map.copyOf(subjects),
+ Map.copyOf(connectionsMaxReauthMsByMechanism),
metadataRegistry);
} else {
LoginManager loginManager =
loginManagers.get(clientSaslMechanism);
diff --git
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index a88a7c1196e..0b6cee13e1c 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -226,11 +226,11 @@ public class Selector implements Selectable,
AutoCloseable {
}
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time,
String metricGrpPrefix, ChannelBuilder channelBuilder, LogContext logContext) {
- this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time,
metricGrpPrefix, Collections.emptyMap(), true, channelBuilder, logContext);
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time,
metricGrpPrefix, Map.of(), true, channelBuilder, logContext);
}
public Selector(long connectionMaxIdleMS, int failedAuthenticationDelayMs,
Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder
channelBuilder, LogContext logContext) {
- this(NetworkReceive.UNLIMITED, connectionMaxIdleMS,
failedAuthenticationDelayMs, metrics, time, metricGrpPrefix,
Collections.emptyMap(), true, channelBuilder, logContext);
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS,
failedAuthenticationDelayMs, metrics, time, metricGrpPrefix, Map.of(), true,
channelBuilder, logContext);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
index e8a6a7290d4..0532c66580a 100644
--- a/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/quota/ClientQuotaFilter.java
@@ -18,7 +18,7 @@
package org.apache.kafka.common.quota;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
import java.util.Objects;
/**
@@ -64,7 +64,7 @@ public class ClientQuotaFilter {
* Constructs and returns a quota filter that matches all configured
entities.
*/
public static ClientQuotaFilter all() {
- return new ClientQuotaFilter(Collections.emptyList(), false);
+ return new ClientQuotaFilter(List.of(), false);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
index 96e01f17942..8aded504dce 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/kerberos/KerberosShortNamer.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.security.kerberos;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -43,7 +42,7 @@ public class KerberosShortNamer {
}
public static KerberosShortNamer fromUnparsedRules(String defaultRealm,
List<String> principalToLocalRules) {
- List<String> rules = principalToLocalRules == null ?
Collections.singletonList("DEFAULT") : principalToLocalRules;
+ List<String> rules = principalToLocalRules == null ?
List.of("DEFAULT") : principalToLocalRules;
return new KerberosShortNamer(parseRules(defaultRealm, rules));
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
index c69db033052..60b065a5e45 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/BrokerJwtValidator.java
@@ -36,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -164,11 +163,11 @@ public class BrokerJwtValidator implements JwtValidator {
Collection<String> scopeRawCollection;
if (scopeRaw instanceof String)
- scopeRawCollection = Collections.singletonList((String) scopeRaw);
+ scopeRawCollection = List.of((String) scopeRaw);
else if (scopeRaw instanceof Collection)
scopeRawCollection = (Collection<String>) scopeRaw;
else
- scopeRawCollection = Collections.emptySet();
+ scopeRawCollection = Set.of();
NumericDate expirationRaw = getClaim(claims::getExpirationTime,
ReservedClaimNames.EXPIRATION_TIME);
String subRaw = getClaim(() ->
claims.getStringClaimValue(subClaimName), subClaimName);
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
index 53cd88f24dd..6d3ce1a8079 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/ClientJwtValidator.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -107,11 +106,11 @@ public class ClientJwtValidator implements JwtValidator {
Collection<String> scopeRawCollection;
if (scopeRaw instanceof String)
- scopeRawCollection = Collections.singletonList((String) scopeRaw);
+ scopeRawCollection = List.of((String) scopeRaw);
else if (scopeRaw instanceof Collection)
scopeRawCollection = (Collection<String>) scopeRaw;
else
- scopeRawCollection = Collections.emptySet();
+ scopeRawCollection = Set.of();
Number expirationRaw = (Number) getClaim(payload,
EXPIRATION_CLAIM_NAME);
String subRaw = (String) getClaim(payload, subClaimName);
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index ddbbd1a787a..d8cbe777458 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
@@ -257,7 +256,7 @@ public class OAuthBearerLoginModule implements LoginModule {
*/
public static final String OAUTHBEARER_MECHANISM = "OAUTHBEARER";
private static final Logger log =
LoggerFactory.getLogger(OAuthBearerLoginModule.class);
- private static final SaslExtensions EMPTY_EXTENSIONS = new
SaslExtensions(Collections.emptyMap());
+ private static final SaslExtensions EMPTY_EXTENSIONS = new
SaslExtensions(Map.of());
private Subject subject = null;
private AuthenticateCallbackHandler callbackHandler = null;
private OAuthBearerToken tokenRequiringCommit = null;
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
index 955ab4c1fac..b2a508e92eb 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
@@ -100,7 +99,7 @@ public class OAuthBearerSaslClientCallbackHandler implements
AuthenticateCallbac
Subject subject = SecurityManagerCompatibility.get().current();
Set<OAuthBearerToken> privateCredentials = subject != null
? subject.getPrivateCredentials(OAuthBearerToken.class)
- : Collections.emptySet();
+ : Set.of();
if (privateCredentials.isEmpty())
throw new IOException("No OAuth Bearer tokens in Subject's private
credentials");
if (privateCredentials.size() == 1)
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
index ec6d3daafe8..73b9a1560a6 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/JaasOptionsUtils.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -65,7 +64,7 @@ public class JaasOptionsUtils {
if (Objects.requireNonNull(jaasConfigEntries).size() != 1 ||
jaasConfigEntries.get(0) == null)
throw new IllegalArgumentException(String.format("Must supply
exactly 1 non-null JAAS mechanism configuration (size was %d)",
jaasConfigEntries.size()));
- return
Collections.unmodifiableMap(jaasConfigEntries.get(0).getOptions());
+ return Map.copyOf(jaasConfigEntries.get(0).getOptions());
}
public boolean containsKey(String name) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
index bea463a8d14..6d7c4dae109 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredJws.java
@@ -345,14 +345,14 @@ public class OAuthBearerUnsecuredJws implements
OAuthBearerToken {
if (isClaimType(scopeClaimName, String.class)) {
String scopeClaimValue = claim(scopeClaimName, String.class);
if (Utils.isBlank(scopeClaimValue))
- return Collections.emptySet();
+ return Set.of();
else {
return Set.of(scopeClaimValue.trim());
}
}
List<?> scopeClaimValue = claim(scopeClaimName, List.class);
if (scopeClaimValue == null || scopeClaimValue.isEmpty())
- return Collections.emptySet();
+ return Set.of();
@SuppressWarnings("unchecked")
List<String> stringList = (List<String>) scopeClaimValue;
Set<String> retval = new HashSet<>();
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
index 455fda983c3..bec5e103a1b 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredLoginCallbackHandler.java
@@ -34,7 +34,6 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Base64.Encoder;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -147,7 +146,7 @@ public class OAuthBearerUnsecuredLoginCallbackHandler
implements AuthenticateCal
throw new IllegalArgumentException(
String.format("Must supply exactly 1 non-null JAAS
mechanism configuration (size was %d)",
jaasConfigEntries.size()));
- this.moduleOptions = Collections.unmodifiableMap((Map<String, String>)
jaasConfigEntries.get(0).getOptions());
+ this.moduleOptions = Map.copyOf((Map<String, String>)
jaasConfigEntries.get(0).getOptions());
configured = true;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
index 53e099688d9..4f2c06f4b85 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/unsecured/OAuthBearerUnsecuredValidatorCallbackHandler.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -118,8 +117,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler
implements Authenticat
throw new IllegalArgumentException(
String.format("Must supply exactly 1 non-null JAAS
mechanism configuration (size was %d)",
jaasConfigEntries.size()));
- this.moduleOptions = Collections
- .unmodifiableMap((Map<String, String>)
jaasConfigEntries.get(0).getOptions());
+ this.moduleOptions = Map.copyOf((Map<String, String>)
jaasConfigEntries.get(0).getOptions());
configured = true;
}
@@ -188,7 +186,7 @@ public class OAuthBearerUnsecuredValidatorCallbackHandler
implements Authenticat
private List<String> requiredScope() {
String requiredSpaceDelimitedScope = option(REQUIRED_SCOPE_OPTION);
- return Utils.isBlank(requiredSpaceDelimitedScope) ?
Collections.emptyList() :
OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
+ return Utils.isBlank(requiredSpaceDelimitedScope) ? List.of() :
OAuthBearerScopeUtils.parseScope(requiredSpaceDelimitedScope.trim());
}
private int allowableClockSkewMs() {
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
index 23ab1aaa8e4..69350b7f464 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramExtensionsCallback.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.security.scram;
-import java.util.Collections;
import java.util.Map;
import javax.security.auth.callback.Callback;
@@ -27,7 +26,7 @@ import javax.security.auth.callback.Callback;
* in the SASL/SCRAM exchange.
*/
public class ScramExtensionsCallback implements Callback {
- private Map<String, String> extensions = Collections.emptyMap();
+ private Map<String, String> extensions = Map.of();
/**
* Returns map of the extension names and values that are sent by the
client to
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
index eb523c9a82c..3bf60161a04 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramLoginModule.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.security.scram;
import
org.apache.kafka.common.security.scram.internals.ScramSaslClientProvider;
import
org.apache.kafka.common.security.scram.internals.ScramSaslServerProvider;
-import java.util.Collections;
import java.util.Map;
import javax.security.auth.Subject;
@@ -48,7 +47,7 @@ public class ScramLoginModule implements LoginModule {
boolean useTokenAuthentication = "true".equalsIgnoreCase((String)
options.get(TOKEN_AUTH_CONFIG));
if (useTokenAuthentication) {
- Map<String, String> scramExtensions =
Collections.singletonMap(TOKEN_AUTH_CONFIG, "true");
+ Map<String, String> scramExtensions = Map.of(TOKEN_AUTH_CONFIG,
"true");
subject.getPublicCredentials().add(scramExtensions);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
index 439bcfe14f2..81fd5693b1c 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramExtensions.java
@@ -20,13 +20,12 @@ import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.utils.Utils;
-import java.util.Collections;
import java.util.Map;
public class ScramExtensions extends SaslExtensions {
public ScramExtensions() {
- this(Collections.emptyMap());
+ this(Map.of());
}
public ScramExtensions(String extensions) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index ced699640fd..ab99f5411c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -40,10 +40,8 @@ import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -372,7 +370,7 @@ public class SslFactory implements Reconfigurable,
Closeable {
this.subjectPrincipal = cert.getSubjectX500Principal();
Collection<List<?>> altNames = cert.getSubjectAlternativeNames();
// use a set for comparison
- this.subjectAltNames = altNames != null ? new HashSet<>(altNames)
: Collections.emptySet();
+ this.subjectAltNames = altNames != null ? Set.copyOf(altNames) :
Set.of();
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
index 35762de28fc..18fe80a54e3 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/ClientTelemetryState.java
@@ -18,9 +18,6 @@
package org.apache.kafka.common.telemetry;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -65,67 +62,34 @@ public enum ClientTelemetryState {
*/
TERMINATED;
- private static final Map<ClientTelemetryState, List<ClientTelemetryState>>
VALID_NEXT_STATES = new EnumMap<>(ClientTelemetryState.class);
-
- static {
- /*
- If clients needs a subscription, then issue telemetry API to fetch
subscription from broker.
-
- However, it's still possible that client doesn't get very far before
terminating.
- */
- VALID_NEXT_STATES.put(
- SUBSCRIPTION_NEEDED, Arrays.asList(SUBSCRIPTION_IN_PROGRESS,
TERMINATED));
-
- /*
- If client is finished waiting for subscription, then client is ready
to push the telemetry.
- But, it's possible that no telemetry metrics are requested, hence
client should go back to
- subscription needed state i.e. requesting the next updated
subscription.
-
- However, it's still possible that client doesn't get very far before
terminating.
- */
- VALID_NEXT_STATES.put(SUBSCRIPTION_IN_PROGRESS,
Arrays.asList(PUSH_NEEDED,
- SUBSCRIPTION_NEEDED, TERMINATING_PUSH_NEEDED, TERMINATED));
-
- /*
- If client transitions out of this state, then client should proceed
to push the metrics.
- But, if the push fails (network issues, the subscription changed,
etc.) then client should
- go back to subscription needed state and request the next
subscription.
-
- However, it's still possible that client doesn't get very far before
terminating.
- */
- VALID_NEXT_STATES.put(PUSH_NEEDED, Arrays.asList(PUSH_IN_PROGRESS,
SUBSCRIPTION_NEEDED,
- TERMINATING_PUSH_NEEDED, TERMINATED));
-
- /*
- A successful push should transition client to push needed which sends
the next telemetry
- metrics after the elapsed wait interval. But, if the push fails
(network issues, the
- subscription changed, etc.) then client should go back to
subscription needed state and
- request the next subscription.
-
- However, it's still possible that client doesn't get very far before
terminating.
- */
- VALID_NEXT_STATES.put(
- PUSH_IN_PROGRESS, Arrays.asList(PUSH_NEEDED, SUBSCRIPTION_NEEDED,
TERMINATING_PUSH_NEEDED,
- TERMINATED));
-
- /*
- If client is moving out of this state, then try to send last metrics
push.
-
- However, it's still possible that client doesn't get very far before
terminating.
- */
- VALID_NEXT_STATES.put(
- TERMINATING_PUSH_NEEDED,
Arrays.asList(TERMINATING_PUSH_IN_PROGRESS, TERMINATED));
-
- /*
- Client should only be transited to terminated state.
- */
- VALID_NEXT_STATES.put(TERMINATING_PUSH_IN_PROGRESS,
Collections.singletonList(TERMINATED));
-
- /*
- Client should never be able to transition out of terminated state.
- */
- VALID_NEXT_STATES.put(TERMINATED, Collections.emptyList());
- }
+ private static final Map<ClientTelemetryState, List<ClientTelemetryState>>
VALID_NEXT_STATES = Map.of(
+ // If client needs a subscription, then issue telemetry API to fetch
subscription from broker.
+ // However, it's still possible that client doesn't get very far
before terminating.
+ SUBSCRIPTION_NEEDED, List.of(SUBSCRIPTION_IN_PROGRESS, TERMINATED),
+ // If client is finished waiting for subscription, then client is
ready to push the telemetry.
+ // But, it's possible that no telemetry metrics are requested, hence
client should go back to
+ // subscription needed state i.e. requesting the next updated
subscription.
+ // However, it's still possible that client doesn't get very far
before terminating.
+ SUBSCRIPTION_IN_PROGRESS, List.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED,
TERMINATING_PUSH_NEEDED, TERMINATED),
+ // If client transitions out of this state, then client should proceed
to push the metrics.
+ // But, if the push fails (network issues, the subscription changed,
etc.) then client should
+ // go back to subscription needed state and request the next
subscription.
+ // However, it's still possible that client doesn't get very far
before terminating.
+ PUSH_NEEDED, List.of(PUSH_IN_PROGRESS, SUBSCRIPTION_NEEDED,
TERMINATING_PUSH_NEEDED, TERMINATED),
+ // A successful push should transition client to push needed which
sends the next telemetry
+ // metrics after the elapsed wait interval. But, if the push fails
(network issues, the
+ // subscription changed, etc.) then client should go back to
subscription needed state and
+ // request the next subscription.
+ // However, it's still possible that client doesn't get very far
before terminating.
+ PUSH_IN_PROGRESS, List.of(PUSH_NEEDED, SUBSCRIPTION_NEEDED,
TERMINATING_PUSH_NEEDED, TERMINATED),
+ // If client is moving out of this state, then try to send last
metrics push.
+ // However, it's still possible that client doesn't get very far
before terminating.
+ TERMINATING_PUSH_NEEDED, List.of(TERMINATING_PUSH_IN_PROGRESS,
TERMINATED),
+ // Client should only be transited to terminated state.
+ TERMINATING_PUSH_IN_PROGRESS, List.of(TERMINATED),
+ // Client should never be able to transition out of terminated state.
+ TERMINATED, List.of()
+ );
/**
* Validates that the <code>newState</code> is one of the valid transition
from the current
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index 0aa6bf16f0a..cee8ff435a9 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -112,7 +111,7 @@ public class ClientTelemetryReporter implements
MetricsReporter {
context. These additional labels from the request context should be added
by broker prior
exporting the metrics to the telemetry backend.
*/
- private static final Set<String> EXCLUDE_LABELS =
Collections.singleton("client_id");
+ private static final Set<String> EXCLUDE_LABELS = Set.of("client_id");
public static final int DEFAULT_PUSH_INTERVAL_MS = 5 * 60 * 1000;
@@ -976,7 +975,7 @@ public class ClientTelemetryReporter implements
MetricsReporter {
this.clientInstanceId = clientInstanceId;
this.subscriptionId = subscriptionId;
this.pushIntervalMs = pushIntervalMs;
- this.acceptedCompressionTypes =
Collections.unmodifiableList(acceptedCompressionTypes);
+ this.acceptedCompressionTypes =
List.copyOf(acceptedCompressionTypes);
this.deltaTemporality = deltaTemporality;
this.selector = selector;
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
index 7f44b2dbd27..d67775e1afb 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java
@@ -36,7 +36,6 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -122,7 +121,7 @@ public class ClientTelemetryUtils {
public static List<CompressionType>
getCompressionTypesFromAcceptedList(List<Byte> acceptedCompressionTypes) {
if (acceptedCompressionTypes == null ||
acceptedCompressionTypes.isEmpty()) {
- return Collections.emptyList();
+ return List.of();
}
List<CompressionType> result = new ArrayList<>();
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
index d2054069a9c..4dbf66b0b40 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricKey.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.telemetry.internals;
import org.apache.kafka.common.MetricName;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
@@ -49,7 +48,7 @@ public class MetricKey implements MetricKeyable {
*/
public MetricKey(String name, Map<String, String> tags) {
this.name = Objects.requireNonNull(name);
- this.tags = tags != null ? Collections.unmodifiableMap(tags) :
Collections.emptyMap();
+ this.tags = tags != null ? Map.copyOf(tags) : Map.of();
}
public MetricKey(MetricName metricName) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
index 3b6a3864aa2..005e6eb150e 100644
---
a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
+++
b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/MetricsEmitter.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.telemetry.internals;
import java.io.Closeable;
-import java.util.Collections;
import java.util.List;
/**
@@ -69,7 +68,7 @@ public interface MetricsEmitter extends Closeable {
* @return emitted metrics.
*/
default List<SinglePointMetric> emittedMetrics() {
- return Collections.emptyList();
+ return List.of();
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
b/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
index 2e7bd69ff8c..5734742759b 100644
---
a/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
+++
b/clients/src/main/java/org/apache/kafka/common/utils/internals/CopyOnWriteMap.java
@@ -35,7 +35,7 @@ public class CopyOnWriteMap<K, V> implements ConcurrentMap<K,
V> {
private volatile Map<K, V> map;
public CopyOnWriteMap() {
- this.map = Collections.emptyMap();
+ this.map = Map.of();
}
public CopyOnWriteMap(Map<K, V> map) {
@@ -84,7 +84,7 @@ public class CopyOnWriteMap<K, V> implements ConcurrentMap<K,
V> {
@Override
public synchronized void clear() {
- this.map = Collections.emptyMap();
+ this.map = Map.of();
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
index 17252a5285c..280ac26c6f7 100644
---
a/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
+++
b/clients/src/main/java/org/apache/kafka/common/utils/internals/ImplicitLinkedHashMultiCollection.java
@@ -18,7 +18,6 @@
package org.apache.kafka.common.utils.internals;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -121,7 +120,7 @@ public class ImplicitLinkedHashMultiCollection<E extends
ImplicitLinkedHashColle
*/
public final List<E> findAll(E key) {
if (key == null || size() == 0) {
- return Collections.emptyList();
+ return List.of();
}
ArrayList<E> results = new ArrayList<>();
int slot = slot(elements, key);