This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5eff4ceb5f2 MINOR: Replace Collections factory methods with Java 11+
equivalents in group-coordinator and part of clients (#21876)
5eff4ceb5f2 is described below
commit 5eff4ceb5f21e8aa4f0c538369923416b7563894
Author: Maros Orsak <[email protected]>
AuthorDate: Mon Mar 30 16:13:04 2026 +0200
MINOR: Replace Collections factory methods with Java 11+ equivalents in
group-coordinator and part of clients (#21876)
This PR is another one with changing old JDK 8 API to JDK 11. It's
changing in the whole `group-coordinator` module and part of clients
(i.e., common/requests, common/errors, common/config). I didn't want to
do the whole... because it would be more bigger change ...
Also I have faced the issue where in `GroupCoordinatorService we use
`Collections.singletonList(null)` so I skipped that as `List.of(...)`
would break the code. Moreover in `AssignorHelpers` there is
`Collections.emptyMap().getClass()` which is using reflection and that I
would also use different class ...
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/common/config/AbstractConfig.java | 12 +++++-----
.../org/apache/kafka/common/config/ConfigDef.java | 26 +++++++++++-----------
.../config/internals/BrokerSecurityConfigs.java | 5 ++---
.../config/provider/DirectoryConfigProvider.java | 3 +--
.../common/errors/TopicAuthorizationException.java | 3 +--
.../kafka/common/requests/AbstractResponse.java | 3 +--
.../common/requests/AddRaftVoterResponse.java | 5 ++---
.../requests/AllocateProducerIdsResponse.java | 3 +--
.../common/requests/AlterPartitionRequest.java | 3 +--
.../requests/AssignReplicasToDirsResponse.java | 3 +--
.../common/requests/BeginQuorumEpochRequest.java | 7 +++---
.../requests/ConsumerGroupHeartbeatResponse.java | 3 +--
.../requests/ControllerRegistrationResponse.java | 3 +--
.../kafka/common/requests/DeleteTopicsRequest.java | 3 +--
.../common/requests/DescribeGroupsResponse.java | 3 +--
.../common/requests/DescribeQuorumRequest.java | 5 ++---
.../common/requests/DescribeQuorumResponse.java | 10 ++++-----
.../requests/DescribeTopicPartitionsRequest.java | 3 +--
.../kafka/common/requests/ElectLeadersRequest.java | 3 +--
.../common/requests/EndQuorumEpochRequest.java | 5 ++---
.../apache/kafka/common/requests/FetchRequest.java | 5 ++---
.../kafka/common/requests/FetchResponse.java | 3 +--
.../common/requests/FindCoordinatorRequest.java | 7 +++---
.../common/requests/FindCoordinatorResponse.java | 5 ++---
.../kafka/common/requests/JoinGroupRequest.java | 5 +++--
.../kafka/common/requests/LeaveGroupRequest.java | 5 ++---
.../kafka/common/requests/ListGroupsRequest.java | 3 +--
.../kafka/common/requests/ListOffsetsResponse.java | 3 +--
.../kafka/common/requests/MetadataRequest.java | 7 +++---
.../kafka/common/requests/OffsetFetchRequest.java | 3 +--
.../kafka/common/requests/ProduceRequest.java | 6 ++---
.../kafka/common/requests/ProduceResponse.java | 9 ++++----
.../common/requests/RemoveRaftVoterResponse.java | 5 ++---
.../kafka/common/requests/ShareFetchResponse.java | 3 +--
.../requests/ShareGroupHeartbeatResponse.java | 3 +--
.../requests/StreamsGroupHeartbeatResponse.java | 2 +-
.../common/requests/UpdateFeaturesRequest.java | 4 ++--
.../common/requests/UpdateRaftVoterResponse.java | 5 ++---
.../apache/kafka/common/requests/VoteRequest.java | 7 +++---
.../coordinator/group/GroupCoordinatorService.java | 2 +-
.../coordinator/group/GroupMetadataManager.java | 2 +-
.../coordinator/group/streams/StreamsGroup.java | 4 ++--
.../group/streams/StreamsGroupMember.java | 2 +-
43 files changed, 93 insertions(+), 118 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index e271cd99c4c..39d015ec82c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -133,7 +133,7 @@ public class AbstractConfig {
* @param originals the configuration properties plus any optional config
provider properties; may not be null
*/
public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
- this(definition, originals, Collections.emptyMap(), true);
+ this(definition, originals, Map.of(), true);
}
/**
@@ -146,7 +146,7 @@ public class AbstractConfig {
* @param doLog whether the configurations should be logged
*/
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean
doLog) {
- this(definition, originals, Collections.emptyMap(), doLog);
+ this(definition, originals, Map.of(), doLog);
}
/**
@@ -168,7 +168,7 @@ public class AbstractConfig {
* @return a map of updates that should be applied to the configuration
(will be validated to prevent bad updates)
*/
protected Map<String, Object> postProcessParsedConfig(Map<String, Object>
parsedValues) {
- return Collections.emptyMap();
+ return Map.of();
}
protected Object get(String key) {
@@ -430,7 +430,7 @@ public class AbstractConfig {
* @return A configured instance of the class
*/
public <T> T getConfiguredInstance(String key, Class<T> t) {
- return getConfiguredInstance(key, t, Collections.emptyMap());
+ return getConfiguredInstance(key, t, Map.of());
}
/**
@@ -458,7 +458,7 @@ public class AbstractConfig {
* @return The list of configured instances
*/
public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
- return getConfiguredInstances(key, t, Collections.emptyMap());
+ return getConfiguredInstances(key, t, Map.of());
}
/**
@@ -606,7 +606,7 @@ public class AbstractConfig {
final String configProviders =
indirectConfigs.get(CONFIG_PROVIDERS_CONFIG);
if (configProviders == null || configProviders.isEmpty()) {
- return Collections.emptyMap();
+ return Map.of();
}
Map<String, String> providerMap = new HashMap<>();
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index dd1e4898eaf..efe7f0a1ecc 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -218,7 +218,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue,
Validator validator, Importance importance, String documentation,
String group, int orderInGroup, Width width,
String displayName, Recommender recommender) {
- return define(name, type, defaultValue, validator, importance,
documentation, group, orderInGroup, width, displayName,
Collections.emptyList(), recommender);
+ return define(name, type, defaultValue, validator, importance,
documentation, group, orderInGroup, width, displayName, List.of(), recommender);
}
/**
@@ -237,7 +237,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue,
Validator validator, Importance importance, String documentation,
String group, int orderInGroup, Width width,
String displayName) {
- return define(name, type, defaultValue, validator, importance,
documentation, group, orderInGroup, width, displayName,
Collections.emptyList());
+ return define(name, type, defaultValue, validator, importance,
documentation, group, orderInGroup, width, displayName, List.of());
}
/**
@@ -295,7 +295,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue,
Importance importance, String documentation,
String group, int orderInGroup, Width width,
String displayName, Recommender recommender) {
- return define(name, type, defaultValue, null, importance,
documentation, group, orderInGroup, width, displayName,
Collections.emptyList(), recommender);
+ return define(name, type, defaultValue, null, importance,
documentation, group, orderInGroup, width, displayName, List.of(), recommender);
}
/**
@@ -313,7 +313,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue,
Importance importance, String documentation,
String group, int orderInGroup, Width width,
String displayName) {
- return define(name, type, defaultValue, null, importance,
documentation, group, orderInGroup, width, displayName,
Collections.emptyList());
+ return define(name, type, defaultValue, null, importance,
documentation, group, orderInGroup, width, displayName, List.of());
}
/**
@@ -368,7 +368,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Importance importance,
String documentation, String group, int orderInGroup,
Width width, String displayName, Recommender
recommender) {
- return define(name, type, NO_DEFAULT_VALUE, null, importance,
documentation, group, orderInGroup, width, displayName,
Collections.emptyList(), recommender);
+ return define(name, type, NO_DEFAULT_VALUE, null, importance,
documentation, group, orderInGroup, width, displayName, List.of(), recommender);
}
/**
@@ -385,7 +385,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Importance importance,
String documentation, String group, int orderInGroup,
Width width, String displayName) {
- return define(name, type, NO_DEFAULT_VALUE, null, importance,
documentation, group, orderInGroup, width, displayName,
Collections.emptyList());
+ return define(name, type, NO_DEFAULT_VALUE, null, importance,
documentation, group, orderInGroup, width, displayName, List.of());
}
/**
@@ -427,7 +427,7 @@ public class ConfigDef {
*/
public ConfigDef define(String name, Type type, Object defaultValue,
Importance importance, String documentation, String alternativeString) {
return define(name, type, defaultValue, null, importance,
documentation, null, -1, Width.NONE,
- name, Collections.emptyList(), null, alternativeString);
+ name, List.of(), null, alternativeString);
}
/**
@@ -452,7 +452,7 @@ public class ConfigDef {
* @return This ConfigDef so you can chain calls
*/
public ConfigDef defineInternal(final String name, final Type type, final
Object defaultValue, final Importance importance) {
- return define(new ConfigKey(name, type, defaultValue, null,
importance, "", "", -1, Width.NONE, name, Collections.emptyList(), null, true,
null));
+ return define(new ConfigKey(name, type, defaultValue, null,
importance, "", "", -1, Width.NONE, name, List.of(), null, true, null));
}
/**
@@ -467,7 +467,7 @@ public class ConfigDef {
* @return This ConfigDef so you can chain calls
*/
public ConfigDef defineInternal(final String name, final Type type, final
Object defaultValue, final Validator validator, final Importance importance,
final String documentation) {
- return define(new ConfigKey(name, type, defaultValue, validator,
importance, documentation, "", -1, Width.NONE, name, Collections.emptyList(),
null, true, null));
+ return define(new ConfigKey(name, type, defaultValue, validator,
importance, documentation, "", -1, Width.NONE, name, List.of(), null, true,
null));
}
/**
@@ -768,7 +768,7 @@ public class ConfigDef {
return value;
else if (value instanceof String)
if (trimmed.isEmpty())
- return Collections.emptyList();
+ return List.of();
else
return
Arrays.asList(COMMA_WITH_WHITESPACE.split(trimmed, -1));
else
@@ -1445,7 +1445,7 @@ public class ConfigDef {
}
public String toHtmlTable() {
- return toHtmlTable(Collections.emptyMap());
+ return toHtmlTable(Map.of());
}
private void addHeader(StringBuilder builder, String headerName) {
@@ -1700,7 +1700,7 @@ public class ConfigDef {
}
public String toHtml() {
- return toHtml(Collections.emptyMap());
+ return toHtml(Map.of());
}
/**
@@ -1709,7 +1709,7 @@ public class ConfigDef {
* @param idGenerator A function for computing the HTML id attribute in
the generated HTML from a given config name.
*/
public String toHtml(int headerDepth, Function<String, String>
idGenerator) {
- return toHtml(headerDepth, idGenerator, Collections.emptyMap());
+ return toHtml(headerDepth, idGenerator, Map.of());
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index c5289676439..339acab0a22 100644
---
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -25,7 +25,6 @@ import
org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
import org.apache.kafka.common.utils.Utils;
-import java.util.Collections;
import java.util.List;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
@@ -61,7 +60,7 @@ public class BrokerSecurityConfigs {
" configuration.";
public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG =
"sasl.kerberos.principal.to.local.rules";
- public static final List<String>
DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES =
Collections.singletonList(DEFAULT_SSL_PRINCIPAL_MAPPING_RULES);
+ public static final List<String>
DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES =
List.of(DEFAULT_SSL_PRINCIPAL_MAPPING_RULES);
public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A
list of rules for mapping from principal " +
"names to short names (typically operating system usernames). The
rules are evaluated in order and the " +
"first rule that matches a principal name is used to map it to a
short name. Any later rules in the list are " +
@@ -95,7 +94,7 @@ public class BrokerSecurityConfigs {
+ "</ul>";
public static final String SASL_ENABLED_MECHANISMS_CONFIG =
"sasl.enabled.mechanisms";
- public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS =
Collections.singletonList(SaslConfigs.GSSAPI_MECHANISM);
+ public static final List<String> DEFAULT_SASL_ENABLED_MECHANISMS =
List.of(SaslConfigs.GSSAPI_MECHANISM);
public static final String SASL_ENABLED_MECHANISMS_DOC = "The list of SASL
mechanisms enabled in the Kafka server. "
+ "The list may contain any mechanism for which a security
provider is available. "
+ "Only GSSAPI is enabled by default.";
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
index 3e0fa5ed772..aecd29be6ff 100644
---
a/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
+++
b/clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
@@ -32,7 +32,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static java.util.Collections.emptyMap;
/**
* An implementation of {@link ConfigProvider} based on a directory of files.
@@ -87,7 +86,7 @@ public class DirectoryConfigProvider implements
ConfigProvider {
throw new IllegalStateException("The provider has not been
configured yet.");
}
- Map<String, String> map = emptyMap();
+ Map<String, String> map = Map.of();
if (path != null && !path.isEmpty()) {
Path dir = allowedPaths.parseUntrustedPath(path);
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
index e2235f804e1..0b4b210e1f6 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/TopicAuthorizationException.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common.errors;
-import java.util.Collections;
import java.util.Set;
public class TopicAuthorizationException extends AuthorizationException {
@@ -32,7 +31,7 @@ public class TopicAuthorizationException extends
AuthorizationException {
}
public TopicAuthorizationException(String message) {
- this(message, Collections.emptySet());
+ this(message, Set.of());
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index bc313078d74..7d96d1a1731 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.protocol.SendBuilder;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -65,7 +64,7 @@ public abstract class AbstractResponse implements
AbstractRequestResponse {
public abstract Map<Errors, Integer> errorCounts();
protected static Map<Errors, Integer> errorCounts(Errors error) {
- return Collections.singletonMap(error, 1);
+ return Map.of(error, 1);
}
protected static Map<Errors, Integer> errorCounts(Stream<Errors> errors) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
index 52a0cb05feb..1efaed61b99 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AddRaftVoterResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.Map;
public class AddRaftVoterResponse extends AbstractResponse {
@@ -51,9 +50,9 @@ public class AddRaftVoterResponse extends AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
if (data.errorCode() != Errors.NONE.code()) {
- return Collections.singletonMap(Errors.forCode(data.errorCode()),
1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
} else {
- return Collections.emptyMap();
+ return Map.of();
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
index 4c476511931..2575e05abb5 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AllocateProducerIdsResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.Map;
public class AllocateProducerIdsResponse extends AbstractResponse {
@@ -47,7 +46,7 @@ public class AllocateProducerIdsResponse extends
AbstractResponse {
*/
@Override
public Map<Errors, Integer> errorCounts() {
- return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
index 2d181f48766..d6a0eccb611 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterPartitionRequest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -90,7 +89,7 @@ public class AlterPartitionRequest extends AbstractRequest {
newIsr.add(brokerState.brokerId())
);
partitionData.setNewIsr(newIsr);
-
partitionData.setNewIsrWithEpochs(Collections.emptyList());
+ partitionData.setNewIsrWithEpochs(List.of());
}
})
);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
index 84f86d058ec..d785e807dfc 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AssignReplicasToDirsResponse.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.Map;
public class AssignReplicasToDirsResponse extends AbstractResponse {
@@ -40,7 +39,7 @@ public class AssignReplicasToDirsResponse extends
AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
- return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
index f3375da9c41..8e7b816a065 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/BeginQuorumEpochRequest.java
@@ -23,7 +23,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
+import java.util.List;
+
public class BeginQuorumEpochRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<BeginQuorumEpochRequest> {
@@ -75,10 +76,10 @@ public class BeginQuorumEpochRequest extends
AbstractRequest {
) {
return new BeginQuorumEpochRequestData()
.setClusterId(clusterId)
- .setTopics(Collections.singletonList(
+ .setTopics(List.of(
new BeginQuorumEpochRequestData.TopicData()
.setTopicName(topicPartition.topic())
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new BeginQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setLeaderEpoch(leaderEpoch)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
index dbbabdcaccd..ebc9c46beb3 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -59,7 +58,7 @@ public class ConsumerGroupHeartbeatResponse extends
AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
- return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
index 9cc53db15a9..d6462f1f501 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ControllerRegistrationResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.Map;
public class ControllerRegistrationResponse extends AbstractResponse {
@@ -50,7 +49,7 @@ public class ControllerRegistrationResponse extends
AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
- return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
}
public static ControllerRegistrationResponse parse(Readable readable,
short version) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index b90f853211d..dac9f21c169 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -104,7 +103,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
public List<Uuid> topicIds() {
if (version() >= 6)
return
data.topics().stream().map(DeleteTopicState::topicId).collect(Collectors.toList());
- return Collections.emptyList();
+ return List.of();
}
public List<DeleteTopicState> topics() {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 4d59aee8758..b97526d0dd5 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.utils.Utils;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -106,7 +105,7 @@ public class DescribeGroupsResponse extends
AbstractResponse {
public static DescribedGroup groupError(String groupId, Errors error) {
return groupMetadata(groupId, error,
DescribeGroupsResponse.UNKNOWN_STATE,
DescribeGroupsResponse.UNKNOWN_PROTOCOL_TYPE,
- DescribeGroupsResponse.UNKNOWN_PROTOCOL, Collections.emptyList(),
AUTHORIZED_OPERATIONS_OMITTED);
+ DescribeGroupsResponse.UNKNOWN_PROTOCOL, List.of(),
AUTHORIZED_OPERATIONS_OMITTED);
}
public static DescribedGroup groupError(String groupId, Errors error,
String errorMessage) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
index e5f6b00f9fc..569d58f1159 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumRequest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -61,10 +60,10 @@ public class DescribeQuorumRequest extends AbstractRequest {
public static DescribeQuorumRequestData singletonRequest(TopicPartition
topicPartition) {
return new DescribeQuorumRequestData()
- .setTopics(Collections.singletonList(
+ .setTopics(List.of(
new DescribeQuorumRequestData.TopicData()
.setTopicName(topicPartition.topic())
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new DescribeQuorumRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition()))
)));
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
index c3b33d48052..bbc936a7846 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeQuorumResponse.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.EnumMap;
+import java.util.List;
import java.util.Map;
/**
@@ -79,9 +79,9 @@ public class DescribeQuorumResponse extends AbstractResponse {
Errors error
) {
return new DescribeQuorumResponseData()
- .setTopics(Collections.singletonList(new
DescribeQuorumResponseData.TopicData()
+ .setTopics(List.of(new DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
- .setPartitions(Collections.singletonList(new
DescribeQuorumResponseData.PartitionData()
+ .setPartitions(List.of(new
DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(error.code())
.setErrorMessage(error.message())))));
@@ -94,9 +94,9 @@ public class DescribeQuorumResponse extends AbstractResponse {
DescribeQuorumResponseData.NodeCollection nodes
) {
DescribeQuorumResponseData res = new DescribeQuorumResponseData()
- .setTopics(Collections.singletonList(new
DescribeQuorumResponseData.TopicData()
+ .setTopics(List.of(new DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
- .setPartitions(Collections.singletonList(partitionData
+ .setPartitions(List.of(partitionData
.setPartitionIndex(topicPartition.partition())))));
if (nodes != null)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
index f65ef91db54..0054bd8a8f0 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeTopicPartitionsRequest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.List;
public class DescribeTopicPartitionsRequest extends AbstractRequest {
@@ -83,7 +82,7 @@ public class DescribeTopicPartitionsRequest extends
AbstractRequest {
.setName(topic.name())
.setErrorCode(error.code())
.setIsInternal(false)
- .setPartitions(Collections.emptyList())
+ .setPartitions(List.of())
);
}
responseData.setThrottleTimeMs(throttleTimeMs);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
index 8ed9cb67671..c48098ccd4b 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -92,7 +91,7 @@ public class ElectLeadersRequest extends AbstractRequest {
public Set<TopicPartition> topicPartitions() {
if (this.data.topicPartitions() == null) {
- return Collections.emptySet();
+ return Set.of();
}
return this.data.topicPartitions().stream()
.flatMap(topicPartition -> topicPartition.partitions().stream()
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
index de45ca457cb..e6b9c2a8486 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/EndQuorumEpochRequest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -84,10 +83,10 @@ public class EndQuorumEpochRequest extends AbstractRequest {
List<Integer>
preferredSuccessors) {
return new EndQuorumEpochRequestData()
.setClusterId(clusterId)
- .setTopics(Collections.singletonList(
+ .setTopics(List.of(
new EndQuorumEpochRequestData.TopicData()
.setTopicName(topicPartition.topic())
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new EndQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setLeaderEpoch(leaderEpoch)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 762acd09387..8bce0c775e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.internal.RecordBatch;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -159,8 +158,8 @@ public class FetchRequest extends AbstractRequest {
private IsolationLevel isolationLevel =
IsolationLevel.READ_UNCOMMITTED;
private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
private FetchMetadata metadata = FetchMetadata.LEGACY;
- private List<TopicIdPartition> removed = Collections.emptyList();
- private List<TopicIdPartition> replaced = Collections.emptyList();
+ private List<TopicIdPartition> removed = List.of();
+ private List<TopicIdPartition> replaced = List.of();
private String rackId = "";
public static Builder forConsumer(short maxVersion, int maxWait, int
minBytes, Map<TopicPartition, PartitionData> fetchData) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index d489906c735..eee7daee2ae 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.Records;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -166,7 +165,7 @@ public class FetchResponse extends AbstractResponse {
FetchResponseData.PartitionData>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and
fixed, we can
// use arbitrary values here without affecting the result.
- FetchResponseData data = toMessage(Errors.NONE, 0, INVALID_SESSION_ID,
partIterator, Collections.emptyList());
+ FetchResponseData data = toMessage(Errors.NONE, 0, INVALID_SESSION_ID,
partIterator, List.of());
ObjectSerializationCache cache = new ObjectSerializationCache();
return 4 + data.size(cache, version);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
index b14fae5cfac..cda50c1732f 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -25,7 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
+import java.util.List;
+
public class FindCoordinatorRequest extends AbstractRequest {
@@ -52,10 +53,10 @@ public class FindCoordinatorRequest extends AbstractRequest
{
"because we require features supported only in " +
MIN_BATCHED_VERSION + " or later.");
if (batchedKeys == 1) {
data.setKey(data.coordinatorKeys().get(0));
- data.setCoordinatorKeys(Collections.emptyList());
+ data.setCoordinatorKeys(List.of());
}
} else if (batchedKeys == 0 && data.key() != null) {
- data.setCoordinatorKeys(Collections.singletonList(data.key()));
+ data.setCoordinatorKeys(List.of(data.key()));
data.setKey(""); // default value
}
return new FindCoordinatorRequest(data, version);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index 5bd08934b62..fb997b05d61 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -132,7 +131,7 @@ public class FindCoordinatorResponse extends
AbstractResponse {
.setNodeId(data.nodeId())
.setHost(data.host())
.setPort(data.port());
- return Collections.singletonList(coordinator);
+ return List.of(coordinator);
}
}
@@ -148,7 +147,7 @@ public class FindCoordinatorResponse extends
AbstractResponse {
public static FindCoordinatorResponse prepareResponse(Errors error, String
key, Node node) {
FindCoordinatorResponseData data = new FindCoordinatorResponseData();
- data.setCoordinators(Collections.singletonList(
+ data.setCoordinators(List.of(
prepareCoordinatorResponse(error, key, node)
));
return new FindCoordinatorResponse(data);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 9d75d383bab..30649b27303 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -25,7 +25,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
+import java.util.List;
+
public class JoinGroupRequest extends AbstractRequest {
@@ -197,7 +198,7 @@ public class JoinGroupRequest extends AbstractRequest {
.setProtocolName(UNKNOWN_PROTOCOL_NAME)
.setLeader(UNKNOWN_MEMBER_ID)
.setMemberId(UNKNOWN_MEMBER_ID)
- .setMembers(Collections.emptyList());
+ .setMembers(List.of());
if (version() >= 7)
data.setProtocolName(null);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 2dd69afab98..4a97c05037b 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.List;
public class LeaveGroupRequest extends AbstractRequest {
@@ -97,14 +96,14 @@ public class LeaveGroupRequest extends AbstractRequest {
} else {
return new LeaveGroupRequestData()
.setGroupId(data.groupId())
- .setMembers(Collections.singletonList(
+ .setMembers(List.of(
new MemberIdentity().setMemberId(data.memberId())));
}
}
public List<MemberIdentity> members() {
// Before version 3, leave group request is still in single mode
- return version() <= 2 ? Collections.singletonList(
+ return version() <= 2 ? List.of(
new MemberIdentity()
.setMemberId(data.memberId())) : data.members();
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 84f7cc2a72d..ccdf356aff8 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -86,7 +85,7 @@ public class ListGroupsRequest extends AbstractRequest {
@Override
public ListGroupsResponse getErrorResponse(int throttleTimeMs, Throwable
e) {
ListGroupsResponseData listGroupsResponseData = new
ListGroupsResponseData().
- setGroups(Collections.emptyList()).
+ setGroups(List.of()).
setErrorCode(Errors.forException(e).code());
if (version() >= 1) {
listGroupsResponseData.setThrottleTimeMs(throttleTimeMs);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
index a1dc329054c..7cab5d4bb50 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsResponse.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.internal.RecordBatch;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -105,7 +104,7 @@ public class ListOffsetsResponse extends AbstractResponse {
public static ListOffsetsTopicResponse
singletonListOffsetsTopicResponse(TopicPartition tp, Errors error, long
timestamp, long offset, int epoch) {
return new ListOffsetsTopicResponse()
.setName(tp.topic())
- .setPartitions(Collections.singletonList(new
ListOffsetsPartitionResponse()
+ .setPartitions(List.of(new ListOffsetsPartitionResponse()
.setPartitionIndex(tp.partition())
.setErrorCode(error.code())
.setTimestamp(timestamp)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index d3dcabfb4f9..392bf2a9105 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -173,7 +172,7 @@ public class MetadataRequest extends AbstractRequest {
.setTopicId(topic.topicId())
.setErrorCode(error.code())
.setIsInternal(false)
- .setPartitions(Collections.emptyList()));
+ .setPartitions(List.of()));
}
}
@@ -200,9 +199,9 @@ public class MetadataRequest extends AbstractRequest {
public List<Uuid> topicIds() {
if (isAllTopics())
- return Collections.emptyList();
+ return List.of();
else if (version() < 10)
- return Collections.emptyList();
+ return List.of();
else
return data.topics()
.stream()
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 9b8eed7cb3b..3cb9588c4b7 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -228,7 +227,7 @@ public class OffsetFetchRequest extends AbstractRequest {
);
}
- return Collections.singletonList(group);
+ return List.of(group);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index e733fa9eda4..9b922d06606 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -30,9 +30,9 @@ import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.utils.Utils;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -174,7 +174,7 @@ public class ProduceRequest extends AbstractRequest {
}
tpr.partitionResponses().add(new
ProduceResponseData.PartitionProduceResponse()
.setIndex(tpId.partition())
- .setRecordErrors(Collections.emptyList())
+ .setRecordErrors(List.of())
.setBaseOffset(INVALID_OFFSET)
.setLogAppendTimeMs(RecordBatch.NO_TIMESTAMP)
.setLogStartOffset(INVALID_OFFSET)
@@ -187,7 +187,7 @@ public class ProduceRequest extends AbstractRequest {
@Override
public Map<Errors, Integer> errorCounts(Throwable e) {
Errors error = Errors.forException(e);
- return Collections.singletonMap(error, partitionSizes().size());
+ return Map.of(error, partitionSizes().size());
}
public short acks() {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index df3bff0c32f..9d7817e4902 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.internal.RecordBatch;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
@@ -74,7 +73,7 @@ public class ProduceResponse extends AbstractResponse {
*/
@Deprecated
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses)
{
- this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
+ this(responses, DEFAULT_THROTTLE_TIME, List.of());
}
/**
@@ -85,7 +84,7 @@ public class ProduceResponse extends AbstractResponse {
*/
@Deprecated
public ProduceResponse(Map<TopicIdPartition, PartitionResponse> responses,
int throttleTimeMs) {
- this(toData(responses, throttleTimeMs, Collections.emptyList()));
+ this(toData(responses, throttleTimeMs, List.of()));
}
/**
@@ -170,11 +169,11 @@ public class ProduceResponse extends AbstractResponse {
}
public PartitionResponse(Errors error, String errorMessage) {
- this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP,
INVALID_OFFSET, Collections.emptyList(), errorMessage);
+ this(error, INVALID_OFFSET, RecordBatch.NO_TIMESTAMP,
INVALID_OFFSET, List.of(), errorMessage);
}
public PartitionResponse(Errors error, long baseOffset, long
logAppendTime, long logStartOffset) {
- this(error, baseOffset, logAppendTime, logStartOffset,
Collections.emptyList(), null);
+ this(error, baseOffset, logAppendTime, logStartOffset, List.of(),
null);
}
public PartitionResponse(Errors error, long baseOffset, long
logAppendTime, long logStartOffset, List<RecordError> recordErrors) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
index 271cfde8cff..852f7bcc33d 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/RemoveRaftVoterResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.Map;
public class RemoveRaftVoterResponse extends AbstractResponse {
@@ -51,9 +50,9 @@ public class RemoveRaftVoterResponse extends AbstractResponse
{
@Override
public Map<Errors, Integer> errorCounts() {
if (data.errorCode() != Errors.NONE.code()) {
- return Collections.singletonMap(Errors.forCode(data.errorCode()),
1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
} else {
- return Collections.emptyMap();
+ return Map.of();
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
index 64251a0c5fd..fe725fa9dac 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareFetchResponse.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.Records;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -139,7 +138,7 @@ public class ShareFetchResponse extends AbstractResponse {
Iterator<Map.Entry<TopicIdPartition,
ShareFetchResponseData.PartitionData>> partIterator) {
// Since the throttleTimeMs and metadata field sizes are constant and
fixed, we can
// use arbitrary values here without affecting the result.
- ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator,
Collections.emptyList(), 0);
+ ShareFetchResponseData data = toMessage(Errors.NONE, 0, partIterator,
List.of(), 0);
ObjectSerializationCache cache = new ObjectSerializationCache();
return 4 + data.size(cache, version);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
index 5d8426e2650..6df71cbb5c1 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ShareGroupHeartbeatResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -56,7 +55,7 @@ public class ShareGroupHeartbeatResponse extends
AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
- return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
index 87c10a98d37..68f2a460c4f 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatResponse.java
@@ -59,7 +59,7 @@ public class StreamsGroupHeartbeatResponse extends
AbstractResponse {
@Override
public Map<Errors, Integer> errorCounts() {
- return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
index 35b4cce2095..8a4f7bd7c27 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateFeaturesRequest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Readable;
import java.util.Collection;
-import java.util.Collections;
+import java.util.Set;
import java.util.stream.Collectors;
public class UpdateFeaturesRequest extends AbstractRequest {
@@ -105,7 +105,7 @@ public class UpdateFeaturesRequest extends AbstractRequest {
public UpdateFeaturesResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
return UpdateFeaturesResponse.createWithErrors(
ApiError.fromThrowable(e),
- Collections.emptySet(),
+ Set.of(),
throttleTimeMs
);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
index f52157234fa..8d5420c9eb0 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateRaftVoterResponse.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
import java.util.Map;
public class UpdateRaftVoterResponse extends AbstractResponse {
@@ -51,9 +50,9 @@ public class UpdateRaftVoterResponse extends AbstractResponse
{
@Override
public Map<Errors, Integer> errorCounts() {
if (data.errorCode() != Errors.NONE.code()) {
- return Collections.singletonMap(Errors.forCode(data.errorCode()),
1);
+ return Map.of(Errors.forCode(data.errorCode()), 1);
} else {
- return Collections.emptyMap();
+ return Map.of();
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
index 73217558367..91b1cafebc7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/VoteRequest.java
@@ -23,7 +23,8 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Readable;
-import java.util.Collections;
+import java.util.List;
+
public class VoteRequest extends AbstractRequest {
@@ -77,10 +78,10 @@ public class VoteRequest extends AbstractRequest {
boolean preVote) {
return new VoteRequestData()
.setClusterId(clusterId)
- .setTopics(Collections.singletonList(
+ .setTopics(List.of(
new VoteRequestData.TopicData()
.setTopicName(topicPartition.topic())
- .setPartitions(Collections.singletonList(
+ .setPartitions(List.of(
new VoteRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setReplicaEpoch(replicaEpoch)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 3d32e98f8b4..2081d5bd5f4 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -766,7 +766,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
Map<Integer, PartitionErrorData> partitionErrors =
Optional.ofNullable(topicPartitionErrorsMap)
.map(map -> map.get(topic.topicId()))
- .orElse(Collections.emptyMap());
+ .orElse(Map.of());
PartitionErrorData error =
partitionErrors.get(partition.partitionIndex());
if (error == null) {
partitionData = partition.duplicate();
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 09714ab28b8..837caddcb03 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2166,7 +2166,7 @@ public class GroupMetadataManager {
response.setEndpointInformationEpoch(group.endpointInformationEpoch());
}
- Map<String, CreatableTopic> internalTopicsToBeCreated =
Collections.emptyMap();
+ Map<String, CreatableTopic> internalTopicsToBeCreated = Map.of();
if
(updatedConfiguredTopology.topicConfigurationException().isPresent()) {
TopicConfigurationException exception =
updatedConfiguredTopology.topicConfigurationException().get();
internalTopicsToBeCreated =
updatedConfiguredTopology.internalTopicsToBeCreated();
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
index b1a5b026654..a74871871a6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java
@@ -1275,11 +1275,11 @@ public class StreamsGroup implements Group {
// Search for the partition in assigned tasks, then in tasks
pending revocation
Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
- .getOrDefault(subtopologyId, Collections.emptyMap())
+ .getOrDefault(subtopologyId, Map.of())
.get(partitionId);
if (assignmentEpoch == null) {
assignmentEpoch =
tasksPendingRevocation.activeTasksWithEpochs()
- .getOrDefault(subtopologyId, Collections.emptyMap())
+ .getOrDefault(subtopologyId, Map.of())
.get(partitionId);
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index 887472132a6..a43431ce47e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -287,7 +287,7 @@ public record StreamsGroupMember(String memberId,
.setClientId("")
.setClientHost("")
.setProcessId("")
- .setClientTags(Collections.emptyMap())
+ .setClientTags(Map.of())
.setState(MemberState.STABLE)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)