This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7527a8bac08 MINOR: Cleanup Connect Module (4/n) (#20389)
7527a8bac08 is described below
commit 7527a8bac088ac551a34c7ef2447588f70d8ec7e
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Thu Aug 28 02:53:22 2025 +0530
MINOR: Cleanup Connect Module (4/n) (#20389)
Now that Kafka supports Java 17, this PR makes some changes in the connect
module. The changes in this PR are limited to only some files. Future
PRs shall follow.
The changes mostly include:
- Collections.emptyList(), Collections.singletonList() and
Arrays.asList() are replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() are replaced
with Map.of()
- Collections.singleton() is replaced with Set.of()
Target module: runtime/src/main
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/connect/cli/AbstractConnectCli.java | 6 ++--
.../kafka/connect/cli/ConnectDistributed.java | 6 ++--
.../kafka/connect/cli/ConnectStandalone.java | 3 +-
.../kafka/connect/runtime/AbstractHerder.java | 8 ++---
.../kafka/connect/runtime/ConnectMetrics.java | 2 +-
.../connect/runtime/ConnectMetricsRegistry.java | 22 ++++++-------
.../kafka/connect/runtime/ConnectorConfig.java | 17 ++++------
.../kafka/connect/runtime/SinkConnectorConfig.java | 5 ++-
.../connect/runtime/SourceConnectorConfig.java | 8 ++---
.../kafka/connect/runtime/SubmittedRecords.java | 2 +-
.../kafka/connect/runtime/TopicCreationConfig.java | 7 ++---
.../org/apache/kafka/connect/runtime/Worker.java | 9 +++---
.../apache/kafka/connect/runtime/WorkerConfig.java | 5 ++-
.../kafka/connect/runtime/WorkerSinkTask.java | 9 +++---
.../connect/runtime/WorkerSinkTaskContext.java | 13 ++++----
.../runtime/distributed/ConnectProtocol.java | 4 +--
.../runtime/distributed/DistributedConfig.java | 3 +-
.../runtime/distributed/DistributedHerder.java | 14 ++++-----
.../connect/runtime/distributed/EagerAssignor.java | 8 ++---
.../runtime/distributed/ExtendedAssignment.java | 11 +++----
.../IncrementalCooperativeAssignor.java | 36 ++++++++++------------
.../runtime/distributed/WorkerCoordinator.java | 3 +-
.../runtime/errors/DeadLetterQueueReporter.java | 5 ++-
.../runtime/errors/RetryWithToleranceOperator.java | 7 ++---
.../runtime/isolation/DelegatingClassLoader.java | 3 +-
.../runtime/isolation/PluginScanResult.java | 3 +-
.../connect/runtime/isolation/PluginUtils.java | 12 +++-----
.../kafka/connect/runtime/isolation/Plugins.java | 6 ++--
.../runtime/isolation/PluginsRecommenders.java | 17 +++++-----
.../connect/runtime/rest/ConnectRestServer.java | 9 ++----
.../kafka/connect/runtime/rest/RestServer.java | 3 +-
.../connect/runtime/rest/RestServerConfig.java | 5 ++-
.../runtime/rest/resources/ConnectorsResource.java | 3 +-
.../kafka/connect/runtime/rest/util/SSLUtils.java | 3 +-
.../kafka/connect/storage/ClusterConfigState.java | 25 ++++++++-------
.../storage/ConnectorOffsetBackingStore.java | 3 +-
.../connect/storage/FileOffsetBackingStore.java | 3 +-
.../connect/storage/KafkaConfigBackingStore.java | 7 ++---
.../connect/storage/KafkaOffsetBackingStore.java | 5 ++-
.../connect/storage/KafkaStatusBackingStore.java | 9 +++---
.../connect/storage/MemoryConfigBackingStore.java | 10 +++---
.../connect/storage/MemoryStatusBackingStore.java | 5 ++-
.../connect/storage/OffsetStorageReaderImpl.java | 7 ++---
.../apache/kafka/connect/tools/PredicateDoc.java | 4 +--
.../kafka/connect/tools/TransformationDoc.java | 3 +-
.../apache/kafka/connect/util/KafkaBasedLog.java | 5 ++-
.../apache/kafka/connect/util/LoggingContext.java | 4 +--
.../org/apache/kafka/connect/util/SinkUtils.java | 3 +-
.../java/org/apache/kafka/connect/util/Table.java | 5 ++-
.../org/apache/kafka/connect/util/TopicAdmin.java | 25 ++++++++-------
.../apache/kafka/connect/util/TopicCreation.java | 3 +-
.../kafka/connect/util/TopicCreationGroup.java | 3 +-
52 files changed, 178 insertions(+), 228 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
index 5cfa300bafc..8c0d30b1c99 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
import java.util.Map;
/**
@@ -82,7 +82,7 @@ public abstract class AbstractConnectCli<H extends Herder, T
extends WorkerConfi
* Validate {@link #args}, process worker properties from the first CLI
argument, and start {@link Connect}
*/
public void run() {
- if (args.length < 1 || Arrays.asList(args).contains("--help")) {
+ if (args.length < 1 || List.of(args).contains("--help")) {
log.info("Usage: {}", usage());
Exit.exit(1);
}
@@ -90,7 +90,7 @@ public abstract class AbstractConnectCli<H extends Herder, T
extends WorkerConfi
try {
String workerPropsFile = args[0];
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
- Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) :
Collections.emptyMap();
+ Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) :
Map.of();
String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
Connect<H> connect = startConnect(workerProps);
processExtraArgs(connect, extraArgs);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8763dd908a1..59b943ae919 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -36,8 +36,8 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.SharedTopicAdmin;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -77,7 +77,7 @@ public class ConnectDistributed extends
AbstractConnectCli<DistributedHerder, Di
KafkaOffsetBackingStore offsetBackingStore = new
KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase,
plugins.newInternalConverter(true,
JsonConverter.class.getName(),
-
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
+ Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
"false")));
offsetBackingStore.configure(config);
Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config,
offsetBackingStore, connectorClientConfigOverridePolicy);
@@ -99,7 +99,7 @@ public class ConnectDistributed extends
AbstractConnectCli<DistributedHerder, Di
return new DistributedHerder(config, Time.SYSTEM, worker,
kafkaClusterId, statusBackingStore, configBackingStore,
restServer.advertisedUrl().toString(), restClient,
connectorClientConfigOverridePolicy,
- Collections.emptyList(), sharedAdmin);
+ List.of(), sharedAdmin);
}
@Override
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 43af6b274b6..ded4103c69c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
-import java.util.Collections;
import java.util.Map;
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
@@ -163,7 +162,7 @@ public class ConnectStandalone extends
AbstractConnectCli<StandaloneHerder, Stan
RestServer restServer, RestClient
restClient) {
OffsetBackingStore offsetBackingStore = new
FileOffsetBackingStore(plugins.newInternalConverter(
- true, JsonConverter.class.getName(),
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
+ true, JsonConverter.class.getName(),
Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
offsetBackingStore.configure(config);
Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config,
offsetBackingStore,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index bce67129388..165a540ca62 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -555,7 +555,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
"header converter",
HEADER_CONVERTER_CLASS_CONFIG,
HEADER_CONVERTER_VERSION_CONFIG,
- Collections.singletonMap(ConverterConfig.TYPE_CONFIG,
ConverterType.HEADER.getName()),
+ Map.of(ConverterConfig.TYPE_CONFIG,
ConverterType.HEADER.getName()),
connectorLoader,
reportStage
);
@@ -568,7 +568,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
"key converter",
KEY_CONVERTER_CLASS_CONFIG,
KEY_CONVERTER_VERSION_CONFIG,
- Collections.singletonMap(ConverterConfig.TYPE_CONFIG,
ConverterType.KEY.getName()),
+ Map.of(ConverterConfig.TYPE_CONFIG,
ConverterType.KEY.getName()),
connectorLoader,
reportStage
);
@@ -582,7 +582,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
"value converter",
VALUE_CONVERTER_CLASS_CONFIG,
VALUE_CONVERTER_VERSION_CONFIG,
- Collections.singletonMap(ConverterConfig.TYPE_CONFIG,
ConverterType.VALUE.getName()),
+ Map.of(ConverterConfig.TYPE_CONFIG,
ConverterType.VALUE.getName()),
connectorLoader,
reportStage
);
@@ -1278,7 +1278,7 @@ public abstract class AbstractHerder implements Herder,
TaskStatus.Listener, Con
if (!loggers.isValidLevel(normalizedLevel)) {
log.warn("Ignoring request to set invalid level '{}' for namespace
{}", desiredLevelStr, namespace);
- return Collections.emptyList();
+ return List.of();
}
return loggers.setLevel(namespace, normalizedLevel);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 90e6650df37..83408649000 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -253,7 +253,7 @@ public class ConnectMetrics {
Objects.requireNonNull(groupName);
Objects.requireNonNull(tags);
this.groupName = groupName;
- this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
+ this.tags = Collections.unmodifiableMap(new
LinkedHashMap<>(tags)); // To ensure the order of insertion, we have to use
Collections.
this.hc = Objects.hash(this.groupName, this.tags);
StringBuilder sb = new StringBuilder(this.groupName);
for (Map.Entry<String, String> entry : this.tags.entrySet()) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index 496c838cf45..7f879ea8f2a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.MetricNameTemplate;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -135,7 +133,7 @@ public class ConnectMetricsRegistry {
public final MetricNameTemplate predicateClass;
public final MetricNameTemplate predicateVersion;
- public Map<MetricNameTemplate, TaskStatus.State> connectorStatusMetrics;
+ public final Map<MetricNameTemplate, TaskStatus.State>
connectorStatusMetrics;
public ConnectMetricsRegistry() {
this(new LinkedHashSet<>());
@@ -388,14 +386,14 @@ public class ConnectMetricsRegistry {
WORKER_GROUP_NAME,
"The number of restarting tasks of the connector on the worker.",
workerConnectorTags);
- connectorStatusMetrics = new HashMap<>();
- connectorStatusMetrics.put(connectorRunningTaskCount,
TaskStatus.State.RUNNING);
- connectorStatusMetrics.put(connectorPausedTaskCount,
TaskStatus.State.PAUSED);
- connectorStatusMetrics.put(connectorFailedTaskCount,
TaskStatus.State.FAILED);
- connectorStatusMetrics.put(connectorUnassignedTaskCount,
TaskStatus.State.UNASSIGNED);
- connectorStatusMetrics.put(connectorDestroyedTaskCount,
TaskStatus.State.DESTROYED);
- connectorStatusMetrics.put(connectorRestartingTaskCount,
TaskStatus.State.RESTARTING);
- connectorStatusMetrics =
Collections.unmodifiableMap(connectorStatusMetrics);
+ connectorStatusMetrics = Map.of(
+ connectorRunningTaskCount, TaskStatus.State.RUNNING,
+ connectorPausedTaskCount, TaskStatus.State.PAUSED,
+ connectorFailedTaskCount, TaskStatus.State.FAILED,
+ connectorUnassignedTaskCount, TaskStatus.State.UNASSIGNED,
+ connectorDestroyedTaskCount, TaskStatus.State.DESTROYED,
+ connectorRestartingTaskCount, TaskStatus.State.RESTARTING
+ );
/* Worker rebalance level */
Set<String> rebalanceTags = new LinkedHashSet<>(tags);
@@ -444,7 +442,7 @@ public class ConnectMetricsRegistry {
}
public List<MetricNameTemplate> getAllTemplates() {
- return Collections.unmodifiableList(allTemplates);
+ return List.copyOf(allTemplates);
}
public String connectorTagName() {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efd421bd2e2..ff4d399db1a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -47,7 +47,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@@ -250,8 +249,8 @@ public class ConnectorConfig extends AbstractConfig {
.define(VALUE_CONVERTER_VERSION_CONFIG, Type.STRING,
valueConverterDefaults.version, VALUE_CONVERTER_VERSION_VALIDATOR,
Importance.LOW, VALUE_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup,
Width.SHORT, VALUE_CONVERTER_VERSION_DISPLAY,
recommender.valueConverterPluginVersionRecommender())
.define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
headerConverterDefaults.type, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW,
HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT,
HEADER_CONVERTER_CLASS_DISPLAY, recommender.headerConverterPluginRecommender())
.define(HEADER_CONVERTER_VERSION_CONFIG, Type.STRING,
headerConverterDefaults.version, HEADER_CONVERTER_VERSION_VALIDATOR,
Importance.LOW, HEADER_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup,
Width.SHORT, HEADER_CONVERTER_VERSION_DISPLAY,
recommender.headerConverterPluginVersionRecommender())
- .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(),
aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC,
TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
- .define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(),
aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP,
++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
+ .define(TRANSFORMS_CONFIG, Type.LIST, List.of(),
aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC,
TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
+ .define(PREDICATES_CONFIG, Type.LIST, List.of(),
aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP,
++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
.define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING,
CONFIG_RELOAD_ACTION_RESTART,
in(CONFIG_RELOAD_ACTION_NONE,
CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP,
++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY)
@@ -303,7 +302,7 @@ public class ConnectorConfig extends AbstractConfig {
}
public ConnectorConfig(Plugins plugins) {
- this(plugins, Collections.emptyMap());
+ this(plugins, Map.of());
}
public ConnectorConfig(Plugins plugins, Map<String, String> props) {
@@ -646,7 +645,7 @@ public class ConnectorConfig extends AbstractConfig {
newDef.define(typeConfig, Type.CLASS,
ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
"Class for the '" + alias + "' " +
aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
baseClass.getSimpleName() + " type for " + alias,
- Collections.emptyList(), new ClassRecommender());
+ List.of(), new ClassRecommender());
// Add the version configuration
final ConfigDef.Validator versionValidator = (name, value) -> {
@@ -664,7 +663,7 @@ public class ConnectorConfig extends AbstractConfig {
newDef.define(versionConfig, Type.STRING, defaultVersion,
versionValidator, Importance.HIGH,
"Version of the '" + alias + "' " +
aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
baseClass.getSimpleName() + " version for " + alias,
- Collections.emptyList(),
versionRecommender(typeConfig));
+ List.of(), versionRecommender(typeConfig));
final ConfigDef configDef = populateConfigDef(typeConfig,
versionConfig, plugins);
if (configDef == null) continue;
@@ -780,11 +779,7 @@ public class ConnectorConfig extends AbstractConfig {
@Override
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
- List<Object> result = new ArrayList<>();
- for (PluginDesc<T> plugin : plugins()) {
- result.add(plugin.pluginClass());
- }
- return Collections.unmodifiableList(result);
+ return plugins().stream().map(p -> (Object)
p.pluginClass()).toList();
}
@Override
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 4584255e231..b52b73810fd 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.util.RegexValidator;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -169,7 +168,7 @@ public class SinkConnectorConfig extends ConnectorConfig {
private static void addErrorMessage(Map<String, ConfigValue>
validatedConfig, String name, String value, String errorMessage) {
validatedConfig.computeIfAbsent(
name,
- p -> new ConfigValue(name, value, Collections.emptyList(), new
ArrayList<>())
+ p -> new ConfigValue(name, value, List.of(), new ArrayList<>())
).addErrorMessage(
errorMessage
);
@@ -189,7 +188,7 @@ public class SinkConnectorConfig extends ConnectorConfig {
public static List<String> parseTopicsList(Map<String, String> props) {
List<String> topics = (List<String>)
ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST);
if (topics == null) {
- return Collections.emptyList();
+ return List.of();
}
return topics
.stream()
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 336468f491a..e9913e81f4c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -26,12 +26,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.enumOptions;
@@ -132,7 +132,7 @@ public final class SourceConnectorConfig extends
ConnectorConfig {
.define(
TOPIC_CREATION_GROUPS_CONFIG,
ConfigDef.Type.LIST,
- Collections.emptyList(),
+ List.of(),
ConfigDef.CompositeValidator.of(
new ConfigDef.NonNullValidator(),
ConfigDef.LambdaValidator.with(
@@ -240,7 +240,7 @@ public final class SourceConnectorConfig extends
ConnectorConfig {
if (topicCreationGroups.contains(DEFAULT_TOPIC_CREATION_GROUP)) {
log.warn("'{}' topic creation group always exists and does not
need to be listed explicitly",
DEFAULT_TOPIC_CREATION_GROUP);
-
topicCreationGroups.removeAll(Collections.singleton(DEFAULT_TOPIC_CREATION_GROUP));
+
topicCreationGroups.removeAll(Set.of(DEFAULT_TOPIC_CREATION_GROUP));
}
ConfigDef newDef = new ConfigDef(baseConfigDef);
@@ -332,7 +332,7 @@ public final class SourceConnectorConfig extends
ConnectorConfig {
public Map<String, Object> topicCreationOtherConfigs(String group) {
if (enrichedSourceConfig == null) {
- return Collections.emptyMap();
+ return Map.of();
}
return enrichedSourceConfig.originalsWithPrefix(TOPIC_CREATION_PREFIX
+ group + '.').entrySet().stream()
.filter(e -> {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
index accbb0196d7..86cec8080db 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
@@ -250,7 +250,7 @@ class SubmittedRecords {
/**
* An "empty" snapshot that contains no offsets to commit and whose
metadata contains no committable or uncommitable messages.
*/
- public static final CommittableOffsets EMPTY = new
CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
+ public static final CommittableOffsets EMPTY = new
CommittableOffsets(Map.of(), 0, 0, 0, 0, null);
CommittableOffsets {
offsets = Collections.unmodifiableMap(offsets);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
index 11c2ba9d374..4339fd6f236 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.util.TopicAdmin;
-import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -102,11 +101,11 @@ public class TopicCreationConfig {
int orderInGroup = 0;
ConfigDef configDef = new ConfigDef();
configDef
- .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST,
Collections.emptyList(),
+ .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, List.of(),
REGEX_VALIDATOR, ConfigDef.Importance.LOW,
INCLUDE_REGEX_DOC, group, ++orderInGroup,
ConfigDef.Width.LONG,
"Inclusion Topic Pattern for " + group)
- .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST,
Collections.emptyList(),
+ .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, List.of(),
REGEX_VALIDATOR, ConfigDef.Importance.LOW,
EXCLUDE_REGEX_DOC, group, ++orderInGroup,
ConfigDef.Width.LONG,
"Exclusion Topic Pattern for " + group)
@@ -129,7 +128,7 @@ public class TopicCreationConfig {
new ConfigDef.NonNullValidator(),
ConfigDef.Importance.LOW,
INCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP,
++orderInGroup, ConfigDef.Width.LONG,
"Inclusion Topic Pattern for " +
DEFAULT_TOPIC_CREATION_GROUP)
- .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST,
Collections.emptyList(),
+ .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, List.of(),
new ConfigDef.NonNullValidator(),
ConfigDef.Importance.LOW,
EXCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP,
++orderInGroup, ConfigDef.Width.LONG,
"Exclusion Topic Pattern for " +
DEFAULT_TOPIC_CREATION_GROUP)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index a3e914d3f90..53cc40d7fd8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -108,7 +108,6 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -197,7 +196,7 @@ public final class Worker {
this.connectorClientConfigOverridePolicy =
connectorClientConfigOverridePolicy;
this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors,
this.tasks, metrics);
- Map<String, String> internalConverterConfig =
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+ Map<String, String> internalConverterConfig =
Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
this.internalKeyConverter = plugins.newInternalConverter(true,
JsonConverter.class.getName(), internalConverterConfig);
this.internalValueConverter = plugins.newInternalConverter(false,
JsonConverter.class.getName(), internalConverterConfig);
@@ -539,7 +538,7 @@ public final class Worker {
*/
public void stopAndAwaitConnector(String connName) {
stopConnector(connName);
- awaitStopConnectors(Collections.singletonList(connName));
+ awaitStopConnectors(List.of(connName));
}
/**
@@ -1149,7 +1148,7 @@ public final class Worker {
*/
public void stopAndAwaitTask(ConnectorTaskId taskId) {
stopTask(taskId);
- awaitStopTasks(Collections.singletonList(taskId));
+ awaitStopTasks(List.of(taskId));
}
/**
@@ -1562,7 +1561,7 @@ public final class Worker {
private void resetSinkConnectorOffsets(String connName, String groupId,
Admin admin, Callback<Message> cb, boolean alterOffsetsResult, Timer timer) {
DeleteConsumerGroupsOptions deleteConsumerGroupsOptions = new
DeleteConsumerGroupsOptions().timeoutMs((int) timer.remainingMs());
- admin.deleteConsumerGroups(Collections.singleton(groupId),
deleteConsumerGroupsOptions)
+ admin.deleteConsumerGroups(Set.of(groupId),
deleteConsumerGroupsOptions)
.all()
.whenComplete((ignored, error) -> {
// We treat GroupIdNotFoundException as a non-error here
because resetting a connector's offsets is expected to be an idempotent
operation
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index b3aded30682..bec7d2085ec 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -254,7 +253,7 @@ public class WorkerConfig extends AbstractConfig {
.define(HEADER_CONVERTER_VERSION, Type.STRING,
HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW,
HEADER_CONVERTER_VERSION_DOC)
.define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
- Collections.emptyList(),
+ List.of(),
Importance.LOW, CONFIG_PROVIDERS_DOC)
.define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING,
CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
@@ -316,7 +315,7 @@ public class WorkerConfig extends AbstractConfig {
private void logInternalConverterRemovalWarnings(Map<String, String>
props) {
List<String> removedProperties = new ArrayList<>();
- for (String property : Arrays.asList("internal.key.converter",
"internal.value.converter")) {
+ for (String property : List.of("internal.key.converter",
"internal.value.converter")) {
if (props.containsKey(property)) {
removedProperties.add(property);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 14b093c9123..1de9ff2d9a5 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -63,16 +63,15 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import static java.util.Collections.singleton;
import static
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
/**
@@ -367,12 +366,12 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
//VisibleForTesting
Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets() {
- return Collections.unmodifiableMap(lastCommittedOffsets);
+ return Map.copyOf(lastCommittedOffsets);
}
//VisibleForTesting
Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
- return Collections.unmodifiableMap(currentOffsets);
+ return Map.copyOf(currentOffsets);
}
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets,
int seqno) {
@@ -613,7 +612,7 @@ class WorkerSinkTask extends
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
private void resumeAll() {
for (TopicPartition tp : consumer.assignment())
if (!context.pausedPartitions().contains(tp))
- consumer.resume(singleton(tp));
+ consumer.resume(Set.of(tp));
}
private void pauseAll() {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 11b8446b3d8..9b31b6e35d9 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -27,10 +27,10 @@ import org.apache.kafka.connect.storage.ClusterConfigState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -117,7 +117,7 @@ public class WorkerSinkTaskContext implements
SinkTaskContext {
if (sinkTask.shouldPause()) {
log.debug("{} Connector is paused, so not pausing consumer's
partitions {}", this, partitions);
} else {
- consumer.pause(Arrays.asList(partitions));
+ consumer.pause(List.of(partitions));
log.debug("{} Pausing partitions {}. Connector is not
paused.", this, partitions);
}
} catch (IllegalStateException e) {
@@ -131,12 +131,13 @@ public class WorkerSinkTaskContext implements
SinkTaskContext {
throw new IllegalWorkerStateException("SinkTaskContext may not be
used to resume consumption until the task is initialized");
}
try {
- pausedPartitions.removeAll(Arrays.asList(partitions));
+ List<TopicPartition> partitionList = List.of(partitions);
+ partitionList.forEach(pausedPartitions::remove);
if (sinkTask.shouldPause()) {
- log.debug("{} Connector is paused, so not resuming consumer's
partitions {}", this, partitions);
+ log.debug("{} Connector is paused, so not resuming consumer's
partitions {}", this, partitionList);
} else {
- consumer.resume(Arrays.asList(partitions));
- log.debug("{} Resuming partitions: {}", this, partitions);
+ consumer.resume(partitionList);
+ log.debug("{} Resuming partitions: {}", this, partitionList);
}
} catch (IllegalStateException e) {
throw new IllegalWorkerStateException("SinkTasks may not resume
partitions that are not currently assigned to them.", e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
index 2644e105d4d..6b29598ab10 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
@@ -27,12 +27,12 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import static
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import static
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -142,7 +142,7 @@ public class ConnectProtocol {
* @return the collection of Connect protocol metadata
*/
public static JoinGroupRequestProtocolCollection
metadataRequest(WorkerState workerState) {
- return new JoinGroupRequestProtocolCollection(Collections.singleton(
+ return new JoinGroupRequestProtocolCollection(Set.of(
new JoinGroupRequestProtocol()
.setName(EAGER.protocol())
.setMetadata(ConnectProtocol.serializeMetadata(workerState).array()))
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 16ab0d47a3c..6b209da331a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -35,7 +35,6 @@ import java.security.Provider;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
@@ -218,7 +217,7 @@ public final class DistributedConfig extends WorkerConfig {
+ "on other JVMs, no default is used and a value for this property
must be manually specified in the worker config.";
public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG =
"inter.worker.verification.algorithms";
- public static final List<String>
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT =
Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
+ public static final List<String>
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT =
List.of(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A
list of permitted algorithms for verifying internal requests, "
+ "which must include the algorithm used for the " +
INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG + " property. "
+ "The algorithm(s) '" + INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT
+ "' will be used as a default on JVMs that provide them; "
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7ad78744062..6c4bed311d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -82,9 +82,7 @@ import org.apache.kafka.connect.util.TemporaryStage;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -305,7 +303,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
this.restClient = restClient;
this.isTopicTrackingEnabled =
config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.restNamespace = Objects.requireNonNull(restNamespace);
- this.uponShutdown = Arrays.asList(uponShutdown);
+ this.uponShutdown = List.of(uponShutdown);
String clientIdConfig =
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
String clientId = clientIdConfig.isEmpty() ? "connect-" + workerId :
clientIdConfig;
@@ -713,7 +711,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
private void processConnectorConfigUpdates(Set<String>
connectorConfigUpdates) {
// If we only have connector config updates, we can just bounce the
updated connectors that are
// currently assigned to this worker.
- Set<String> localConnectors = assignment == null ?
Collections.emptySet() : new HashSet<>(assignment.connectors());
+ Set<String> localConnectors = assignment == null ? Set.of() : new
HashSet<>(assignment.connectors());
Collection<Callable<Void>> connectorsToStart = new ArrayList<>();
log.trace("Processing connector config updates; "
+ "currently-owned connectors are {}, and to-be-updated
connectors are {}",
@@ -769,7 +767,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
private void
processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId>
taskConfigUpdates) {
Set<ConnectorTaskId> localTasks = assignment == null
- ? Collections.emptySet()
+ ? Set.of()
: new HashSet<>(assignment.tasks());
log.trace("Processing task config updates with incremental cooperative
rebalance protocol; "
+ "currently-owned tasks are {}, and to-be-updated tasks are
{}",
@@ -781,7 +779,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
private void stopReconfiguredTasks(Set<String> connectors) {
Set<ConnectorTaskId> localTasks = assignment == null
- ? Collections.emptySet()
+ ? Set.of()
: new HashSet<>(assignment.tasks());
List<ConnectorTaskId> tasksToStop = localTasks.stream()
@@ -966,7 +964,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
String consumerGroupId =
config.get(overriddenConsumerGroupIdConfig);
ConfigValue validatedGroupId = validatedConfig.computeIfAbsent(
overriddenConsumerGroupIdConfig,
- p -> new ConfigValue(overriddenConsumerGroupIdConfig,
consumerGroupId, Collections.emptyList(), new ArrayList<>())
+ p -> new ConfigValue(overriddenConsumerGroupIdConfig,
consumerGroupId, List.of(), new ArrayList<>())
);
if (workerGroupId.equals(consumerGroupId)) {
validatedGroupId.addErrorMessage("Consumer group " +
consumerGroupId +
@@ -1195,7 +1193,7 @@ public class DistributedHerder extends AbstractHerder
implements Runnable {
// if the connector is reassigned during the ensuing
rebalance, it is likely that it will immediately generate
// a non-empty set of task configs). A STOPPED connector
with a non-empty set of tasks is less acceptable
// and likely to confuse users.
- writeTaskConfigs(connName, Collections.emptyList());
+ writeTaskConfigs(connName, List.of());
String stageDescription = "writing the STOPPED target
stage for connector " + connName + " to the config topic";
try (TickThreadStage stage = new
TickThreadStage(stageDescription)) {
configBackingStore.putTargetState(connName,
TargetState.STOPPED);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
index 2d8dba5d758..1004382e102 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
@@ -132,13 +132,13 @@ public class EagerAssignor implements ConnectAssignor {
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (String member : members) {
- Collection<String> connectors = connectorAssignments.get(member);
+ Collection<String> connectors =
connectorAssignments.getOrDefault(member, List.of());
if (connectors == null) {
- connectors = Collections.emptyList();
+ connectors = List.of();
}
- Collection<ConnectorTaskId> tasks = taskAssignments.get(member);
+ Collection<ConnectorTaskId> tasks =
taskAssignments.getOrDefault(member, List.of());
if (tasks == null) {
- tasks = Collections.emptyList();
+ tasks = List.of();
}
Assignment assignment = new Assignment(error, leaderId, leaderUrl,
maxOffset, connectors, tasks);
log.debug("Assignment: {} -> {}", member, assignment);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
index d99b38349a8..3c1f483b1f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
@@ -21,7 +21,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -56,7 +55,7 @@ public class ExtendedAssignment extends
ConnectProtocol.Assignment {
private static final ExtendedAssignment EMPTY = new ExtendedAssignment(
CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, null,
null, -1,
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList(), 0);
+ new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new
ArrayList<>(), 0);
/**
* Create an assignment indicating responsibility for the given connector
instances and task Ids.
@@ -167,7 +166,7 @@ public class ExtendedAssignment extends
ConnectProtocol.Assignment {
// Using LinkedHashMap preserves the ordering, which is helpful for
tests and debugging
Map<String, Collection<Integer>> taskMap = new LinkedHashMap<>();
Optional.ofNullable(revokedConnectorIds)
- .orElseGet(Collections::emptyList)
+ .orElseGet(List::of)
.stream()
.distinct()
.forEachOrdered(connectorId -> {
@@ -177,7 +176,7 @@ public class ExtendedAssignment extends
ConnectProtocol.Assignment {
});
Optional.ofNullable(revokedTaskIds)
- .orElseGet(Collections::emptyList)
+ .orElseGet(List::of)
.forEach(taskId -> {
String connectorId = taskId.connector();
Collection<Integer> connectorTasks =
@@ -244,7 +243,7 @@ public class ExtendedAssignment extends
ConnectProtocol.Assignment {
Object[] connectors = struct.getArray(key);
if (connectors == null) {
- return Collections.emptyList();
+ return List.of();
}
List<String> connectorIds = new ArrayList<>();
for (Object structObj : connectors) {
@@ -265,7 +264,7 @@ public class ExtendedAssignment extends
ConnectProtocol.Assignment {
Object[] tasks = struct.getArray(key);
if (tasks == null) {
- return Collections.emptyList();
+ return List.of();
}
List<ConnectorTaskId> tasksIds = new ArrayList<>();
for (Object structObj : tasks) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index e5bd9097033..2b8f87c81c7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -29,9 +29,7 @@ import org.slf4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -89,7 +87,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
this.candidateWorkersForReassignment = new LinkedHashSet<>();
this.delay = 0;
this.previousGenerationId = -1;
- this.previousMembers = Collections.emptySet();
+ this.previousMembers = Set.of();
this.numSuccessiveRevokingRebalances = 0;
// By default, initial interval is 1. The only corner case is when the
user has set maxDelay to 0
// in which case, the exponential backoff delay should be 0 which
would return the backoff delay to be 0 always
@@ -472,7 +470,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
if (scheduledRebalance > 0 && now >= scheduledRebalance) {
// delayed rebalance expired and it's time to assign resources
log.debug("Delayed rebalance expired. Reassigning lost tasks");
- List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
+ List<WorkerLoad> candidateWorkerLoad = List.of();
if (!candidateWorkersForReassignment.isEmpty()) {
candidateWorkerLoad =
pickCandidateWorkerForReassignment(completeWorkerAssignment);
}
@@ -606,7 +604,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
Map<String, Collection<T>> incremental = new HashMap<>();
for (Map.Entry<String, Collection<T>> entry : base.entrySet()) {
List<T> values = new ArrayList<>(entry.getValue());
- values.removeAll(toSubtract.getOrDefault(entry.getKey(),
Collections.emptySet()));
+ values.removeAll(toSubtract.getOrDefault(entry.getKey(),
Set.of()));
incremental.put(entry.getKey(), values);
}
return incremental;
@@ -643,11 +641,11 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
log.trace("No load-balancing revocations required; all workers are
either new "
+ "or will have all currently-assigned connectors and
tasks revoked during this round"
);
- return Collections.emptyMap();
+ return Map.of();
}
if (configured.isEmpty()) {
log.trace("No load-balancing revocations required; no connectors
are currently configured on this cluster");
- return Collections.emptyMap();
+ return Map.of();
}
Map<String, ConnectorsAndTasks.Builder> result = new HashMap<>();
@@ -705,7 +703,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
allocatedResourceName,
allocatedResourceName
);
- return Collections.emptyMap();
+ return Map.of();
}
Map<String, Set<E>> result = new HashMap<>();
@@ -887,12 +885,12 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
private final Set<String> allWorkers;
public static final ClusterAssignment EMPTY = new ClusterAssignment(
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap()
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Map.of()
);
public ClusterAssignment(
@@ -910,7 +908,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
this.allAssignedConnectors = allAssignedConnectors;
this.allAssignedTasks = allAssignedTasks;
this.allWorkers = combineCollections(
- Arrays.asList(newlyAssignedConnectors, newlyAssignedTasks,
newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors,
allAssignedTasks),
+ List.of(newlyAssignedConnectors, newlyAssignedTasks,
newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors,
allAssignedTasks),
Map::keySet,
Collectors.toSet()
);
@@ -921,7 +919,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
}
public Collection<String> newlyAssignedConnectors(String worker) {
- return newlyAssignedConnectors.getOrDefault(worker,
Collections.emptySet());
+ return newlyAssignedConnectors.getOrDefault(worker, Set.of());
}
public Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks() {
@@ -929,7 +927,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
}
public Collection<ConnectorTaskId> newlyAssignedTasks(String worker) {
- return newlyAssignedTasks.getOrDefault(worker,
Collections.emptySet());
+ return newlyAssignedTasks.getOrDefault(worker, Set.of());
}
public Map<String, Collection<String>> newlyRevokedConnectors() {
@@ -937,7 +935,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
}
public Collection<String> newlyRevokedConnectors(String worker) {
- return newlyRevokedConnectors.getOrDefault(worker,
Collections.emptySet());
+ return newlyRevokedConnectors.getOrDefault(worker, Set.of());
}
public Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks() {
@@ -945,7 +943,7 @@ public class IncrementalCooperativeAssignor implements
ConnectAssignor {
}
public Collection<ConnectorTaskId> newlyRevokedTasks(String worker) {
- return newlyRevokedTasks.getOrDefault(worker,
Collections.emptySet());
+ return newlyRevokedTasks.getOrDefault(worker, Set.of());
}
public Map<String, Collection<String>> allAssignedConnectors() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 871fe3b33e4..edb174ab52b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -36,7 +36,6 @@ import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -460,7 +459,7 @@ public class WorkerCoordinator extends AbstractCoordinator
implements Closeable
public static class ConnectorsAndTasks {
public static final ConnectorsAndTasks EMPTY =
- new ConnectorsAndTasks(Collections.emptyList(),
Collections.emptyList());
+ new ConnectorsAndTasks(List.of(), List.of());
private final Collection<String> connectors;
private final Collection<ConnectorTaskId> tasks;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 3b9ec5993c6..bb240af82d7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -38,12 +38,11 @@ import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import static java.util.Collections.singleton;
-
/**
* Write the original consumed record into a dead letter queue. The dead
letter queue is a Kafka topic located
* on the same cluster used by the worker to maintain internal topics. Each
connector is typically configured
@@ -85,7 +84,7 @@ public class DeadLetterQueueReporter implements
ErrorReporter<ConsumerRecord<byt
if (!admin.listTopics().names().get().contains(topic)) {
log.error("Topic {} doesn't exist. Will attempt to create
topic.", topic);
NewTopic schemaTopicRequest = new NewTopic(topic,
DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
- admin.createTopics(singleton(schemaTopicRequest)).all().get();
+ admin.createTopics(Set.of(schemaTopicRequest)).all().get();
}
} catch (InterruptedException e) {
throw new ConnectException("Could not initialize dead letter queue
with topic=" + topic, e);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 51f4e66d636..b8b74f12e5f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -26,7 +26,6 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -101,7 +100,7 @@ public class RetryWithToleranceOperator<T> implements
AutoCloseable {
this.errorHandlingMetrics = errorHandlingMetrics;
this.stopRequestedLatch = stopRequestedLatch;
this.stopping = false;
- this.reporters = Collections.emptyList();
+ this.reporters = List.of();
}
/**
@@ -137,7 +136,7 @@ public class RetryWithToleranceOperator<T> implements
AutoCloseable {
// Visible for testing
synchronized Future<Void> report(ProcessingContext<T> context) {
if (reporters.size() == 1) {
- return new
WorkerErrantRecordReporter.ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(context)));
+ return new
WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.get(0).report(context)));
}
List<Future<RecordMetadata>> futures = reporters.stream()
.map(r -> r.report(context))
@@ -357,7 +356,7 @@ public class RetryWithToleranceOperator<T> implements
AutoCloseable {
e.addSuppressed(t);
}
}
- reporters = Collections.emptyList();
+ reporters = List.of();
if (e != null) {
throw e;
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 9f3cd021750..f89b1f03a75 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
import java.net.URL;
import java.net.URLClassLoader;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -259,7 +258,7 @@ public class DelegatingClassLoader extends URLClassLoader {
fullName,
pluginVersion,
range
- ), Collections.singletonList(pluginVersion));
+ ), List.of(pluginVersion));
}
}
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index 7d5105012b1..04be35f2d26 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -26,7 +26,6 @@ import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
-import java.util.Arrays;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -67,7 +66,7 @@ public class PluginScanResult {
this.restExtensions = restExtensions;
this.connectorClientConfigPolicies = connectorClientConfigPolicies;
this.allPlugins =
- Arrays.asList(sinkConnectors, sourceConnectors, converters,
headerConverters, transformations, predicates,
+ List.of(sinkConnectors, sourceConnectors, converters,
headerConverters, transformations, predicates,
configProviders, restExtensions,
connectorClientConfigPolicies);
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 433c586b55e..729074d508e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -34,9 +34,7 @@ import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -204,7 +202,7 @@ public class PluginUtils {
public static Set<Path> pluginLocations(String pluginPath, boolean
failFast) {
if (pluginPath == null) {
- return Collections.emptySet();
+ return Set.of();
}
String[] pluginPathElements =
COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
Set<Path> pluginLocations = new LinkedHashSet<>();
@@ -266,7 +264,7 @@ public class PluginUtils {
Set<Path> visited = new HashSet<>();
if (isArchive(topPath)) {
- return Collections.singletonList(topPath);
+ return List.of(topPath);
}
DirectoryStream<Path> topListing = Files.newDirectoryStream(
@@ -335,12 +333,12 @@ public class PluginUtils {
if (containsClassFiles) {
if (archives.isEmpty()) {
- return Collections.singletonList(topPath);
+ return List.of(topPath);
}
log.warn("Plugin path contains both java archives and class files.
Returning only the"
+ " archives");
}
- return Arrays.asList(archives.toArray(new Path[0]));
+ return List.copyOf(archives);
}
public static Set<PluginSource> pluginSources(Set<Path> pluginLocations,
ClassLoader classLoader, PluginClassLoaderFactory factory) {
@@ -475,7 +473,7 @@ public class PluginUtils {
if (classLoader instanceof URLClassLoader) {
URL[] urls = ((URLClassLoader) classLoader).getURLs();
if (urls != null) {
- result.addAll(new HashSet<>(Arrays.asList(urls)));
+ result.addAll(new HashSet<>(List.of(urls)));
}
}
classLoader = classLoader.getParent();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 130db0ab61c..daf9f219992 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -42,9 +42,7 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -80,7 +78,7 @@ public class Plugins {
}
public PluginScanResult initLoaders(Set<PluginSource> pluginSources,
PluginDiscoveryMode discoveryMode) {
- PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+ PluginScanResult empty = new PluginScanResult(List.of());
PluginScanResult serviceLoadingScanResult;
try {
serviceLoadingScanResult = discoveryMode.serviceLoad() ?
@@ -93,7 +91,7 @@ public class Plugins {
}
PluginScanResult reflectiveScanResult =
discoveryMode.reflectivelyScan() ?
new ReflectionScanner().discoverPlugins(pluginSources) : empty;
- PluginScanResult scanResult = new
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+ PluginScanResult scanResult = new
PluginScanResult(List.of(reflectiveScanResult, serviceLoadingScanResult));
maybeReportHybridDiscoveryIssue(discoveryMode,
serviceLoadingScanResult, scanResult);
delegatingLoader.installDiscoveredPlugins(scanResult);
return scanResult;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
index fa9efeed7d3..da9dd0d2cab 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
@@ -22,7 +22,6 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -90,12 +89,12 @@ public class PluginsRecommenders {
@Override
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
if (plugins == null) {
- return Collections.emptyList();
+ return List.of();
}
String connectorClassOrAlias = (String)
parsedConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
if (connectorClassOrAlias == null) {
//should never happen
- return Collections.emptyList();
+ return List.of();
}
List<Object> sourceConnectors =
plugins.sourceConnectors(connectorClassOrAlias).stream()
.map(PluginDesc::version).distinct().collect(Collectors.toList());
@@ -118,7 +117,7 @@ public class PluginsRecommenders {
@Override
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
if (plugins == null) {
- return Collections.emptyList();
+ return List.of();
}
return plugins.converters().stream()
.map(PluginDesc::pluginClass).distinct().collect(Collectors.toList());
@@ -135,7 +134,7 @@ public class PluginsRecommenders {
@Override
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
if (plugins == null) {
- return Collections.emptyList();
+ return List.of();
}
return plugins.headerConverters().stream()
.map(PluginDesc::pluginClass).distinct().collect(Collectors.toList());
@@ -160,10 +159,10 @@ public class PluginsRecommenders {
@Override
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
if (plugins == null) {
- return Collections.emptyList();
+ return List.of();
}
if (parsedConfig.get(converterConfig()) == null) {
- return Collections.emptyList();
+ return List.of();
}
Class converterClass = (Class) parsedConfig.get(converterConfig());
return recommendations().apply(converterClass.getName());
@@ -221,10 +220,10 @@ public class PluginsRecommenders {
@SuppressWarnings({"rawtypes"})
public List<Object> validValues(String name, Map<String, Object>
parsedConfig) {
if (plugins == null) {
- return Collections.emptyList();
+ return List.of();
}
if (parsedConfig.get(classOrAliasConfig) == null) {
- return Collections.emptyList();
+ return List.of();
}
Class classOrAlias = (Class) parsedConfig.get(classOrAliasConfig);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
index f5de82dab73..db660f1651f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
@@ -26,9 +26,8 @@ import
org.apache.kafka.connect.runtime.rest.resources.RootResource;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
import java.util.Map;
public class ConnectRestServer extends RestServer {
@@ -48,7 +47,7 @@ public class ConnectRestServer extends RestServer {
@Override
protected Collection<Class<?>> regularResources() {
- return Arrays.asList(
+ return List.of(
RootResource.class,
ConnectorsResource.class,
InternalConnectResource.class,
@@ -58,9 +57,7 @@ public class ConnectRestServer extends RestServer {
@Override
protected Collection<Class<?>> adminResources() {
- return Collections.singletonList(
- LoggingResource.class
- );
+ return List.of(LoggingResource.class);
}
@Override
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 59b02543363..5bbc3312aa7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -57,7 +57,6 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
@@ -97,7 +96,7 @@ public abstract class RestServer {
private final Server jettyServer;
private final RequestTimeout requestTimeout;
- private List<Plugin<ConnectRestExtension>> connectRestExtensionPlugins =
Collections.emptyList();
+ private List<Plugin<ConnectRestExtension>> connectRestExtensionPlugins =
List.of();
/**
* Create a REST server for this herder using the specified configs.
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
index 96993c37c5c..3c4980675a0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
@@ -28,7 +28,6 @@ import org.eclipse.jetty.util.StringUtil;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -54,7 +53,7 @@ public abstract class RestServerConfig extends AbstractConfig
{
" Leave hostname empty to bind to default interface.\n" +
" Examples of legal listener lists:
HTTP://myhost:8083,HTTPS://myhost:8084";
// Visible for testing
- static final List<String> LISTENERS_DEFAULT =
Collections.singletonList("http://:8083");
+ static final List<String> LISTENERS_DEFAULT = List.of("http://:8083");
public static final String REST_ADVERTISED_HOST_NAME_CONFIG =
"rest.advertised.host.name";
private static final String REST_ADVERTISED_HOST_NAME_DOC
@@ -391,7 +390,7 @@ public abstract class RestServerConfig extends
AbstractConfig {
@Override
public List<String> adminListeners() {
// Disable admin resources (such as the logging resource)
- return Collections.emptyList();
+ return List.of();
}
@Override
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index c498f309dd0..148e96a4cee 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -41,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -191,7 +190,7 @@ public class ConnectorsResource {
"Topic tracking is disabled.");
}
ActiveTopicsInfo info = herder.connectorActiveTopics(connector);
- return Response.ok(Collections.singletonMap(info.connector(),
info)).build();
+ return Response.ok(Map.of(info.connector(), info)).build();
}
@PUT
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
index 8f51b6e1b94..33e187130d4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
@@ -25,7 +25,6 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@@ -123,7 +122,7 @@ public class SSLUtils {
*/
@SuppressWarnings("unchecked")
protected static void
configureSslContextFactoryAlgorithms(SslContextFactory ssl, Map<String, Object>
sslConfigValues) {
- List<String> sslEnabledProtocols = (List<String>)
getOrDefault(sslConfigValues, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
Arrays.asList(COMMA_WITH_WHITESPACE.split(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)));
+ List<String> sslEnabledProtocols = (List<String>)
getOrDefault(sslConfigValues, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
List.of(COMMA_WITH_WHITESPACE.split(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)));
ssl.setIncludeProtocols(sslEnabledProtocols.toArray(new String[0]));
String sslProvider = (String)
sslConfigValues.get(SslConfigs.SSL_PROVIDER_CONFIG);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
index fc2327a1bf7..5626fbc809d 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
@@ -23,7 +23,6 @@ import
org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -37,15 +36,15 @@ public class ClusterConfigState {
public static final ClusterConfigState EMPTY = new ClusterConfigState(
NO_OFFSET,
null,
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptySet(),
- Collections.emptySet());
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Map.of(),
+ Set.of(),
+ Set.of());
private final long offset;
private final SessionKey sessionKey;
@@ -232,12 +231,12 @@ public class ClusterConfigState {
*/
public List<ConnectorTaskId> tasks(String connectorName) {
if (inconsistentConnectors.contains(connectorName)) {
- return Collections.emptyList();
+ return List.of();
}
Integer numTasks = connectorTaskCounts.get(connectorName);
if (numTasks == null) {
- return Collections.emptyList();
+ return List.of();
}
List<ConnectorTaskId> taskIds = new ArrayList<>(numTasks);
@@ -245,7 +244,7 @@ public class ClusterConfigState {
ConnectorTaskId taskId = new ConnectorTaskId(connectorName,
taskIndex);
taskIds.add(taskId);
}
- return Collections.unmodifiableList(taskIds);
+ return List.copyOf(taskIds);
}
/**
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
index 85d241bf7f2..99e7f94fc57 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -398,7 +397,7 @@ public class ConnectorOffsetBackingStore implements
OffsetBackingStore {
}
private static Future<Map<ByteBuffer, ByteBuffer>> getFromStore(Optional<?
extends OffsetBackingStore> store, Collection<ByteBuffer> keys) {
- return store.map(s -> s.get(keys)).orElseGet(() ->
CompletableFuture.completedFuture(Collections.emptyMap()));
+ return store.map(s -> s.get(keys)).orElseGet(() ->
CompletableFuture.completedFuture(Map.of()));
}
private class ChainedOffsetWriteFuture implements Future<Void> {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 59caa612660..200e5e0b48f 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -31,7 +31,6 @@ import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -112,6 +111,6 @@ public class FileOffsetBackingStore extends
MemoryOffsetBackingStore {
@Override
public Set<Map<String, Object>> connectorPartitions(String connectorName) {
- return connectorPartitions.getOrDefault(connectorName,
Collections.emptySet());
+ return connectorPartitions.getOrDefault(connectorName, Set.of());
}
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 1cd10c79351..0e425301c11 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -62,7 +62,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -545,7 +544,7 @@ public final class KafkaConfigBackingStore extends
KafkaTopicBasedBackingStore i
log.debug("Removing connector configuration for connector '{}'",
connector);
try {
Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
- List<ProducerKeyValue> keyValues = Arrays.asList(
+ List<ProducerKeyValue> keyValues = List.of(
new ProducerKeyValue(CONNECTOR_KEY(connector), null),
new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
);
@@ -792,7 +791,7 @@ public final class KafkaConfigBackingStore extends
KafkaTopicBasedBackingStore i
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig)
config).configStorageTopicSettings()
- : Collections.emptyMap();
+ : Map.of();
NewTopic topicDescription = TopicAdmin.defineTopic(topic)
.config(topicSettings) // first so that we override
user-supplied settings as needed
.compacted()
@@ -811,7 +810,7 @@ public final class KafkaConfigBackingStore extends
KafkaTopicBasedBackingStore i
* @param timer Timer bounding how long this method can block. The timer
is updated before the method returns.
*/
private void sendPrivileged(String key, byte[] value, Timer timer) throws
ExecutionException, InterruptedException, TimeoutException {
- sendPrivileged(Collections.singletonList(new ProducerKeyValue(key,
value)), timer);
+ sendPrivileged(List.of(new ProducerKeyValue(key, value)), timer);
}
/**
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 7ea2691e0fa..7920b3d6e0c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -218,7 +217,7 @@ public class KafkaOffsetBackingStore extends
KafkaTopicBasedBackingStore impleme
protected NewTopic newTopicDescription(final String topic, final
WorkerConfig config) {
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig) config).offsetStorageTopicSettings()
- : Collections.emptyMap();
+ : Map.of();
return TopicAdmin.defineTopic(topic)
.config(topicSettings) // first so that we override
user-supplied settings as needed
.compacted()
@@ -297,7 +296,7 @@ public class KafkaOffsetBackingStore extends
KafkaTopicBasedBackingStore impleme
@Override
public Set<Map<String, Object>> connectorPartitions(String connectorName) {
- return connectorPartitions.getOrDefault(connectorName,
Collections.emptySet());
+ return connectorPartitions.getOrDefault(connectorName, Set.of());
}
protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback
= (error, record) -> {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index b9382399cf4..8de8d9ee18a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -199,7 +198,7 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig)
config).statusStorageTopicSettings()
- : Collections.emptyMap();
+ : Map.of();
NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic)
.config(topicSettings) // first so that we override
user-supplied settings as needed
.compacted()
@@ -402,8 +401,8 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
public Collection<TopicStatus> getAllTopics(String connector) {
ConcurrentMap<String, TopicStatus> activeTopics =
topics.get(Objects.requireNonNull(connector));
return activeTopics != null
- ?
Collections.unmodifiableCollection(Objects.requireNonNull(activeTopics.values()))
- : Collections.emptySet();
+ ? Set.copyOf(Objects.requireNonNull(activeTopics.values()))
+ : Set.of();
}
@Override
@@ -509,7 +508,7 @@ public class KafkaStatusBackingStore extends
KafkaTopicBasedBackingStore impleme
return converter.fromConnectData(
statusTopic,
TOPIC_STATUS_SCHEMA_V0,
- Collections.singletonMap(TOPIC_STATE_KEY, struct));
+ Map.of(TOPIC_STATE_KEY, struct));
}
private String parseConnectorStatusKey(String key) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index f4981e80953..254aaf89584 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -22,11 +22,11 @@ import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
@@ -82,11 +82,11 @@ public class MemoryConfigBackingStore implements
ConfigBackingStore {
connectorConfigs,
connectorTargetStates,
taskConfigs,
- Collections.emptyMap(),
- Collections.emptyMap(),
+ Map.of(),
+ Map.of(),
appliedConnectorConfigs,
- Collections.emptySet(),
- Collections.emptySet(),
+ Set.of(),
+ Set.of(),
configTransformer
);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
index a51a405d3de..a465bea9689 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -24,7 +24,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.Table;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -119,8 +118,8 @@ public class MemoryStatusBackingStore implements
StatusBackingStore {
public Collection<TopicStatus> getAllTopics(String connector) {
ConcurrentMap<String, TopicStatus> activeTopics =
topics.get(Objects.requireNonNull(connector));
return activeTopics != null
- ? Collections.unmodifiableCollection(activeTopics.values())
- : Collections.emptySet();
+ ? Set.copyOf(activeTopics.values())
+ : Set.of();
}
@Override
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index d9776e05dd3..c17d2fb099c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -23,11 +23,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
@@ -61,7 +60,7 @@ public class OffsetStorageReaderImpl implements
CloseableOffsetStorageReader {
@Override
public <T> Map<String, Object> offset(Map<String, T> partition) {
- return offsets(Collections.singletonList(partition)).get(partition);
+ return offsets(List.of(partition)).get(partition);
}
@Override
@@ -73,7 +72,7 @@ public class OffsetStorageReaderImpl implements
CloseableOffsetStorageReader {
try {
// Offsets are treated as schemaless, their format is only
validated here (and the returned value below)
OffsetUtils.validateFormat(key);
- byte[] keySerialized = keyConverter.fromConnectData(namespace,
null, Arrays.asList(namespace, key));
+ byte[] keySerialized = keyConverter.fromConnectData(namespace,
null, List.of(namespace, key));
ByteBuffer keyBuffer = (keySerialized != null) ?
ByteBuffer.wrap(keySerialized) : null;
serializedToOriginal.put(keyBuffer, key);
} catch (Throwable t) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
index e76b06b55d4..e1415bed575 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
@@ -20,9 +20,9 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.transforms.predicates.Predicate;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.Map;
public class PredicateDoc {
@@ -38,7 +38,7 @@ public class PredicateDoc {
}
}
- private static final List<DocInfo> PREDICATES = new
Plugins(Collections.emptyMap()).predicates().stream()
+ private static final List<DocInfo> PREDICATES = new
Plugins(Map.of()).predicates().stream()
.map(p -> {
try {
String overviewDoc = (String)
p.pluginClass().getDeclaredField("OVERVIEW_DOC").get(null);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 95eeed2e0f4..e3e9ad063d2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -34,7 +34,6 @@ import org.apache.kafka.connect.transforms.TimestampConverter;
import org.apache.kafka.connect.transforms.TimestampRouter;
import org.apache.kafka.connect.transforms.ValueToKey;
-import java.util.Arrays;
import java.util.List;
public class TransformationDoc {
@@ -42,7 +41,7 @@ public class TransformationDoc {
private record DocInfo(String transformationName, String overview,
ConfigDef configDef) {
}
- private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
+ private static final List<DocInfo> TRANSFORMATIONS = List.of(
new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC,
Cast.CONFIG_DEF),
new DocInfo(DropHeaders.class.getName(), DropHeaders.OVERVIEW_DOC,
DropHeaders.CONFIG_DEF),
new DocInfo(ExtractField.class.getName(),
ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e36df1b7dbc..5452ee9e1ee 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -185,8 +184,8 @@ public class KafkaBasedLog<K, V> {
Objects.requireNonNull(topicAdmin);
Objects.requireNonNull(readTopicPartition);
return new KafkaBasedLog<>(topic,
- Collections.emptyMap(),
- Collections.emptyMap(),
+ Map.of(),
+ Map.of(),
() -> topicAdmin,
consumedCallback,
time,
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
index fa6a0b9cccd..a83c515e73a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
@@ -19,9 +19,9 @@ package org.apache.kafka.connect.util;
import org.slf4j.MDC;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/**
* A utility for defining Mapped Diagnostic Context (MDC) for SLF4J logs.
@@ -49,7 +49,7 @@ public final class LoggingContext implements AutoCloseable {
*/
public static final String CONNECTOR_CONTEXT = "connector.context";
- public static final Collection<String> ALL_CONTEXTS =
Collections.singleton(CONNECTOR_CONTEXT);
+ public static final Collection<String> ALL_CONTEXTS =
Set.of(CONNECTOR_CONTEXT);
/**
* The Scope values used by Connect when specifying the context.
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
index 70bcf8c427e..620eec2f139 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
@@ -23,7 +23,6 @@ import
org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,7 +47,7 @@ public final class SinkUtils {
partition.put(KAFKA_TOPIC_KEY,
topicPartitionOffset.getKey().topic());
partition.put(KAFKA_PARTITION_KEY,
topicPartitionOffset.getKey().partition());
connectorOffsets.add(new ConnectorOffset(partition,
- Collections.singletonMap(KAFKA_OFFSET_KEY,
topicPartitionOffset.getValue().offset())));
+ Map.of(KAFKA_OFFSET_KEY,
topicPartitionOffset.getValue().offset())));
}
return new ConnectorOffsets(connectorOffsets);
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
index 82c216b4121..a8f783a5709 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.connect.util;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -54,8 +53,8 @@ public class Table<R, C, V> {
public Map<C, V> row(R row) {
Map<C, V> columns = table.get(row);
if (columns == null)
- return Collections.emptyMap();
- return Collections.unmodifiableMap(columns);
+ return Map.of();
+ return Map.copyOf(columns);
}
public boolean isEmpty() {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 348beb00233..67285c1c197 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -73,8 +72,8 @@ import java.util.stream.Collectors;
*/
public class TopicAdmin implements AutoCloseable {
- public static final TopicCreationResponse EMPTY_CREATION = new
TopicCreationResponse(Collections.emptySet(), Collections.emptySet());
- private static final List<Class<? extends Exception>>
CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList(
+ public static final TopicCreationResponse EMPTY_CREATION = new
TopicCreationResponse(Set.of(), Set.of());
+ private static final List<Class<? extends Exception>>
CAUSES_TO_RETRY_TOPIC_CREATION = List.of(
InvalidReplicationFactorException.class,
TimeoutException.class);
@@ -84,8 +83,8 @@ public class TopicAdmin implements AutoCloseable {
private final Set<String> existing;
public TopicCreationResponse(Set<String> createdTopicNames,
Set<String> existingTopicNames) {
- this.created = Collections.unmodifiableSet(createdTopicNames);
- this.existing = Collections.unmodifiableSet(existingTopicNames);
+ this.created = Set.copyOf(createdTopicNames);
+ this.existing = Set.copyOf(existingTopicNames);
}
public Set<String> createdTopics() {
@@ -473,12 +472,12 @@ public class TopicAdmin implements AutoCloseable {
*/
public Map<String, TopicDescription> describeTopics(String... topics) {
if (topics == null) {
- return Collections.emptyMap();
+ return Map.of();
}
String topicNameList = String.join(", ", topics);
Map<String, KafkaFuture<TopicDescription>> newResults =
- admin.describeTopics(Arrays.asList(topics), new
DescribeTopicsOptions()).topicNameValues();
+ admin.describeTopics(List.of(topics), new
DescribeTopicsOptions()).topicNameValues();
// Iterate over each future so that we can handle individual failures
like when some topics don't exist
Map<String, TopicDescription> existingTopics = new HashMap<>();
@@ -536,7 +535,7 @@ public class TopicAdmin implements AutoCloseable {
+ "describe topic configurations.", topic,
TopicConfig.CLEANUP_POLICY_COMPACT);
return false;
}
- Set<String> expectedPolicies =
Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+ Set<String> expectedPolicies =
Set.of(TopicConfig.CLEANUP_POLICY_COMPACT);
if (!cleanupPolicies.equals(expectedPolicies)) {
String expectedPolicyStr = String.join(",", expectedPolicies);
String cleanupPolicyStr = String.join(",", cleanupPolicies);
@@ -566,7 +565,7 @@ public class TopicAdmin implements AutoCloseable {
if (topicConfig == null) {
// The topic must not exist
log.debug("Unable to find topic '{}' when getting cleanup policy",
topic);
- return Collections.emptySet();
+ return Set.of();
}
ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
if (entry != null && entry.value() != null) {
@@ -581,7 +580,7 @@ public class TopicAdmin implements AutoCloseable {
// This is unexpected, as the topic config should include the
cleanup.policy even if
// the topic settings don't override the broker's log.cleanup.policy.
But just to be safe.
log.debug("Found no cleanup.policy for topic '{}'", topic);
- return Collections.emptySet();
+ return Set.of();
}
/**
@@ -620,7 +619,7 @@ public class TopicAdmin implements AutoCloseable {
*/
public Map<String, Config> describeTopicConfigs(String... topicNames) {
if (topicNames == null) {
- return Collections.emptyMap();
+ return Map.of();
}
Collection<String> topics = Arrays.stream(topicNames)
.filter(Objects::nonNull)
@@ -628,7 +627,7 @@ public class TopicAdmin implements AutoCloseable {
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
if (topics.isEmpty()) {
- return Collections.emptyMap();
+ return Map.of();
}
String topicNameList = String.join(", ", topics);
Collection<ConfigResource> resources = topics.stream()
@@ -686,7 +685,7 @@ public class TopicAdmin implements AutoCloseable {
*/
public Map<TopicPartition, Long> endOffsets(Set<TopicPartition>
partitions) {
if (partitions == null || partitions.isEmpty()) {
- return Collections.emptyMap();
+ return Map.of();
}
Map<TopicPartition, OffsetSpec> offsetSpecMap =
partitions.stream().collect(Collectors.toMap(Function.identity(), tp ->
OffsetSpec.latest()));
ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap, new
ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
index 45c12aa292a..f98d1afa5b2 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.util;
import org.apache.kafka.connect.runtime.WorkerConfig;
-import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -32,7 +31,7 @@ import static
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC
*/
public class TopicCreation {
private static final TopicCreation EMPTY =
- new TopicCreation(false, null, Collections.emptyMap(),
Collections.emptySet());
+ new TopicCreation(false, null, Map.of(), Set.of());
private final boolean isTopicCreationEnabled;
private final TopicCreationGroup defaultTopicGroup;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
index 5393fd2a013..e5694c944d4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TopicCreationConfig;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,7 @@ public class TopicCreationGroup {
*/
public static Map<String, TopicCreationGroup>
configuredGroups(SourceConnectorConfig config) {
if (!config.usesTopicCreation()) {
- return Collections.emptyMap();
+ return Map.of();
}
List<String> groupNames = config.getList(TOPIC_CREATION_GROUPS_CONFIG);
Map<String, TopicCreationGroup> groups = new LinkedHashMap<>();