This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5eff4ceb5f2 MINOR: Replace Collections factory methods with Java 11+ 
equivalents in group-coordinator and part of clients (#21876)
5eff4ceb5f2 is described below

commit 5eff4ceb5f21e8aa4f0c538369923416b7563894
Author: Maros Orsak <[email protected]>
AuthorDate: Mon Mar 30 16:13:04 2026 +0200

    MINOR: Replace Collections factory methods with Java 11+ equivalents in 
group-coordinator and part of clients (#21876)
    
    This PR is another in the series migrating old JDK 8 collection APIs to
    their JDK 11 equivalents. It covers the whole `group-coordinator` module
    and part of clients (i.e., common/requests, common/errors,
    common/config). I didn't want to do the whole of clients in one go
    because that would be a much bigger change.
    
    Also, `GroupCoordinatorService` uses `Collections.singletonList(null)`,
    which I skipped because `List.of(...)` rejects null elements and would
    break the code. Moreover, `AssignorHelpers` calls
    `Collections.emptyMap().getClass()`, which inspects the concrete class
    via reflection; switching to `Map.of()` there would yield a different
    class.
    
    Reviewers: Ken Huang <[email protected]>, PoAn Yang
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/common/config/AbstractConfig.java | 12 +++++-----
 .../org/apache/kafka/common/config/ConfigDef.java  | 26 +++++++++++-----------
 .../config/internals/BrokerSecurityConfigs.java    |  5 ++---
 .../config/provider/DirectoryConfigProvider.java   |  3 +--
 .../common/errors/TopicAuthorizationException.java |  3 +--
 .../kafka/common/requests/AbstractResponse.java    |  3 +--
 .../common/requests/AddRaftVoterResponse.java      |  5 ++---
 .../requests/AllocateProducerIdsResponse.java      |  3 +--
 .../common/requests/AlterPartitionRequest.java     |  3 +--
 .../requests/AssignReplicasToDirsResponse.java     |  3 +--
 .../common/requests/BeginQuorumEpochRequest.java   |  7 +++---
 .../requests/ConsumerGroupHeartbeatResponse.java   |  3 +--
 .../requests/ControllerRegistrationResponse.java   |  3 +--
 .../kafka/common/requests/DeleteTopicsRequest.java |  3 +--
 .../common/requests/DescribeGroupsResponse.java    |  3 +--
 .../common/requests/DescribeQuorumRequest.java     |  5 ++---
 .../common/requests/DescribeQuorumResponse.java    | 10 ++++-----
 .../requests/DescribeTopicPartitionsRequest.java   |  3 +--
 .../kafka/common/requests/ElectLeadersRequest.java |  3 +--
 .../common/requests/EndQuorumEpochRequest.java     |  5 ++---
 .../apache/kafka/common/requests/FetchRequest.java |  5 ++---
 .../kafka/common/requests/FetchResponse.java       |  3 +--
 .../common/requests/FindCoordinatorRequest.java    |  7 +++---
 .../common/requests/FindCoordinatorResponse.java   |  5 ++---
 .../kafka/common/requests/JoinGroupRequest.java    |  5 +++--
 .../kafka/common/requests/LeaveGroupRequest.java   |  5 ++---
 .../kafka/common/requests/ListGroupsRequest.java   |  3 +--
 .../kafka/common/requests/ListOffsetsResponse.java |  3 +--
 .../kafka/common/requests/MetadataRequest.java     |  7 +++---
 .../kafka/common/requests/OffsetFetchRequest.java  |  3 +--
 .../kafka/common/requests/ProduceRequest.java      |  6 ++---
 .../kafka/common/requests/ProduceResponse.java     |  9 ++++----
 .../common/requests/RemoveRaftVoterResponse.java   |  5 ++---
 .../kafka/common/requests/ShareFetchResponse.java  |  3 +--
 .../requests/ShareGroupHeartbeatResponse.java      |  3 +--
 .../requests/StreamsGroupHeartbeatResponse.java    |  2 +-
 .../common/requests/UpdateFeaturesRequest.java     |  4 ++--
 .../common/requests/UpdateRaftVoterResponse.java   |  5 ++---
 .../apache/kafka/common/requests/VoteRequest.java  |  7 +++---
 .../coordinator/group/GroupCoordinatorService.java |  2 +-
 .../coordinator/group/GroupMetadataManager.java    |  2 +-
 .../coordinator/group/streams/StreamsGroup.java    |  4 ++--
 .../group/streams/StreamsGroupMember.java          |  2 +-
 43 files changed, 93 insertions(+), 118 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e271cd99c4c..39d015ec82c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -133,7 +133,7 @@ public class AbstractConfig {
      * @param originals  the configuration properties plus any optional config 
provider properties; may not be null
      */
     public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
-        this(definition, originals, Collections.emptyMap(), true);
+        this(definition, originals, Map.of(), true);
     }
 
     /**
@@ -146,7 +146,7 @@ public class AbstractConfig {
      * @param doLog      whether the configurations should be logged
      */
     public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean 
doLog) {
-        this(definition, originals, Collections.emptyMap(), doLog);
+        this(definition, originals, Map.of(), doLog);
     }
 
     /**
@@ -168,7 +168,7 @@ public class AbstractConfig {
      * @return a map of updates that should be applied to the configuration 
(will be validated to prevent bad updates)
      */
     protected Map<String, Object> postProcessParsedConfig(Map<String, Object> 
parsedValues) {
-        return Collections.emptyMap();
+        return Map.of();
     }
 
     protected Object get(String key) {
@@ -430,7 +430,7 @@ public class AbstractConfig {
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        return getConfiguredInstance(key, t, Map.of());
     }
 
     /**
@@ -458,7 +458,7 @@ public class AbstractConfig {
      * @return The list of configured instances
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
-        return getConfiguredInstances(key, t, Collections.emptyMap());
+        return getConfiguredInstances(key, t, Map.of());
     }
 
     /**
@@ -606,7 +606,7 @@ public class AbstractConfig {
         final String configProviders = 
indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
 
         if (configProviders == null || configProviders.isEmpty()) {
-            return Collections.emptyMap();
+            return Map.of();
         }
 
         Map<String, String> providerMap = new HashMap<>();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index dd1e4898eaf..efe7f0a1ecc 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -218,7 +218,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, 
Validator validator, Importance importance, String documentation,
                             String group, int orderInGroup, Width width, 
String displayName, Recommender recommender) {
-        return define(name, type, defaultValue, validator, importance, 
documentation, group, orderInGroup, width, displayName, 
Collections.emptyList(), recommender);
+        return define(name, type, defaultValue, validator, importance, 
documentation, group, orderInGroup, width, displayName, List.of(), recommender);
     }
 
     /**
@@ -237,7 +237,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, 
Validator validator, Importance importance, String documentation,
                             String group, int orderInGroup, Width width, 
String displayName) {
-        return define(name, type, defaultValue, validator, importance, 
documentation, group, orderInGroup, width, displayName, 
Collections.emptyList());
+        return define(name, type, defaultValue, validator, importance, 
documentation, group, orderInGroup, width, displayName, List.of());
     }
 
     /**
@@ -295,7 +295,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, 
Importance importance, String documentation,
                             String group, int orderInGroup, Width width, 
String displayName, Recommender recommender) {
-        return define(name, type, defaultValue, null, importance, 
documentation, group, orderInGroup, width, displayName, 
Collections.emptyList(), recommender);
+        return define(name, type, defaultValue, null, importance, 
documentation, group, orderInGroup, width, displayName, List.of(), recommender);
     }
 
     /**
@@ -313,7 +313,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, 
Importance importance, String documentation,
                             String group, int orderInGroup, Width width, 
String displayName) {
-        return define(name, type, defaultValue, null, importance, 
documentation, group, orderInGroup, width, displayName, 
Collections.emptyList());
+        return define(name, type, defaultValue, null, importance, 
documentation, group, orderInGroup, width, displayName, List.of());
     }
 
     /**
@@ -368,7 +368,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Importance importance, 
String documentation, String group, int orderInGroup,
                             Width width, String displayName, Recommender 
recommender) {
-        return define(name, type, NO_DEFAULT_VALUE, null, importance, 
documentation, group, orderInGroup, width, displayName, 
Collections.emptyList(), recommender);
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, 
documentation, group, orderInGroup, width, displayName, List.of(), recommender);
     }
 
     /**
@@ -385,7 +385,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Importance importance, 
String documentation, String group, int orderInGroup,
                             Width width, String displayName) {
-        return define(name, type, NO_DEFAULT_VALUE, null, importance, 
documentation, group, orderInGroup, width, displayName, 
Collections.emptyList());
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, 
documentation, group, orderInGroup, width, displayName, List.of());
     }
 
     /**
@@ -427,7 +427,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, 
Importance importance, String documentation, String alternativeString) {
         return define(name, type, defaultValue, null, importance, 
documentation, null, -1, Width.NONE,
-                name, Collections.emptyList(), null, alternativeString);
+                name, List.of(), null, alternativeString);
     }
 
     /**
@@ -452,7 +452,7 @@ public class ConfigDef {
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef defineInternal(final String name, final Type type, final 
Object defaultValue, final Importance importance) {
-        return define(new ConfigKey(name, type, defaultValue, null, 
importance, "", "", -1, Width.NONE, name, Collections.emptyList(), null, true, 
null));
+        return define(new ConfigKey(name, type, defaultValue, null, 
importance, "", "", -1, Width.NONE, name, List.of(), null, true, null));
     }
 
     /**
@@ -467,7 +467,7 @@ public class ConfigDef {
      * @return This ConfigDef so you can chain calls
      */
     public ConfigDef defineInternal(final String name, final Type type, final 
Object defaultValue, final Validator validator, final Importance importance, 
final String documentation) {
-        return define(new ConfigKey(name, type, defaultValue, validator, 
importance, documentation, "", -1, Width.NONE, name, Collections.emptyList(), 
null, true, null));
+        return define(new ConfigKey(name, type, defaultValue, validator, 
importance, documentation, "", -1, Width.NONE, name, List.of(), null, true, 
null));
     }
 
     /**
@@ -768,7 +768,7 @@ public class ConfigDef {
                         return value;
                     else if (value instanceof String)
                         if (trimmed.isEmpty())
-                            return Collections.emptyList();
+                            return List.of();
                         else
                             return 
Arrays.asList(COMMA_WITH_WHITESPACE.split(trimmed, -1));
                     else
@@ -1445,7 +1445,7 @@ public class ConfigDef {
     }
 
     public String toHtmlTable() {
-        return toHtmlTable(Collections.emptyMap());
+        return toHtmlTable(Map.of());
     }
 
     private void addHeader(StringBuilder builder, String headerName) {
@@ -1700,7 +1700,7 @@ public class ConfigDef {
     }
 
     public String toHtml() {
-        return toHtml(Collections.emptyMap());
+        return toHtml(Map.of());
     }
 
     /**
@@ -1709,7 +1709,7 @@ public class ConfigDef {
      * @param idGenerator A function for computing the HTML id attribute in 
the generated HTML from a given config name.
      */
     public String toHtml(int headerDepth, Function<String, String> 
idGenerator) {
-        return toHtml(headerDepth, idGenerator, Collections.emptyMap());
+        return toHtml(headerDepth, idGenerator, Map.of());
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index c5289676439..339acab0a22 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -25,7 +25,6 @@ import 
org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -61,7 +60,7 @@ public class BrokerSecurityConfigs {
             " configuration.";
 
     public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG = 
"sasl.kerberos.principal.to.local.rules";
-    public static final List<String> 
DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = 
Collections.singletonList(DEFAULT_SSL_PRINCIPAL_MAPPING_RULES);
+    public static final List<String> 
DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES = 
List.of(DEFAULT_SSL_PRINCIPAL_MAPPING_RULES);
     public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A 
list of rules for mapping from principal " +
             "names to short names (typically operating system usernames). The 
rules are evaluated in order and the " +
             "first rule that matches a principal name is used to map it to a 
short name. Any later rules in the list are " +
@@ -95,7 +94,7 @@ public class BrokerSecurityConfigs {
             + "</ul>";
 
     public static final String SASL_ENABLED_MECHANISMS_CONFIG = 
"sasl.enabled.mechanisms";
-    public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = 
Collections.singletonList(SaslConfigs.GSSAPI_MECHANISM);
+    public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS = 
List.of(SaslConfigs.GSSAPI_MECHANISM);
     public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL 
mechanisms enabled in the Kafka server. "
             + "The list may contain any mechanism for which a security 
provider is available. "
             + "Only GSSAPI is enabled by default.";
diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
 
b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
index 3e0fa5ed772..aecd29be6ff 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
@@ -32,7 +32,6 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.Collections.emptyMap;
 
 /**
  * An implementation of {@link ConfigProvider} based on a directory of files.
@@ -87,7 +86,7 @@ public class DirectoryConfigProvider implements 
ConfigProvider {
             throw new IllegalStateException("The provider has not been 
configured yet.");
         }
 
-        Map<String, String> map = emptyMap();
+        Map<String, String> map = Map.of();
 
         if (path != null && !path.isEmpty()) {
             Path dir = allowedPaths.parseUntrustedPath(path);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
index e2235f804e1..0b4b210e1f6 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.errors;
 
-import java.util.Collections;
 import java.util.Set;
 
 public class TopicAuthorizationException extends AuthorizationException {
@@ -32,7 +31,7 @@ public class TopicAuthorizationException extends 
AuthorizationException {
     }
 
     public TopicAuthorizationException(String message) {
-        this(message, Collections.emptySet());
+        this(message, Set.of());
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index bc313078d74..7d96d1a1731 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.protocol.SendBuilder;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -65,7 +64,7 @@ public abstract class AbstractResponse implements 
AbstractRequestResponse {
     public abstract Map<Errors, Integer> errorCounts();
 
     protected static Map<Errors, Integer> errorCounts(Errors error) {
-        return Collections.singletonMap(error, 1);
+        return Map.of(error, 1);
     }
 
     protected static Map<Errors, Integer> errorCounts(Stream<Errors> errors) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
index 52a0cb05feb..1efaed61b99 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class AddRaftVoterResponse extends AbstractResponse {
@@ -51,9 +50,9 @@ public class AddRaftVoterResponse extends AbstractResponse {
     @Override
     public Map<Errors, Integer> errorCounts() {
         if (data.errorCode() != Errors.NONE.code()) {
-            return Collections.singletonMap(Errors.forCode(data.errorCode()), 
1);
+            return Map.of(Errors.forCode(data.errorCode()), 1);
         } else {
-            return Collections.emptyMap();
+            return Map.of();
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
index 4c476511931..2575e05abb5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class AllocateProducerIdsResponse extends AbstractResponse {
@@ -47,7 +46,7 @@ public class AllocateProducerIdsResponse extends 
AbstractResponse {
      */
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+        return Map.of(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 2d181f48766..d6a0eccb611 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -90,7 +89,7 @@ public class AlterPartitionRequest extends AbstractRequest {
                                 newIsr.add(brokerState.brokerId())
                             );
                             partitionData.setNewIsr(newIsr);
-                            
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+                            partitionData.setNewIsrWithEpochs(List.of());
                         }
                     })
                 );
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
index 84f86d058ec..d785e807dfc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class AssignReplicasToDirsResponse extends AbstractResponse {
@@ -40,7 +39,7 @@ public class AssignReplicasToDirsResponse extends 
AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+        return Map.of(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
index f3375da9c41..8e7b816a065 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
@@ -23,7 +23,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
+import java.util.List;
+
 
 public class BeginQuorumEpochRequest extends AbstractRequest {
     public static class Builder extends 
AbstractRequest.Builder<BeginQuorumEpochRequest> {
@@ -75,10 +76,10 @@ public class BeginQuorumEpochRequest extends 
AbstractRequest {
     ) {
         return new BeginQuorumEpochRequestData()
                    .setClusterId(clusterId)
-                   .setTopics(Collections.singletonList(
+                   .setTopics(List.of(
                        new BeginQuorumEpochRequestData.TopicData()
                            .setTopicName(topicPartition.topic())
-                           .setPartitions(Collections.singletonList(
+                           .setPartitions(List.of(
                                new BeginQuorumEpochRequestData.PartitionData()
                                    
.setPartitionIndex(topicPartition.partition())
                                    .setLeaderEpoch(leaderEpoch)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
index dbbabdcaccd..ebc9c46beb3 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -59,7 +58,7 @@ public class ConsumerGroupHeartbeatResponse extends 
AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+        return Map.of(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
index 9cc53db15a9..d6462f1f501 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class ControllerRegistrationResponse extends AbstractResponse {
@@ -50,7 +49,7 @@ public class ControllerRegistrationResponse extends 
AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+        return Map.of(Errors.forCode(data.errorCode()), 1);
     }
 
     public static ControllerRegistrationResponse parse(Readable readable, 
short version) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index b90f853211d..dac9f21c169 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -104,7 +103,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
     public List<Uuid> topicIds() {
         if (version() >= 6)
             return 
data.topics().stream().map(DeleteTopicState::topicId).collect(Collectors.toList());
-        return Collections.emptyList();
+        return List.of();
     }
     
     public List<DeleteTopicState> topics() {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 4d59aee8758..b97526d0dd5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
@@ -106,7 +105,7 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
 
     public static DescribedGroup groupError(String groupId, Errors error) {
         return groupMetadata(groupId, error, 
DescribeGroupsResponse.UNKNOWN_STATE, 
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
-            DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(), 
AUTHORIZED_OPERATIONS_OMITTED);
+            DescribeGroupsResponse.UNKNOWN_PROTOCOL, List.of(), 
AUTHORIZED_OPERATIONS_OMITTED);
     }
 
     public static DescribedGroup groupError(String groupId, Errors error, 
String errorMessage) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
index e5f6b00f9fc..569d58f1159 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -61,10 +60,10 @@ public class DescribeQuorumRequest extends AbstractRequest {
 
     public static DescribeQuorumRequestData singletonRequest(TopicPartition 
topicPartition) {
         return new DescribeQuorumRequestData()
-            .setTopics(Collections.singletonList(
+            .setTopics(List.of(
                 new DescribeQuorumRequestData.TopicData()
                     .setTopicName(topicPartition.topic())
-                    .setPartitions(Collections.singletonList(
+                    .setPartitions(List.of(
                         new DescribeQuorumRequestData.PartitionData()
                             .setPartitionIndex(topicPartition.partition()))
             )));
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index c3b33d48052..bbc936a7846 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.EnumMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -79,9 +79,9 @@ public class DescribeQuorumResponse extends AbstractResponse {
         Errors error
     ) {
         return new DescribeQuorumResponseData()
-            .setTopics(Collections.singletonList(new 
DescribeQuorumResponseData.TopicData()
+            .setTopics(List.of(new DescribeQuorumResponseData.TopicData()
                 .setTopicName(topicPartition.topic())
-                .setPartitions(Collections.singletonList(new 
DescribeQuorumResponseData.PartitionData()
+                .setPartitions(List.of(new 
DescribeQuorumResponseData.PartitionData()
                     .setPartitionIndex(topicPartition.partition())
                     .setErrorCode(error.code())
                     .setErrorMessage(error.message())))));
@@ -94,9 +94,9 @@ public class DescribeQuorumResponse extends AbstractResponse {
         DescribeQuorumResponseData.NodeCollection nodes
     ) {
         DescribeQuorumResponseData res = new DescribeQuorumResponseData()
-            .setTopics(Collections.singletonList(new 
DescribeQuorumResponseData.TopicData()
+            .setTopics(List.of(new DescribeQuorumResponseData.TopicData()
                 .setTopicName(topicPartition.topic())
-                .setPartitions(Collections.singletonList(partitionData
+                .setPartitions(List.of(partitionData
                     .setPartitionIndex(topicPartition.partition())))));
 
         if (nodes != null)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
index f65ef91db54..0054bd8a8f0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.List;
 
 public class DescribeTopicPartitionsRequest extends AbstractRequest {
@@ -83,7 +82,7 @@ public class DescribeTopicPartitionsRequest extends 
AbstractRequest {
                 .setName(topic.name())
                 .setErrorCode(error.code())
                 .setIsInternal(false)
-                .setPartitions(Collections.emptyList())
+                .setPartitions(List.of())
             );
         }
         responseData.setThrottleTimeMs(throttleTimeMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
index 8ed9cb67671..c48098ccd4b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -92,7 +91,7 @@ public class ElectLeadersRequest extends AbstractRequest {
 
     public Set<TopicPartition> topicPartitions() {
         if (this.data.topicPartitions() == null) {
-            return Collections.emptySet();
+            return Set.of();
         }
         return this.data.topicPartitions().stream()
             .flatMap(topicPartition -> topicPartition.partitions().stream()
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
index de45ca457cb..e6b9c2a8486 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -84,10 +83,10 @@ public class EndQuorumEpochRequest extends AbstractRequest {
                                                              List<Integer> 
preferredSuccessors) {
         return new EndQuorumEpochRequestData()
                    .setClusterId(clusterId)
-                   .setTopics(Collections.singletonList(
+                   .setTopics(List.of(
                        new EndQuorumEpochRequestData.TopicData()
                            .setTopicName(topicPartition.topic())
-                           .setPartitions(Collections.singletonList(
+                           .setPartitions(List.of(
                                new EndQuorumEpochRequestData.PartitionData()
                                    
.setPartitionIndex(topicPartition.partition())
                                    .setLeaderEpoch(leaderEpoch)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 762acd09387..8bce0c775e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.Readable;
 import org.apache.kafka.common.record.internal.RecordBatch;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -159,8 +158,8 @@ public class FetchRequest extends AbstractRequest {
         private IsolationLevel isolationLevel = 
IsolationLevel.READ_UNCOMMITTED;
         private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
         private FetchMetadata metadata = FetchMetadata.LEGACY;
-        private List<TopicIdPartition> removed = Collections.emptyList();
-        private List<TopicIdPartition> replaced = Collections.emptyList();
+        private List<TopicIdPartition> removed = List.of();
+        private List<TopicIdPartition> replaced = List.of();
         private String rackId = "";
 
         public static Builder forConsumer(short maxVersion, int maxWait, int 
minBytes, Map<TopicPartition, PartitionData> fetchData) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index d489906c735..eee7daee2ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.Records;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -166,7 +165,7 @@ public class FetchResponse extends AbstractResponse {
                              FetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
         // use arbitrary values here without affecting the result.
-        FetchResponseData data = toMessage(Errors.NONE, 0, INVALID_SESSION_ID, 
partIterator, Collections.emptyList());
+        FetchResponseData data = toMessage(Errors.NONE, 0, INVALID_SESSION_ID, 
partIterator, List.of());
         ObjectSerializationCache cache = new ObjectSerializationCache();
         return 4 + data.size(cache, version);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index b14fae5cfac..cda50c1732f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -25,7 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
+import java.util.List;
+
 
 public class FindCoordinatorRequest extends AbstractRequest {
 
@@ -52,10 +53,10 @@ public class FindCoordinatorRequest extends AbstractRequest 
{
                         "because we require features supported only in " + 
MIN_BATCHED_VERSION + " or later.");
                 if (batchedKeys == 1) {
                     data.setKey(data.coordinatorKeys().get(0));
-                    data.setCoordinatorKeys(Collections.emptyList());
+                    data.setCoordinatorKeys(List.of());
                 }
             } else if (batchedKeys == 0 && data.key() != null) {
-                data.setCoordinatorKeys(Collections.singletonList(data.key()));
+                data.setCoordinatorKeys(List.of(data.key()));
                 data.setKey(""); // default value
             }
             return new FindCoordinatorRequest(data, version);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 5bd08934b62..fb997b05d61 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
@@ -132,7 +131,7 @@ public class FindCoordinatorResponse extends 
AbstractResponse {
                     .setNodeId(data.nodeId())
                     .setHost(data.host())
                     .setPort(data.port());
-            return Collections.singletonList(coordinator);
+            return List.of(coordinator);
         }
     }
 
@@ -148,7 +147,7 @@ public class FindCoordinatorResponse extends 
AbstractResponse {
 
     public static FindCoordinatorResponse prepareResponse(Errors error, String 
key, Node node) {
         FindCoordinatorResponseData data = new FindCoordinatorResponseData();
-        data.setCoordinators(Collections.singletonList(
+        data.setCoordinators(List.of(
             prepareCoordinatorResponse(error, key, node)
         ));
         return new FindCoordinatorResponse(data);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 9d75d383bab..30649b27303 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -25,7 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
+import java.util.List;
+
 
 public class JoinGroupRequest extends AbstractRequest {
 
@@ -197,7 +198,7 @@ public class JoinGroupRequest extends AbstractRequest {
             .setProtocolName(UNKNOWN_PROTOCOL_NAME)
             .setLeader(UNKNOWN_MEMBER_ID)
             .setMemberId(UNKNOWN_MEMBER_ID)
-            .setMembers(Collections.emptyList());
+            .setMembers(List.of());
 
         if (version() >= 7)
             data.setProtocolName(null);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 2dd69afab98..4a97c05037b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.MessageUtil;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.List;
 
 public class LeaveGroupRequest extends AbstractRequest {
@@ -97,14 +96,14 @@ public class LeaveGroupRequest extends AbstractRequest {
         } else {
             return new LeaveGroupRequestData()
                 .setGroupId(data.groupId())
-                .setMembers(Collections.singletonList(
+                .setMembers(List.of(
                     new MemberIdentity().setMemberId(data.memberId())));
         }
     }
 
     public List<MemberIdentity> members() {
         // Before version 3, leave group request is still in single mode
-        return version() <= 2 ? Collections.singletonList(
+        return version() <= 2 ? List.of(
             new MemberIdentity()
                 .setMemberId(data.memberId())) : data.members();
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 84f7cc2a72d..ccdf356aff8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 
@@ -86,7 +85,7 @@ public class ListGroupsRequest extends AbstractRequest {
     @Override
     public ListGroupsResponse getErrorResponse(int throttleTimeMs, Throwable 
e) {
         ListGroupsResponseData listGroupsResponseData = new 
ListGroupsResponseData().
-            setGroups(Collections.emptyList()).
+            setGroups(List.of()).
             setErrorCode(Errors.forException(e).code());
         if (version() >= 1) {
             listGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
index a1dc329054c..7cab5d4bb50 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 import org.apache.kafka.common.record.internal.RecordBatch;
 
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
@@ -105,7 +104,7 @@ public class ListOffsetsResponse extends AbstractResponse {
     public static ListOffsetsTopicResponse 
singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long 
timestamp, long offset, int epoch) {
         return new ListOffsetsTopicResponse()
                  .setName(tp.topic())
-                 .setPartitions(Collections.singletonList(new 
ListOffsetsPartitionResponse()
+                 .setPartitions(List.of(new ListOffsetsPartitionResponse()
                          .setPartitionIndex(tp.partition())
                          .setErrorCode(error.code())
                          .setTimestamp(timestamp)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index d3dcabfb4f9..392bf2a9105 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -173,7 +172,7 @@ public class MetadataRequest extends AbstractRequest {
                     .setTopicId(topic.topicId())
                     .setErrorCode(error.code())
                     .setIsInternal(false)
-                    .setPartitions(Collections.emptyList()));
+                    .setPartitions(List.of()));
             }
         }
 
@@ -200,9 +199,9 @@ public class MetadataRequest extends AbstractRequest {
 
     public List<Uuid> topicIds() {
         if (isAllTopics())
-            return Collections.emptyList();
+            return List.of();
         else if (version() < 10)
-            return Collections.emptyList();
+            return List.of();
         else
             return data.topics()
                     .stream()
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 9b8eed7cb3b..3cb9588c4b7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -228,7 +227,7 @@ public class OffsetFetchRequest extends AbstractRequest {
                 );
             }
 
-            return Collections.singletonList(group);
+            return List.of(group);
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index e733fa9eda4..9b922d06606 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -30,9 +30,9 @@ import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -174,7 +174,7 @@ public class ProduceRequest extends AbstractRequest {
             }
             tpr.partitionResponses().add(new 
ProduceResponseData.PartitionProduceResponse()
                     .setIndex(tpId.partition())
-                    .setRecordErrors(Collections.emptyList())
+                    .setRecordErrors(List.of())
                     .setBaseOffset(INVALID_OFFSET)
                     .setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
                     .setLogStartOffset(INVALID_OFFSET)
@@ -187,7 +187,7 @@ public class ProduceRequest extends AbstractRequest {
     @Override
     public Map<Errors, Integer> errorCounts(Throwable e) {
         Errors error = Errors.forException(e);
-        return Collections.singletonMap(error, partitionSizes().size());
+        return Map.of(error, partitionSizes().size());
     }
 
     public short acks() {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index df3bff0c32f..9d7817e4902 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 import org.apache.kafka.common.record.internal.RecordBatch;
 
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
@@ -74,7 +73,7 @@ public class ProduceResponse extends AbstractResponse {
      */
     @Deprecated
     public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses) 
{
-        this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
+        this(responses, DEFAULT_THROTTLE_TIME, List.of());
     }
 
     /**
@@ -85,7 +84,7 @@ public class ProduceResponse extends AbstractResponse {
      */
     @Deprecated
     public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses, 
int throttleTimeMs) {
-        this(toData(responses, throttleTimeMs, Collections.emptyList()));
+        this(toData(responses, throttleTimeMs, List.of()));
     }
 
     /**
@@ -170,11 +169,11 @@ public class ProduceResponse extends AbstractResponse {
         }
 
         public PartitionResponse(Errors error, String errorMessage) {
-            this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, 
INVALID_OFFSET, Collections.emptyList(), errorMessage);
+            this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP, 
INVALID_OFFSET, List.of(), errorMessage);
         }
 
         public PartitionResponse(Errors error, long baseOffset, long 
logAppendTime, long logStartOffset) {
-            this(error, baseOffset, logAppendTime, logStartOffset, 
Collections.emptyList(), null);
+            this(error, baseOffset, logAppendTime, logStartOffset, List.of(), 
null);
         }
 
         public PartitionResponse(Errors error, long baseOffset, long 
logAppendTime, long logStartOffset, List<RecordError> recordErrors) {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
index 271cfde8cff..852f7bcc33d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class RemoveRaftVoterResponse extends AbstractResponse {
@@ -51,9 +50,9 @@ public class RemoveRaftVoterResponse extends AbstractResponse 
{
     @Override
     public Map<Errors, Integer> errorCounts() {
         if (data.errorCode() != Errors.NONE.code()) {
-            return Collections.singletonMap(Errors.forCode(data.errorCode()), 
1);
+            return Map.of(Errors.forCode(data.errorCode()), 1);
         } else {
-            return Collections.emptyMap();
+            return Map.of();
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 64251a0c5fd..fe725fa9dac 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.Records;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -139,7 +138,7 @@ public class ShareFetchResponse extends AbstractResponse {
                              Iterator<Map.Entry<TopicIdPartition, 
ShareFetchResponseData.PartitionData>> partIterator) {
         // Since the throttleTimeMs and metadata field sizes are constant and 
fixed, we can
         // use arbitrary values here without affecting the result.
-        ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, 
Collections.emptyList(), 0);
+        ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator, 
List.of(), 0);
         ObjectSerializationCache cache = new ObjectSerializationCache();
         return 4 + data.size(cache, version);
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
index 5d8426e2650..6df71cbb5c1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -56,7 +55,7 @@ public class ShareGroupHeartbeatResponse extends 
AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+        return Map.of(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
index 87c10a98d37..68f2a460c4f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
@@ -59,7 +59,7 @@ public class StreamsGroupHeartbeatResponse extends 
AbstractResponse {
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+        return Map.of(Errors.forCode(data.errorCode()), 1);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
index 35b4cce2095..8a4f7bd7c27 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Readable;
 
 import java.util.Collection;
-import java.util.Collections;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class UpdateFeaturesRequest extends AbstractRequest {
@@ -105,7 +105,7 @@ public class UpdateFeaturesRequest extends AbstractRequest {
     public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
         return UpdateFeaturesResponse.createWithErrors(
             ApiError.fromThrowable(e),
-            Collections.emptySet(),
+            Set.of(),
             throttleTimeMs
         );
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
index f52157234fa..8d5420c9eb0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
 import java.util.Map;
 
 public class UpdateRaftVoterResponse extends AbstractResponse {
@@ -51,9 +50,9 @@ public class UpdateRaftVoterResponse extends AbstractResponse 
{
     @Override
     public Map<Errors, Integer> errorCounts() {
         if (data.errorCode() != Errors.NONE.code()) {
-            return Collections.singletonMap(Errors.forCode(data.errorCode()), 
1);
+            return Map.of(Errors.forCode(data.errorCode()), 1);
         } else {
-            return Collections.emptyMap();
+            return Map.of();
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
index 73217558367..91b1cafebc7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
@@ -23,7 +23,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.Readable;
 
-import java.util.Collections;
+import java.util.List;
+
 
 public class VoteRequest extends AbstractRequest {
 
@@ -77,10 +78,10 @@ public class VoteRequest extends AbstractRequest {
                                                    boolean preVote) {
         return new VoteRequestData()
                    .setClusterId(clusterId)
-                   .setTopics(Collections.singletonList(
+                   .setTopics(List.of(
                        new VoteRequestData.TopicData()
                            .setTopicName(topicPartition.topic())
-                           .setPartitions(Collections.singletonList(
+                           .setPartitions(List.of(
                                new VoteRequestData.PartitionData()
                                    
.setPartitionIndex(topicPartition.partition())
                                    .setReplicaEpoch(replicaEpoch)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 3d32e98f8b4..2081d5bd5f4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -766,7 +766,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
                         Map<Integer, PartitionErrorData> partitionErrors =
                             Optional.ofNullable(topicPartitionErrorsMap)
                                 .map(map -> map.get(topic.topicId()))
-                                .orElse(Collections.emptyMap());
+                                .orElse(Map.of());
                         PartitionErrorData error = 
partitionErrors.get(partition.partitionIndex());
                         if (error == null) {
                             partitionData = partition.duplicate();
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 09714ab28b8..837caddcb03 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2166,7 +2166,7 @@ public class GroupMetadataManager {
             
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
         }
 
-        Map<String, CreatableTopic> internalTopicsToBeCreated = 
Collections.emptyMap();
+        Map<String, CreatableTopic> internalTopicsToBeCreated = Map.of();
         if 
(updatedConfiguredTopology.topicConfigurationException().isPresent()) {
             TopicConfigurationException exception = 
updatedConfiguredTopology.topicConfigurationException().get();
             internalTopicsToBeCreated = 
updatedConfiguredTopology.internalTopicsToBeCreated();
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index b1a5b026654..a74871871a6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -1275,11 +1275,11 @@ public class StreamsGroup implements Group {
 
             // Search for the partition in assigned tasks, then in tasks 
pending revocation
             Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
-                .getOrDefault(subtopologyId, Collections.emptyMap())
+                .getOrDefault(subtopologyId, Map.of())
                 .get(partitionId);
             if (assignmentEpoch == null) {
                 assignmentEpoch = 
tasksPendingRevocation.activeTasksWithEpochs()
-                    .getOrDefault(subtopologyId, Collections.emptyMap())
+                    .getOrDefault(subtopologyId, Map.of())
                     .get(partitionId);
             }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 887472132a6..a43431ce47e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -287,7 +287,7 @@ public record StreamsGroupMember(String memberId,
                 .setClientId("")
                 .setClientHost("")
                 .setProcessId("")
-                .setClientTags(Collections.emptyMap())
+                .setClientTags(Map.of())
                 .setState(MemberState.STABLE)
                 .setMemberEpoch(0)
                 .setPreviousMemberEpoch(0)

Reply via email to