This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7527a8bac08 MINOR: Cleanup Connect Module (4/n) (#20389)
7527a8bac08 is described below

commit 7527a8bac088ac551a34c7ef2447588f70d8ec7e
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Thu Aug 28 02:53:22 2025 +0530

    MINOR: Cleanup Connect Module (4/n) (#20389)
    
    Now that Kafka support Java 17, this PR makes some changes in connect
    module. The changes in this PR are limited to only some files. A future
    PR(s) shall follow.
    
    The changes mostly include:
    - Collections.emptyList(), Collections.singletonList() and
    Arrays.asList() are replaced with List.of()
    - Collections.emptyMap() and Collections.singletonMap() are replaced
    with Map.of()
    - Collections.singleton() is replaced with Set.of()
    
    Modules target: runtime/src/main
    
    Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/connect/cli/AbstractConnectCli.java      |  6 ++--
 .../kafka/connect/cli/ConnectDistributed.java      |  6 ++--
 .../kafka/connect/cli/ConnectStandalone.java       |  3 +-
 .../kafka/connect/runtime/AbstractHerder.java      |  8 ++---
 .../kafka/connect/runtime/ConnectMetrics.java      |  2 +-
 .../connect/runtime/ConnectMetricsRegistry.java    | 22 ++++++-------
 .../kafka/connect/runtime/ConnectorConfig.java     | 17 ++++------
 .../kafka/connect/runtime/SinkConnectorConfig.java |  5 ++-
 .../connect/runtime/SourceConnectorConfig.java     |  8 ++---
 .../kafka/connect/runtime/SubmittedRecords.java    |  2 +-
 .../kafka/connect/runtime/TopicCreationConfig.java |  7 ++---
 .../org/apache/kafka/connect/runtime/Worker.java   |  9 +++---
 .../apache/kafka/connect/runtime/WorkerConfig.java |  5 ++-
 .../kafka/connect/runtime/WorkerSinkTask.java      |  9 +++---
 .../connect/runtime/WorkerSinkTaskContext.java     | 13 ++++----
 .../runtime/distributed/ConnectProtocol.java       |  4 +--
 .../runtime/distributed/DistributedConfig.java     |  3 +-
 .../runtime/distributed/DistributedHerder.java     | 14 ++++-----
 .../connect/runtime/distributed/EagerAssignor.java |  8 ++---
 .../runtime/distributed/ExtendedAssignment.java    | 11 +++----
 .../IncrementalCooperativeAssignor.java            | 36 ++++++++++------------
 .../runtime/distributed/WorkerCoordinator.java     |  3 +-
 .../runtime/errors/DeadLetterQueueReporter.java    |  5 ++-
 .../runtime/errors/RetryWithToleranceOperator.java |  7 ++---
 .../runtime/isolation/DelegatingClassLoader.java   |  3 +-
 .../runtime/isolation/PluginScanResult.java        |  3 +-
 .../connect/runtime/isolation/PluginUtils.java     | 12 +++-----
 .../kafka/connect/runtime/isolation/Plugins.java   |  6 ++--
 .../runtime/isolation/PluginsRecommenders.java     | 17 +++++-----
 .../connect/runtime/rest/ConnectRestServer.java    |  9 ++----
 .../kafka/connect/runtime/rest/RestServer.java     |  3 +-
 .../connect/runtime/rest/RestServerConfig.java     |  5 ++-
 .../runtime/rest/resources/ConnectorsResource.java |  3 +-
 .../kafka/connect/runtime/rest/util/SSLUtils.java  |  3 +-
 .../kafka/connect/storage/ClusterConfigState.java  | 25 ++++++++-------
 .../storage/ConnectorOffsetBackingStore.java       |  3 +-
 .../connect/storage/FileOffsetBackingStore.java    |  3 +-
 .../connect/storage/KafkaConfigBackingStore.java   |  7 ++---
 .../connect/storage/KafkaOffsetBackingStore.java   |  5 ++-
 .../connect/storage/KafkaStatusBackingStore.java   |  9 +++---
 .../connect/storage/MemoryConfigBackingStore.java  | 10 +++---
 .../connect/storage/MemoryStatusBackingStore.java  |  5 ++-
 .../connect/storage/OffsetStorageReaderImpl.java   |  7 ++---
 .../apache/kafka/connect/tools/PredicateDoc.java   |  4 +--
 .../kafka/connect/tools/TransformationDoc.java     |  3 +-
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  5 ++-
 .../apache/kafka/connect/util/LoggingContext.java  |  4 +--
 .../org/apache/kafka/connect/util/SinkUtils.java   |  3 +-
 .../java/org/apache/kafka/connect/util/Table.java  |  5 ++-
 .../org/apache/kafka/connect/util/TopicAdmin.java  | 25 ++++++++-------
 .../apache/kafka/connect/util/TopicCreation.java   |  3 +-
 .../kafka/connect/util/TopicCreationGroup.java     |  3 +-
 52 files changed, 178 insertions(+), 228 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
index 5cfa300bafc..8c0d30b1c99 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -82,7 +82,7 @@ public abstract class AbstractConnectCli<H extends Herder, T 
extends WorkerConfi
      *  Validate {@link #args}, process worker properties from the first CLI 
argument, and start {@link Connect}
      */
     public void run() {
-        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
+        if (args.length < 1 || List.of(args).contains("--help")) {
             log.info("Usage: {}", usage());
             Exit.exit(1);
         }
@@ -90,7 +90,7 @@ public abstract class AbstractConnectCli<H extends Herder, T 
extends WorkerConfi
         try {
             String workerPropsFile = args[0];
             Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.emptyMap();
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Map.of();
             String[] extraArgs = Arrays.copyOfRange(args, 1, args.length);
             Connect<H> connect = startConnect(workerProps);
             processExtraArgs(connect, extraArgs);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8763dd908a1..59b943ae919 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -36,8 +36,8 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.SharedTopicAdmin;
 
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.clients.CommonClientConfigs.CLIENT_ID_CONFIG;
@@ -77,7 +77,7 @@ public class ConnectDistributed extends 
AbstractConnectCli<DistributedHerder, Di
 
         KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore(sharedAdmin, () -> clientIdBase,
                 plugins.newInternalConverter(true, 
JsonConverter.class.getName(),
-                        
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
+                        Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, 
"false")));
         offsetBackingStore.configure(config);
 
         Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, 
offsetBackingStore, connectorClientConfigOverridePolicy);
@@ -99,7 +99,7 @@ public class ConnectDistributed extends 
AbstractConnectCli<DistributedHerder, Di
         return new DistributedHerder(config, Time.SYSTEM, worker,
                 kafkaClusterId, statusBackingStore, configBackingStore,
                 restServer.advertisedUrl().toString(), restClient, 
connectorClientConfigOverridePolicy,
-                Collections.emptyList(), sharedAdmin);
+                List.of(), sharedAdmin);
     }
 
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 43af6b274b6..ded4103c69c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -49,7 +49,6 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.Map;
 
 import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
@@ -163,7 +162,7 @@ public class ConnectStandalone extends 
AbstractConnectCli<StandaloneHerder, Stan
                                   RestServer restServer, RestClient 
restClient) {
 
         OffsetBackingStore offsetBackingStore = new 
FileOffsetBackingStore(plugins.newInternalConverter(
-                true, JsonConverter.class.getName(), 
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
+                true, JsonConverter.class.getName(), 
Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false")));
         offsetBackingStore.configure(config);
 
         Worker worker = new Worker(workerId, Time.SYSTEM, plugins, config, 
offsetBackingStore,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index bce67129388..165a540ca62 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -555,7 +555,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                 "header converter",
                 HEADER_CONVERTER_CLASS_CONFIG,
                 HEADER_CONVERTER_VERSION_CONFIG,
-                Collections.singletonMap(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName()),
+                Map.of(ConverterConfig.TYPE_CONFIG, 
ConverterType.HEADER.getName()),
                 connectorLoader,
                 reportStage
         );
@@ -568,7 +568,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                 "key converter",
                 KEY_CONVERTER_CLASS_CONFIG,
                 KEY_CONVERTER_VERSION_CONFIG,
-                Collections.singletonMap(ConverterConfig.TYPE_CONFIG, 
ConverterType.KEY.getName()),
+                Map.of(ConverterConfig.TYPE_CONFIG, 
ConverterType.KEY.getName()),
                 connectorLoader,
                 reportStage
         );
@@ -582,7 +582,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
                 "value converter",
                 VALUE_CONVERTER_CLASS_CONFIG,
                 VALUE_CONVERTER_VERSION_CONFIG,
-                Collections.singletonMap(ConverterConfig.TYPE_CONFIG, 
ConverterType.VALUE.getName()),
+                Map.of(ConverterConfig.TYPE_CONFIG, 
ConverterType.VALUE.getName()),
                 connectorLoader,
                 reportStage
         );
@@ -1278,7 +1278,7 @@ public abstract class AbstractHerder implements Herder, 
TaskStatus.Listener, Con
 
         if (!loggers.isValidLevel(normalizedLevel)) {
             log.warn("Ignoring request to set invalid level '{}' for namespace 
{}", desiredLevelStr, namespace);
-            return Collections.emptyList();
+            return List.of();
         }
 
         return loggers.setLevel(namespace, normalizedLevel);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 90e6650df37..83408649000 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -253,7 +253,7 @@ public class ConnectMetrics {
             Objects.requireNonNull(groupName);
             Objects.requireNonNull(tags);
             this.groupName = groupName;
-            this.tags = Collections.unmodifiableMap(new LinkedHashMap<>(tags));
+            this.tags = Collections.unmodifiableMap(new 
LinkedHashMap<>(tags)); // To ensure the order of insertion, we have to use 
Collections.
             this.hc = Objects.hash(this.groupName, this.tags);
             StringBuilder sb = new StringBuilder(this.groupName);
             for (Map.Entry<String, String> entry : this.tags.entrySet()) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index 496c838cf45..7f879ea8f2a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -19,8 +19,6 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.common.MetricNameTemplate;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -135,7 +133,7 @@ public class ConnectMetricsRegistry {
     public final MetricNameTemplate predicateClass;
     public final MetricNameTemplate predicateVersion;
 
-    public Map<MetricNameTemplate, TaskStatus.State> connectorStatusMetrics;
+    public final Map<MetricNameTemplate, TaskStatus.State> 
connectorStatusMetrics;
 
     public ConnectMetricsRegistry() {
         this(new LinkedHashSet<>());
@@ -388,14 +386,14 @@ public class ConnectMetricsRegistry {
             WORKER_GROUP_NAME,
             "The number of restarting tasks of the connector on the worker.", 
workerConnectorTags);
 
-        connectorStatusMetrics = new HashMap<>();
-        connectorStatusMetrics.put(connectorRunningTaskCount, 
TaskStatus.State.RUNNING);
-        connectorStatusMetrics.put(connectorPausedTaskCount, 
TaskStatus.State.PAUSED);
-        connectorStatusMetrics.put(connectorFailedTaskCount, 
TaskStatus.State.FAILED);
-        connectorStatusMetrics.put(connectorUnassignedTaskCount, 
TaskStatus.State.UNASSIGNED);
-        connectorStatusMetrics.put(connectorDestroyedTaskCount, 
TaskStatus.State.DESTROYED);
-        connectorStatusMetrics.put(connectorRestartingTaskCount, 
TaskStatus.State.RESTARTING);
-        connectorStatusMetrics = 
Collections.unmodifiableMap(connectorStatusMetrics);
+        connectorStatusMetrics = Map.of(
+            connectorRunningTaskCount, TaskStatus.State.RUNNING,
+            connectorPausedTaskCount, TaskStatus.State.PAUSED,
+            connectorFailedTaskCount, TaskStatus.State.FAILED,
+            connectorUnassignedTaskCount, TaskStatus.State.UNASSIGNED,
+            connectorDestroyedTaskCount, TaskStatus.State.DESTROYED,
+            connectorRestartingTaskCount, TaskStatus.State.RESTARTING
+        );
 
         /* Worker rebalance level */
         Set<String> rebalanceTags = new LinkedHashSet<>(tags);
@@ -444,7 +442,7 @@ public class ConnectMetricsRegistry {
     }
 
     public List<MetricNameTemplate> getAllTemplates() {
-        return Collections.unmodifiableList(allTemplates);
+        return List.copyOf(allTemplates);
     }
 
     public String connectorTagName() {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index efd421bd2e2..ff4d399db1a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -47,7 +47,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -250,8 +249,8 @@ public class ConnectorConfig extends AbstractConfig {
                 .define(VALUE_CONVERTER_VERSION_CONFIG, Type.STRING, 
valueConverterDefaults.version, VALUE_CONVERTER_VERSION_VALIDATOR, 
Importance.LOW, VALUE_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup, 
Width.SHORT, VALUE_CONVERTER_VERSION_DISPLAY, 
recommender.valueConverterPluginVersionRecommender())
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, 
headerConverterDefaults.type, HEADER_CONVERTER_CLASS_VALIDATOR, Importance.LOW, 
HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, 
HEADER_CONVERTER_CLASS_DISPLAY, recommender.headerConverterPluginRecommender())
                 .define(HEADER_CONVERTER_VERSION_CONFIG, Type.STRING, 
headerConverterDefaults.version, HEADER_CONVERTER_VERSION_VALIDATOR, 
Importance.LOW, HEADER_CONVERTER_VERSION_DOC, COMMON_GROUP, ++orderInGroup, 
Width.SHORT, HEADER_CONVERTER_VERSION_DISPLAY, 
recommender.headerConverterPluginVersionRecommender())
-                .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), 
aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, 
TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
-                .define(PREDICATES_CONFIG, Type.LIST, Collections.emptyList(), 
aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, 
++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
+                .define(TRANSFORMS_CONFIG, Type.LIST, List.of(), 
aliasValidator("transformation"), Importance.LOW, TRANSFORMS_DOC, 
TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY)
+                .define(PREDICATES_CONFIG, Type.LIST, List.of(), 
aliasValidator("predicate"), Importance.LOW, PREDICATES_DOC, PREDICATES_GROUP, 
++orderInGroup, Width.LONG, PREDICATES_DISPLAY)
                 .define(CONFIG_RELOAD_ACTION_CONFIG, Type.STRING, 
CONFIG_RELOAD_ACTION_RESTART,
                         in(CONFIG_RELOAD_ACTION_NONE, 
CONFIG_RELOAD_ACTION_RESTART), Importance.LOW,
                         CONFIG_RELOAD_ACTION_DOC, COMMON_GROUP, 
++orderInGroup, Width.MEDIUM, CONFIG_RELOAD_ACTION_DISPLAY)
@@ -303,7 +302,7 @@ public class ConnectorConfig extends AbstractConfig {
     }
 
     public ConnectorConfig(Plugins plugins) {
-        this(plugins, Collections.emptyMap());
+        this(plugins, Map.of());
     }
 
     public ConnectorConfig(Plugins plugins, Map<String, String> props) {
@@ -646,7 +645,7 @@ public class ConnectorConfig extends AbstractConfig {
                 newDef.define(typeConfig, Type.CLASS, 
ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
                         "Class for the '" + alias + "' " + 
aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
                         baseClass.getSimpleName() + " type for " + alias,
-                        Collections.emptyList(), new ClassRecommender());
+                        List.of(), new ClassRecommender());
 
                 // Add the version configuration
                 final ConfigDef.Validator versionValidator = (name, value) -> {
@@ -664,7 +663,7 @@ public class ConnectorConfig extends AbstractConfig {
                 newDef.define(versionConfig, Type.STRING, defaultVersion, 
versionValidator, Importance.HIGH,
                         "Version of the '" + alias + "' " + 
aliasKind.toLowerCase(Locale.ENGLISH) + ".", group, orderInGroup++, Width.LONG,
                         baseClass.getSimpleName() + " version for " + alias,
-                        Collections.emptyList(), 
versionRecommender(typeConfig));
+                        List.of(), versionRecommender(typeConfig));
 
                 final ConfigDef configDef = populateConfigDef(typeConfig, 
versionConfig, plugins);
                 if (configDef == null) continue;
@@ -780,11 +779,7 @@ public class ConnectorConfig extends AbstractConfig {
 
             @Override
             public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
-                List<Object> result = new ArrayList<>();
-                for (PluginDesc<T> plugin : plugins()) {
-                    result.add(plugin.pluginClass());
-                }
-                return Collections.unmodifiableList(result);
+                return plugins().stream().map(p -> (Object) 
p.pluginClass()).toList();
             }
 
             @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
index 4584255e231..b52b73810fd 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.transforms.util.RegexValidator;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -169,7 +168,7 @@ public class SinkConnectorConfig extends ConnectorConfig {
     private static void addErrorMessage(Map<String, ConfigValue> 
validatedConfig, String name, String value, String errorMessage) {
         validatedConfig.computeIfAbsent(
                 name,
-                p -> new ConfigValue(name, value, Collections.emptyList(), new 
ArrayList<>())
+                p -> new ConfigValue(name, value, List.of(), new ArrayList<>())
         ).addErrorMessage(
                 errorMessage
         );
@@ -189,7 +188,7 @@ public class SinkConnectorConfig extends ConnectorConfig {
     public static List<String> parseTopicsList(Map<String, String> props) {
         List<String> topics = (List<String>) 
ConfigDef.parseType(TOPICS_CONFIG, props.get(TOPICS_CONFIG), Type.LIST);
         if (topics == null) {
-            return Collections.emptyList();
+            return List.of();
         }
         return topics
                 .stream()
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
index 336468f491a..e9913e81f4c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java
@@ -26,12 +26,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.enumOptions;
@@ -132,7 +132,7 @@ public final class SourceConnectorConfig extends 
ConnectorConfig {
                 .define(
                         TOPIC_CREATION_GROUPS_CONFIG,
                         ConfigDef.Type.LIST,
-                        Collections.emptyList(),
+                        List.of(),
                         ConfigDef.CompositeValidator.of(
                                 new ConfigDef.NonNullValidator(),
                                 ConfigDef.LambdaValidator.with(
@@ -240,7 +240,7 @@ public final class SourceConnectorConfig extends 
ConnectorConfig {
         if (topicCreationGroups.contains(DEFAULT_TOPIC_CREATION_GROUP)) {
             log.warn("'{}' topic creation group always exists and does not 
need to be listed explicitly",
                 DEFAULT_TOPIC_CREATION_GROUP);
-            
topicCreationGroups.removeAll(Collections.singleton(DEFAULT_TOPIC_CREATION_GROUP));
+            
topicCreationGroups.removeAll(Set.of(DEFAULT_TOPIC_CREATION_GROUP));
         }
 
         ConfigDef newDef = new ConfigDef(baseConfigDef);
@@ -332,7 +332,7 @@ public final class SourceConnectorConfig extends 
ConnectorConfig {
 
     public Map<String, Object> topicCreationOtherConfigs(String group) {
         if (enrichedSourceConfig == null) {
-            return Collections.emptyMap();
+            return Map.of();
         }
         return enrichedSourceConfig.originalsWithPrefix(TOPIC_CREATION_PREFIX 
+ group + '.').entrySet().stream()
                 .filter(e -> {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
index accbb0196d7..86cec8080db 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
@@ -250,7 +250,7 @@ class SubmittedRecords {
         /**
          * An "empty" snapshot that contains no offsets to commit and whose 
metadata contains no committable or uncommitable messages.
          */
-        public static final CommittableOffsets EMPTY = new 
CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
+        public static final CommittableOffsets EMPTY = new 
CommittableOffsets(Map.of(), 0, 0, 0, 0, null);
 
         CommittableOffsets {
             offsets = Collections.unmodifiableMap(offsets);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
index 11c2ba9d374..4339fd6f236 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TopicCreationConfig.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
@@ -102,11 +101,11 @@ public class TopicCreationConfig {
         int orderInGroup = 0;
         ConfigDef configDef = new ConfigDef();
         configDef
-                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                .define(INCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, List.of(),
                         REGEX_VALIDATOR, ConfigDef.Importance.LOW,
                         INCLUDE_REGEX_DOC, group, ++orderInGroup, 
ConfigDef.Width.LONG,
                         "Inclusion Topic Pattern for " + group)
-                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, List.of(),
                         REGEX_VALIDATOR, ConfigDef.Importance.LOW,
                         EXCLUDE_REGEX_DOC, group, ++orderInGroup, 
ConfigDef.Width.LONG,
                         "Exclusion Topic Pattern for " + group)
@@ -129,7 +128,7 @@ public class TopicCreationConfig {
                         new ConfigDef.NonNullValidator(), 
ConfigDef.Importance.LOW,
                         INCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, 
++orderInGroup, ConfigDef.Width.LONG,
                         "Inclusion Topic Pattern for " + 
DEFAULT_TOPIC_CREATION_GROUP)
-                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, 
Collections.emptyList(),
+                .define(EXCLUDE_REGEX_CONFIG, ConfigDef.Type.LIST, List.of(),
                         new ConfigDef.NonNullValidator(), 
ConfigDef.Importance.LOW,
                         EXCLUDE_REGEX_DOC, DEFAULT_TOPIC_CREATION_GROUP, 
++orderInGroup, ConfigDef.Width.LONG,
                         "Exclusion Topic Pattern for " + 
DEFAULT_TOPIC_CREATION_GROUP)
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index a3e914d3f90..53cc40d7fd8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -108,7 +108,6 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -197,7 +196,7 @@ public final class Worker {
         this.connectorClientConfigOverridePolicy = 
connectorClientConfigOverridePolicy;
         this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, 
this.tasks, metrics);
 
-        Map<String, String> internalConverterConfig = 
Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
+        Map<String, String> internalConverterConfig = 
Map.of(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
         this.internalKeyConverter = plugins.newInternalConverter(true, 
JsonConverter.class.getName(), internalConverterConfig);
         this.internalValueConverter = plugins.newInternalConverter(false, 
JsonConverter.class.getName(), internalConverterConfig);
 
@@ -539,7 +538,7 @@ public final class Worker {
      */
     public void stopAndAwaitConnector(String connName) {
         stopConnector(connName);
-        awaitStopConnectors(Collections.singletonList(connName));
+        awaitStopConnectors(List.of(connName));
     }
 
     /**
@@ -1149,7 +1148,7 @@ public final class Worker {
      */
     public void stopAndAwaitTask(ConnectorTaskId taskId) {
         stopTask(taskId);
-        awaitStopTasks(Collections.singletonList(taskId));
+        awaitStopTasks(List.of(taskId));
     }
 
     /**
@@ -1562,7 +1561,7 @@ public final class Worker {
     private void resetSinkConnectorOffsets(String connName, String groupId, 
Admin admin, Callback<Message> cb, boolean alterOffsetsResult, Timer timer) {
         DeleteConsumerGroupsOptions deleteConsumerGroupsOptions = new 
DeleteConsumerGroupsOptions().timeoutMs((int) timer.remainingMs());
 
-        admin.deleteConsumerGroups(Collections.singleton(groupId), 
deleteConsumerGroupsOptions)
+        admin.deleteConsumerGroups(Set.of(groupId), 
deleteConsumerGroupsOptions)
                 .all()
                 .whenComplete((ignored, error) -> {
                     // We treat GroupIdNotFoundException as a non-error here 
because resetting a connector's offsets is expected to be an idempotent 
operation
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index b3aded30682..bec7d2085ec 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -254,7 +253,7 @@ public class WorkerConfig extends AbstractConfig {
                 .define(HEADER_CONVERTER_VERSION, Type.STRING,
                         HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW, 
HEADER_CONVERTER_VERSION_DOC)
                 .define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
-                        Collections.emptyList(),
+                        List.of(),
                         Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, Type.STRING, 
CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT,
                         Importance.MEDIUM, CONNECTOR_CLIENT_POLICY_CLASS_DOC)
@@ -316,7 +315,7 @@ public class WorkerConfig extends AbstractConfig {
 
     private void logInternalConverterRemovalWarnings(Map<String, String> 
props) {
         List<String> removedProperties = new ArrayList<>();
-        for (String property : Arrays.asList("internal.key.converter", 
"internal.value.converter")) {
+        for (String property : List.of("internal.key.converter", 
"internal.value.converter")) {
             if (props.containsKey(property)) {
                 removedProperties.add(property);
             }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 14b093c9123..1de9ff2d9a5 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -63,16 +63,15 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singleton;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
 /**
@@ -367,12 +366,12 @@ class WorkerSinkTask extends 
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
 
     //VisibleForTesting
     Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets() {
-        return Collections.unmodifiableMap(lastCommittedOffsets);
+        return Map.copyOf(lastCommittedOffsets);
     }
 
     //VisibleForTesting
     Map<TopicPartition, OffsetAndMetadata> currentOffsets() {
-        return Collections.unmodifiableMap(currentOffsets);
+        return Map.copyOf(currentOffsets);
     }
 
     private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, 
int seqno) {
@@ -613,7 +612,7 @@ class WorkerSinkTask extends 
WorkerTask<ConsumerRecord<byte[], byte[]>, SinkReco
     private void resumeAll() {
         for (TopicPartition tp : consumer.assignment())
             if (!context.pausedPartitions().contains(tp))
-                consumer.resume(singleton(tp));
+                consumer.resume(Set.of(tp));
     }
 
     private void pauseAll() {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index 11b8446b3d8..9b31b6e35d9 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -27,10 +27,10 @@ import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -117,7 +117,7 @@ public class WorkerSinkTaskContext implements 
SinkTaskContext {
             if (sinkTask.shouldPause()) {
                 log.debug("{} Connector is paused, so not pausing consumer's 
partitions {}", this, partitions);
             } else {
-                consumer.pause(Arrays.asList(partitions));
+                consumer.pause(List.of(partitions));
                 log.debug("{} Pausing partitions {}. Connector is not 
paused.", this, partitions);
             }
         } catch (IllegalStateException e) {
@@ -131,12 +131,13 @@ public class WorkerSinkTaskContext implements 
SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be 
used to resume consumption until the task is initialized");
         }
         try {
-            pausedPartitions.removeAll(Arrays.asList(partitions));
+            List<TopicPartition> partitionList = List.of(partitions);
+            partitionList.forEach(pausedPartitions::remove);
             if (sinkTask.shouldPause()) {
-                log.debug("{} Connector is paused, so not resuming consumer's 
partitions {}", this, partitions);
+                log.debug("{} Connector is paused, so not resuming consumer's 
partitions {}", this, partitionList);
             } else {
-                consumer.resume(Arrays.asList(partitions));
-                log.debug("{} Resuming partitions: {}", this, partitions);
+                consumer.resume(partitionList);
+                log.debug("{} Resuming partitions: {}", this, partitionList);
             }
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume 
partitions that are not currently assigned to them.", e);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
index 2644e105d4d..6b29598ab10 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
@@ -27,12 +27,12 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
@@ -142,7 +142,7 @@ public class ConnectProtocol {
      * @return the collection of Connect protocol metadata
      */
     public static JoinGroupRequestProtocolCollection 
metadataRequest(WorkerState workerState) {
-        return new JoinGroupRequestProtocolCollection(Collections.singleton(
+        return new JoinGroupRequestProtocolCollection(Set.of(
                 new JoinGroupRequestProtocol()
                         .setName(EAGER.protocol())
                         
.setMetadata(ConnectProtocol.serializeMetadata(workerState).array()))
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 16ab0d47a3c..6b209da331a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -35,7 +35,6 @@ import java.security.Provider;
 import java.security.Security;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -218,7 +217,7 @@ public final class DistributedConfig extends WorkerConfig {
             + "on other JVMs, no default is used and a value for this property 
must be manually specified in the worker config.";
 
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_CONFIG = 
"inter.worker.verification.algorithms";
-    public static final List<String> 
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = 
Collections.singletonList(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
+    public static final List<String> 
INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT = 
List.of(INTER_WORKER_SIGNATURE_ALGORITHM_DEFAULT);
     public static final String INTER_WORKER_VERIFICATION_ALGORITHMS_DOC = "A 
list of permitted algorithms for verifying internal requests, "
         + "which must include the algorithm used for the " + 
INTER_WORKER_SIGNATURE_ALGORITHM_CONFIG + " property. "
         + "The algorithm(s) '" + INTER_WORKER_VERIFICATION_ALGORITHMS_DEFAULT 
+ "' will be used as a default on JVMs that provide them; "
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 7ad78744062..6c4bed311d3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -82,9 +82,7 @@ import org.apache.kafka.connect.util.TemporaryStage;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -305,7 +303,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
         this.restClient = restClient;
         this.isTopicTrackingEnabled = 
config.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
         this.restNamespace = Objects.requireNonNull(restNamespace);
-        this.uponShutdown = Arrays.asList(uponShutdown);
+        this.uponShutdown = List.of(uponShutdown);
 
         String clientIdConfig = 
config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
         String clientId = clientIdConfig.isEmpty() ? "connect-" + workerId : 
clientIdConfig;
@@ -713,7 +711,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
     private void processConnectorConfigUpdates(Set<String> 
connectorConfigUpdates) {
         // If we only have connector config updates, we can just bounce the 
updated connectors that are
         // currently assigned to this worker.
-        Set<String> localConnectors = assignment == null ? 
Collections.emptySet() : new HashSet<>(assignment.connectors());
+        Set<String> localConnectors = assignment == null ? Set.of() : new 
HashSet<>(assignment.connectors());
         Collection<Callable<Void>> connectorsToStart = new ArrayList<>();
         log.trace("Processing connector config updates; "
                 + "currently-owned connectors are {}, and to-be-updated 
connectors are {}",
@@ -769,7 +767,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
 
     private void 
processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId> 
taskConfigUpdates) {
         Set<ConnectorTaskId> localTasks = assignment == null
-                                          ? Collections.emptySet()
+                                          ? Set.of()
                                           : new HashSet<>(assignment.tasks());
         log.trace("Processing task config updates with incremental cooperative 
rebalance protocol; "
                 + "currently-owned tasks are {}, and to-be-updated tasks are 
{}",
@@ -781,7 +779,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
 
     private void stopReconfiguredTasks(Set<String> connectors) {
         Set<ConnectorTaskId> localTasks = assignment == null
-                ? Collections.emptySet()
+                ? Set.of()
                 : new HashSet<>(assignment.tasks());
 
         List<ConnectorTaskId> tasksToStop = localTasks.stream()
@@ -966,7 +964,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
             String consumerGroupId = 
config.get(overriddenConsumerGroupIdConfig);
             ConfigValue validatedGroupId = validatedConfig.computeIfAbsent(
                     overriddenConsumerGroupIdConfig,
-                    p -> new ConfigValue(overriddenConsumerGroupIdConfig, 
consumerGroupId, Collections.emptyList(), new ArrayList<>())
+                    p -> new ConfigValue(overriddenConsumerGroupIdConfig, 
consumerGroupId, List.of(), new ArrayList<>())
             );
             if (workerGroupId.equals(consumerGroupId)) {
                 validatedGroupId.addErrorMessage("Consumer group " + 
consumerGroupId +
@@ -1195,7 +1193,7 @@ public class DistributedHerder extends AbstractHerder 
implements Runnable {
                     // if the connector is reassigned during the ensuing 
rebalance, it is likely that it will immediately generate
                     // a non-empty set of task configs). A STOPPED connector 
with a non-empty set of tasks is less acceptable
                     // and likely to confuse users.
-                    writeTaskConfigs(connName, Collections.emptyList());
+                    writeTaskConfigs(connName, List.of());
                     String stageDescription = "writing the STOPPED target 
stage for connector " + connName + " to the config topic";
                     try (TickThreadStage stage = new 
TickThreadStage(stageDescription)) {
                         configBackingStore.putTargetState(connName, 
TargetState.STOPPED);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
index 2d8dba5d758..1004382e102 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
@@ -132,13 +132,13 @@ public class EagerAssignor implements ConnectAssignor {
 
         Map<String, ByteBuffer> groupAssignment = new HashMap<>();
         for (String member : members) {
-            Collection<String> connectors = connectorAssignments.get(member);
+            Collection<String> connectors = 
connectorAssignments.getOrDefault(member, List.of());
             if (connectors == null) {
-                connectors = Collections.emptyList();
+                connectors = List.of();
             }
-            Collection<ConnectorTaskId> tasks = taskAssignments.get(member);
+            Collection<ConnectorTaskId> tasks = 
taskAssignments.getOrDefault(member, List.of());
             if (tasks == null) {
-                tasks = Collections.emptyList();
+                tasks = List.of();
             }
             Assignment assignment = new Assignment(error, leaderId, leaderUrl, 
maxOffset, connectors, tasks);
             log.debug("Assignment: {} -> {}", member, assignment);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
index d99b38349a8..3c1f483b1f1 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
@@ -21,7 +21,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -56,7 +55,7 @@ public class ExtendedAssignment extends 
ConnectProtocol.Assignment {
 
     private static final ExtendedAssignment EMPTY = new ExtendedAssignment(
             CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, null, 
null, -1,
-            Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList(), Collections.emptyList(), 0);
+            new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new 
ArrayList<>(), 0);
 
     /**
      * Create an assignment indicating responsibility for the given connector 
instances and task Ids.
@@ -167,7 +166,7 @@ public class ExtendedAssignment extends 
ConnectProtocol.Assignment {
         // Using LinkedHashMap preserves the ordering, which is helpful for 
tests and debugging
         Map<String, Collection<Integer>> taskMap = new LinkedHashMap<>();
         Optional.ofNullable(revokedConnectorIds)
-                .orElseGet(Collections::emptyList)
+                .orElseGet(List::of)
                 .stream()
                 .distinct()
                 .forEachOrdered(connectorId -> {
@@ -177,7 +176,7 @@ public class ExtendedAssignment extends 
ConnectProtocol.Assignment {
                 });
 
         Optional.ofNullable(revokedTaskIds)
-                .orElseGet(Collections::emptyList)
+                .orElseGet(List::of)
                 .forEach(taskId -> {
                     String connectorId = taskId.connector();
                     Collection<Integer> connectorTasks =
@@ -244,7 +243,7 @@ public class ExtendedAssignment extends 
ConnectProtocol.Assignment {
 
         Object[] connectors = struct.getArray(key);
         if (connectors == null) {
-            return Collections.emptyList();
+            return List.of();
         }
         List<String> connectorIds = new ArrayList<>();
         for (Object structObj : connectors) {
@@ -265,7 +264,7 @@ public class ExtendedAssignment extends 
ConnectProtocol.Assignment {
 
         Object[] tasks = struct.getArray(key);
         if (tasks == null) {
-            return Collections.emptyList();
+            return List.of();
         }
         List<ConnectorTaskId> tasksIds = new ArrayList<>();
         for (Object structObj : tasks) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index e5bd9097033..2b8f87c81c7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -29,9 +29,7 @@ import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -89,7 +87,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
         this.previousGenerationId = -1;
-        this.previousMembers = Collections.emptySet();
+        this.previousMembers = Set.of();
         this.numSuccessiveRevokingRebalances = 0;
         // By default, initial interval is 1. The only corner case is when the 
user has set maxDelay to 0
         // in which case, the exponential backoff delay should be 0 which 
would return the backoff delay to be 0 always
@@ -472,7 +470,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
-            List<WorkerLoad> candidateWorkerLoad = Collections.emptyList();
+            List<WorkerLoad> candidateWorkerLoad = List.of();
             if (!candidateWorkersForReassignment.isEmpty()) {
                 candidateWorkerLoad = 
pickCandidateWorkerForReassignment(completeWorkerAssignment);
             }
@@ -606,7 +604,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         Map<String, Collection<T>> incremental = new HashMap<>();
         for (Map.Entry<String, Collection<T>> entry : base.entrySet()) {
             List<T> values = new ArrayList<>(entry.getValue());
-            values.removeAll(toSubtract.getOrDefault(entry.getKey(), 
Collections.emptySet()));
+            values.removeAll(toSubtract.getOrDefault(entry.getKey(), 
Set.of()));
             incremental.put(entry.getKey(), values);
         }
         return incremental;
@@ -643,11 +641,11 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
             log.trace("No load-balancing revocations required; all workers are 
either new "
                     + "or will have all currently-assigned connectors and 
tasks revoked during this round"
             );
-            return Collections.emptyMap();
+            return Map.of();
         }
         if (configured.isEmpty()) {
             log.trace("No load-balancing revocations required; no connectors 
are currently configured on this cluster");
-            return Collections.emptyMap();
+            return Map.of();
         }
 
         Map<String, ConnectorsAndTasks.Builder> result = new HashMap<>();
@@ -705,7 +703,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
                     allocatedResourceName,
                     allocatedResourceName
             );
-            return Collections.emptyMap();
+            return Map.of();
         }
 
         Map<String, Set<E>> result = new HashMap<>();
@@ -887,12 +885,12 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         private final Set<String> allWorkers;
 
         public static final ClusterAssignment EMPTY = new ClusterAssignment(
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                Collections.emptyMap(),
-                Collections.emptyMap()
+                Map.of(),
+                Map.of(),
+                Map.of(),
+                Map.of(),
+                Map.of(),
+                Map.of()
         );
 
         public ClusterAssignment(
@@ -910,7 +908,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
             this.allAssignedConnectors = allAssignedConnectors;
             this.allAssignedTasks = allAssignedTasks;
             this.allWorkers = combineCollections(
-                    Arrays.asList(newlyAssignedConnectors, newlyAssignedTasks, 
newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors, 
allAssignedTasks),
+                    List.of(newlyAssignedConnectors, newlyAssignedTasks, 
newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors, 
allAssignedTasks),
                     Map::keySet,
                     Collectors.toSet()
             );
@@ -921,7 +919,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         }
 
         public Collection<String> newlyAssignedConnectors(String worker) {
-            return newlyAssignedConnectors.getOrDefault(worker, 
Collections.emptySet());
+            return newlyAssignedConnectors.getOrDefault(worker, Set.of());
         }
 
         public Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks() {
@@ -929,7 +927,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         }
 
         public Collection<ConnectorTaskId> newlyAssignedTasks(String worker) {
-            return newlyAssignedTasks.getOrDefault(worker, 
Collections.emptySet());
+            return newlyAssignedTasks.getOrDefault(worker, Set.of());
         }
 
         public Map<String, Collection<String>> newlyRevokedConnectors() {
@@ -937,7 +935,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         }
 
         public Collection<String> newlyRevokedConnectors(String worker) {
-            return newlyRevokedConnectors.getOrDefault(worker, 
Collections.emptySet());
+            return newlyRevokedConnectors.getOrDefault(worker, Set.of());
         }
 
         public Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks() {
@@ -945,7 +943,7 @@ public class IncrementalCooperativeAssignor implements 
ConnectAssignor {
         }
 
         public Collection<ConnectorTaskId> newlyRevokedTasks(String worker) {
-            return newlyRevokedTasks.getOrDefault(worker, 
Collections.emptySet());
+            return newlyRevokedTasks.getOrDefault(worker, Set.of());
         }
 
         public Map<String, Collection<String>> allAssignedConnectors() {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 871fe3b33e4..edb174ab52b 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -36,7 +36,6 @@ import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -460,7 +459,7 @@ public class WorkerCoordinator extends AbstractCoordinator 
implements Closeable
 
     public static class ConnectorsAndTasks {
         public static final ConnectorsAndTasks EMPTY =
-                new ConnectorsAndTasks(Collections.emptyList(), 
Collections.emptyList());
+                new ConnectorsAndTasks(List.of(), List.of());
 
         private final Collection<String> connectors;
         private final Collection<ConnectorTaskId> tasks;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index 3b9ec5993c6..bb240af82d7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -38,12 +38,11 @@ import java.io.PrintStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-import static java.util.Collections.singleton;
-
 /**
  * Write the original consumed record into a dead letter queue. The dead 
letter queue is a Kafka topic located
  * on the same cluster used by the worker to maintain internal topics. Each 
connector is typically configured
@@ -85,7 +84,7 @@ public class DeadLetterQueueReporter implements 
ErrorReporter<ConsumerRecord<byt
             if (!admin.listTopics().names().get().contains(topic)) {
                 log.error("Topic {} doesn't exist. Will attempt to create 
topic.", topic);
                 NewTopic schemaTopicRequest = new NewTopic(topic, 
DLQ_NUM_DESIRED_PARTITIONS, sinkConfig.dlqTopicReplicationFactor());
-                admin.createTopics(singleton(schemaTopicRequest)).all().get();
+                admin.createTopics(Set.of(schemaTopicRequest)).all().get();
             }
         } catch (InterruptedException e) {
             throw new ConnectException("Could not initialize dead letter queue 
with topic=" + topic, e);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 51f4e66d636..b8b74f12e5f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -26,7 +26,6 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,7 +100,7 @@ public class RetryWithToleranceOperator<T> implements 
AutoCloseable {
         this.errorHandlingMetrics = errorHandlingMetrics;
         this.stopRequestedLatch = stopRequestedLatch;
         this.stopping = false;
-        this.reporters = Collections.emptyList();
+        this.reporters = List.of();
     }
 
     /**
@@ -137,7 +136,7 @@ public class RetryWithToleranceOperator<T> implements 
AutoCloseable {
     // Visible for testing
     synchronized Future<Void> report(ProcessingContext<T> context) {
         if (reporters.size() == 1) {
-            return new 
WorkerErrantRecordReporter.ErrantRecordFuture(Collections.singletonList(reporters.iterator().next().report(context)));
+            return new 
WorkerErrantRecordReporter.ErrantRecordFuture(List.of(reporters.get(0).report(context)));
         }
         List<Future<RecordMetadata>> futures = reporters.stream()
                 .map(r -> r.report(context))
@@ -357,7 +356,7 @@ public class RetryWithToleranceOperator<T> implements 
AutoCloseable {
                 e.addSuppressed(t);
             }
         }
-        reporters = Collections.emptyList();
+        reporters = List.of();
         if (e != null) {
             throw e;
         }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 9f3cd021750..f89b1f03a75 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -259,7 +258,7 @@ public class DelegatingClassLoader extends URLClassLoader {
                         fullName,
                         pluginVersion,
                         range
-                ), Collections.singletonList(pluginVersion));
+                ), List.of(pluginVersion));
             }
         }
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
index 7d5105012b1..04be35f2d26 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java
@@ -26,7 +26,6 @@ import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -67,7 +66,7 @@ public class PluginScanResult {
         this.restExtensions = restExtensions;
         this.connectorClientConfigPolicies = connectorClientConfigPolicies;
         this.allPlugins =
-            Arrays.asList(sinkConnectors, sourceConnectors, converters, 
headerConverters, transformations, predicates,
+            List.of(sinkConnectors, sourceConnectors, converters, 
headerConverters, transformations, predicates,
                     configProviders, restExtensions, 
connectorClientConfigPolicies);
     }
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 433c586b55e..729074d508e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -34,9 +34,7 @@ import java.nio.file.InvalidPathException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -204,7 +202,7 @@ public class PluginUtils {
 
     public static Set<Path> pluginLocations(String pluginPath, boolean 
failFast) {
         if (pluginPath == null) {
-            return Collections.emptySet();
+            return Set.of();
         }
         String[] pluginPathElements = 
COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1);
         Set<Path> pluginLocations = new LinkedHashSet<>();
@@ -266,7 +264,7 @@ public class PluginUtils {
         Set<Path> visited = new HashSet<>();
 
         if (isArchive(topPath)) {
-            return Collections.singletonList(topPath);
+            return List.of(topPath);
         }
 
         DirectoryStream<Path> topListing = Files.newDirectoryStream(
@@ -335,12 +333,12 @@ public class PluginUtils {
 
         if (containsClassFiles) {
             if (archives.isEmpty()) {
-                return Collections.singletonList(topPath);
+                return List.of(topPath);
             }
             log.warn("Plugin path contains both java archives and class files. 
Returning only the"
                     + " archives");
         }
-        return Arrays.asList(archives.toArray(new Path[0]));
+        return List.copyOf(archives);
     }
 
     public static Set<PluginSource> pluginSources(Set<Path> pluginLocations, 
ClassLoader classLoader, PluginClassLoaderFactory factory) {
@@ -475,7 +473,7 @@ public class PluginUtils {
             if (classLoader instanceof URLClassLoader) {
                 URL[] urls = ((URLClassLoader) classLoader).getURLs();
                 if (urls != null) {
-                    result.addAll(new HashSet<>(Arrays.asList(urls)));
+                    result.addAll(new HashSet<>(List.of(urls)));
                 }
             }
             classLoader = classLoader.getParent();
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 130db0ab61c..daf9f219992 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -42,9 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -80,7 +78,7 @@ public class Plugins {
     }
 
     public PluginScanResult initLoaders(Set<PluginSource> pluginSources, 
PluginDiscoveryMode discoveryMode) {
-        PluginScanResult empty = new PluginScanResult(Collections.emptyList());
+        PluginScanResult empty = new PluginScanResult(List.of());
         PluginScanResult serviceLoadingScanResult;
         try {
             serviceLoadingScanResult = discoveryMode.serviceLoad() ?
@@ -93,7 +91,7 @@ public class Plugins {
         }
         PluginScanResult reflectiveScanResult = 
discoveryMode.reflectivelyScan() ?
                 new ReflectionScanner().discoverPlugins(pluginSources) : empty;
-        PluginScanResult scanResult = new 
PluginScanResult(Arrays.asList(reflectiveScanResult, serviceLoadingScanResult));
+        PluginScanResult scanResult = new 
PluginScanResult(List.of(reflectiveScanResult, serviceLoadingScanResult));
         maybeReportHybridDiscoveryIssue(discoveryMode, 
serviceLoadingScanResult, scanResult);
         delegatingLoader.installDiscoveredPlugins(scanResult);
         return scanResult;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
index fa9efeed7d3..da9dd0d2cab 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginsRecommenders.java
@@ -22,7 +22,6 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -90,12 +89,12 @@ public class PluginsRecommenders {
         @Override
         public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
             if (plugins == null) {
-                return Collections.emptyList();
+                return List.of();
             }
             String connectorClassOrAlias = (String) 
parsedConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
             if (connectorClassOrAlias == null) {
                 //should never happen
-                return Collections.emptyList();
+                return List.of();
             }
             List<Object> sourceConnectors = 
plugins.sourceConnectors(connectorClassOrAlias).stream()
                     
.map(PluginDesc::version).distinct().collect(Collectors.toList());
@@ -118,7 +117,7 @@ public class PluginsRecommenders {
         @Override
         public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
             if (plugins == null) {
-                return Collections.emptyList();
+                return List.of();
             }
             return plugins.converters().stream()
                     
.map(PluginDesc::pluginClass).distinct().collect(Collectors.toList());
@@ -135,7 +134,7 @@ public class PluginsRecommenders {
         @Override
         public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
             if (plugins == null) {
-                return Collections.emptyList();
+                return List.of();
             }
             return plugins.headerConverters().stream()
                     
.map(PluginDesc::pluginClass).distinct().collect(Collectors.toList());
@@ -160,10 +159,10 @@ public class PluginsRecommenders {
         @Override
         public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
             if (plugins == null) {
-                return Collections.emptyList();
+                return List.of();
             }
             if (parsedConfig.get(converterConfig()) == null) {
-                return Collections.emptyList();
+                return List.of();
             }
             Class converterClass = (Class) parsedConfig.get(converterConfig());
             return recommendations().apply(converterClass.getName());
@@ -221,10 +220,10 @@ public class PluginsRecommenders {
         @SuppressWarnings({"rawtypes"})
         public List<Object> validValues(String name, Map<String, Object> 
parsedConfig) {
             if (plugins == null) {
-                return Collections.emptyList();
+                return List.of();
             }
             if (parsedConfig.get(classOrAliasConfig) == null) {
-                return Collections.emptyList();
+                return List.of();
             }
 
             Class classOrAlias = (Class) parsedConfig.get(classOrAliasConfig);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
index f5de82dab73..db660f1651f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java
@@ -26,9 +26,8 @@ import 
org.apache.kafka.connect.runtime.rest.resources.RootResource;
 import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.glassfish.jersey.server.ResourceConfig;
 
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 public class ConnectRestServer extends RestServer {
@@ -48,7 +47,7 @@ public class ConnectRestServer extends RestServer {
 
     @Override
     protected Collection<Class<?>> regularResources() {
-        return Arrays.asList(
+        return List.of(
                 RootResource.class,
                 ConnectorsResource.class,
                 InternalConnectResource.class,
@@ -58,9 +57,7 @@ public class ConnectRestServer extends RestServer {
 
     @Override
     protected Collection<Class<?>> adminResources() {
-        return Collections.singletonList(
-                LoggingResource.class
-        );
+        return List.of(LoggingResource.class);
     }
 
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 59b02543363..5bbc3312aa7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -57,7 +57,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Locale;
@@ -97,7 +96,7 @@ public abstract class RestServer {
     private final Server jettyServer;
     private final RequestTimeout requestTimeout;
 
-    private List<Plugin<ConnectRestExtension>> connectRestExtensionPlugins = 
Collections.emptyList();
+    private List<Plugin<ConnectRestExtension>> connectRestExtensionPlugins = 
List.of();
 
     /**
      * Create a REST server for this herder using the specified configs.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
index 96993c37c5c..3c4980675a0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java
@@ -28,7 +28,6 @@ import org.eclipse.jetty.util.StringUtil;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -54,7 +53,7 @@ public abstract class RestServerConfig extends AbstractConfig 
{
             " Leave hostname empty to bind to default interface.\n" +
             " Examples of legal listener lists: 
HTTP://myhost:8083,HTTPS://myhost:8084";
     // Visible for testing
-    static final List<String> LISTENERS_DEFAULT = 
Collections.singletonList("http://:8083";);
+    static final List<String> LISTENERS_DEFAULT = List.of("http://:8083";);
 
     public static final String REST_ADVERTISED_HOST_NAME_CONFIG = 
"rest.advertised.host.name";
     private static final String REST_ADVERTISED_HOST_NAME_DOC
@@ -391,7 +390,7 @@ public abstract class RestServerConfig extends 
AbstractConfig {
         @Override
         public List<String> adminListeners() {
             // Disable admin resources (such as the logging resource)
-            return Collections.emptyList();
+            return List.of();
         }
 
         @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index c498f309dd0..148e96a4cee 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -41,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -191,7 +190,7 @@ public class ConnectorsResource {
                     "Topic tracking is disabled.");
         }
         ActiveTopicsInfo info = herder.connectorActiveTopics(connector);
-        return Response.ok(Collections.singletonMap(info.connector(), 
info)).build();
+        return Response.ok(Map.of(info.connector(), info)).build();
     }
 
     @PUT
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
index 8f51b6e1b94..33e187130d4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java
@@ -25,7 +25,6 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
 
 import org.eclipse.jetty.util.ssl.SslContextFactory;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -123,7 +122,7 @@ public class SSLUtils {
      */
     @SuppressWarnings("unchecked")
     protected static void 
configureSslContextFactoryAlgorithms(SslContextFactory ssl, Map<String, Object> 
sslConfigValues) {
-        List<String> sslEnabledProtocols = (List<String>) 
getOrDefault(sslConfigValues, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
Arrays.asList(COMMA_WITH_WHITESPACE.split(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)));
+        List<String> sslEnabledProtocols = (List<String>) 
getOrDefault(sslConfigValues, SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, 
List.of(COMMA_WITH_WHITESPACE.split(SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS)));
         ssl.setIncludeProtocols(sslEnabledProtocols.toArray(new String[0]));
 
         String sslProvider = (String) 
sslConfigValues.get(SslConfigs.SSL_PROVIDER_CONFIG);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
index fc2327a1bf7..5626fbc809d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
@@ -23,7 +23,6 @@ import 
org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -37,15 +36,15 @@ public class ClusterConfigState {
     public static final ClusterConfigState EMPTY = new ClusterConfigState(
             NO_OFFSET,
             null,
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptyMap(),
-            Collections.emptySet(),
-            Collections.emptySet());
+            Map.of(),
+            Map.of(),
+            Map.of(),
+            Map.of(),
+            Map.of(),
+            Map.of(),
+            Map.of(),
+            Set.of(),
+            Set.of());
 
     private final long offset;
     private final SessionKey sessionKey;
@@ -232,12 +231,12 @@ public class ClusterConfigState {
      */
     public List<ConnectorTaskId> tasks(String connectorName) {
         if (inconsistentConnectors.contains(connectorName)) {
-            return Collections.emptyList();
+            return List.of();
         }
 
         Integer numTasks = connectorTaskCounts.get(connectorName);
         if (numTasks == null) {
-            return Collections.emptyList();
+            return List.of();
         }
 
         List<ConnectorTaskId> taskIds = new ArrayList<>(numTasks);
@@ -245,7 +244,7 @@ public class ClusterConfigState {
             ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 
taskIndex);
             taskIds.add(taskId);
         }
-        return Collections.unmodifiableList(taskIds);
+        return List.copyOf(taskIds);
     }
 
     /**
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
index 85d241bf7f2..99e7f94fc57 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -398,7 +397,7 @@ public class ConnectorOffsetBackingStore implements 
OffsetBackingStore {
     }
 
     private static Future<Map<ByteBuffer, ByteBuffer>> getFromStore(Optional<? 
extends OffsetBackingStore> store, Collection<ByteBuffer> keys) {
-        return store.map(s -> s.get(keys)).orElseGet(() -> 
CompletableFuture.completedFuture(Collections.emptyMap()));
+        return store.map(s -> s.get(keys)).orElseGet(() -> 
CompletableFuture.completedFuture(Map.of()));
     }
 
     private class ChainedOffsetWriteFuture implements Future<Void> {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 59caa612660..200e5e0b48f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -31,7 +31,6 @@ import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -112,6 +111,6 @@ public class FileOffsetBackingStore extends 
MemoryOffsetBackingStore {
 
     @Override
     public Set<Map<String, Object>> connectorPartitions(String connectorName) {
-        return connectorPartitions.getOrDefault(connectorName, 
Collections.emptySet());
+        return connectorPartitions.getOrDefault(connectorName, Set.of());
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 1cd10c79351..0e425301c11 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -62,7 +62,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -545,7 +544,7 @@ public final class KafkaConfigBackingStore extends 
KafkaTopicBasedBackingStore i
         log.debug("Removing connector configuration for connector '{}'", 
connector);
         try {
             Timer timer = time.timer(READ_WRITE_TOTAL_TIMEOUT_MS);
-            List<ProducerKeyValue> keyValues = Arrays.asList(
+            List<ProducerKeyValue> keyValues = List.of(
                     new ProducerKeyValue(CONNECTOR_KEY(connector), null),
                     new ProducerKeyValue(TARGET_STATE_KEY(connector), null)
             );
@@ -792,7 +791,7 @@ public final class KafkaConfigBackingStore extends 
KafkaTopicBasedBackingStore i
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) 
config).configStorageTopicSettings()
-                                            : Collections.emptyMap();
+                                            : Map.of();
         NewTopic topicDescription = TopicAdmin.defineTopic(topic)
                 .config(topicSettings) // first so that we override 
user-supplied settings as needed
                 .compacted()
@@ -811,7 +810,7 @@ public final class KafkaConfigBackingStore extends 
KafkaTopicBasedBackingStore i
      * @param timer Timer bounding how long this method can block. The timer 
is updated before the method returns.
      */
     private void sendPrivileged(String key, byte[] value, Timer timer) throws 
ExecutionException, InterruptedException, TimeoutException {
-        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, 
value)), timer);
+        sendPrivileged(List.of(new ProducerKeyValue(key, value)), timer);
     }
 
     /**
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 7ea2691e0fa..7920b3d6e0c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -218,7 +217,7 @@ public class KafkaOffsetBackingStore extends 
KafkaTopicBasedBackingStore impleme
     protected NewTopic newTopicDescription(final String topic, final 
WorkerConfig config) {
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                 ? ((DistributedConfig) config).offsetStorageTopicSettings()
-                : Collections.emptyMap();
+                : Map.of();
         return TopicAdmin.defineTopic(topic)
                 .config(topicSettings) // first so that we override 
user-supplied settings as needed
                 .compacted()
@@ -297,7 +296,7 @@ public class KafkaOffsetBackingStore extends 
KafkaTopicBasedBackingStore impleme
 
     @Override
     public Set<Map<String, Object>> connectorPartitions(String connectorName) {
-        return connectorPartitions.getOrDefault(connectorName, 
Collections.emptySet());
+        return connectorPartitions.getOrDefault(connectorName, Set.of());
     }
 
     protected final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback 
= (error, record) -> {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index b9382399cf4..8de8d9ee18a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -53,7 +53,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -199,7 +198,7 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
 
         Map<String, Object> topicSettings = config instanceof DistributedConfig
                                             ? ((DistributedConfig) 
config).statusStorageTopicSettings()
-                                            : Collections.emptyMap();
+                                            : Map.of();
         NewTopic topicDescription = TopicAdmin.defineTopic(statusTopic)
                 .config(topicSettings) // first so that we override 
user-supplied settings as needed
                 .compacted()
@@ -402,8 +401,8 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
     public Collection<TopicStatus> getAllTopics(String connector) {
         ConcurrentMap<String, TopicStatus> activeTopics = 
topics.get(Objects.requireNonNull(connector));
         return activeTopics != null
-               ? 
Collections.unmodifiableCollection(Objects.requireNonNull(activeTopics.values()))
-               : Collections.emptySet();
+               ? Set.copyOf(Objects.requireNonNull(activeTopics.values()))
+               : Set.of();
     }
 
     @Override
@@ -509,7 +508,7 @@ public class KafkaStatusBackingStore extends 
KafkaTopicBasedBackingStore impleme
         return converter.fromConnectData(
                 statusTopic,
                 TOPIC_STATUS_SCHEMA_V0,
-                Collections.singletonMap(TOPIC_STATE_KEY, struct));
+                Map.of(TOPIC_STATE_KEY, struct));
     }
 
     private String parseConnectorStatusKey(String key) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index f4981e80953..254aaf89584 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -22,11 +22,11 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
@@ -82,11 +82,11 @@ public class MemoryConfigBackingStore implements 
ConfigBackingStore {
                 connectorConfigs,
                 connectorTargetStates,
                 taskConfigs,
-                Collections.emptyMap(),
-                Collections.emptyMap(),
+                Map.of(),
+                Map.of(),
                 appliedConnectorConfigs,
-                Collections.emptySet(),
-                Collections.emptySet(),
+                Set.of(),
+                Set.of(),
                 configTransformer
         );
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
index a51a405d3de..a465bea9689 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -24,7 +24,6 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.Table;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -119,8 +118,8 @@ public class MemoryStatusBackingStore implements 
StatusBackingStore {
     public Collection<TopicStatus> getAllTopics(String connector) {
         ConcurrentMap<String, TopicStatus> activeTopics = 
topics.get(Objects.requireNonNull(connector));
         return activeTopics != null
-               ? Collections.unmodifiableCollection(activeTopics.values())
-               : Collections.emptySet();
+               ? Set.copyOf(activeTopics.values())
+               : Set.of();
     }
 
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index d9776e05dd3..c17d2fb099c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -23,11 +23,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
@@ -61,7 +60,7 @@ public class OffsetStorageReaderImpl implements 
CloseableOffsetStorageReader {
 
     @Override
     public <T> Map<String, Object> offset(Map<String, T> partition) {
-        return offsets(Collections.singletonList(partition)).get(partition);
+        return offsets(List.of(partition)).get(partition);
     }
 
     @Override
@@ -73,7 +72,7 @@ public class OffsetStorageReaderImpl implements 
CloseableOffsetStorageReader {
             try {
                 // Offsets are treated as schemaless, their format is only 
validated here (and the returned value below)
                 OffsetUtils.validateFormat(key);
-                byte[] keySerialized = keyConverter.fromConnectData(namespace, 
null, Arrays.asList(namespace, key));
+                byte[] keySerialized = keyConverter.fromConnectData(namespace, 
null, List.of(namespace, key));
                 ByteBuffer keyBuffer = (keySerialized != null) ? 
ByteBuffer.wrap(keySerialized) : null;
                 serializedToOriginal.put(keyBuffer, key);
             } catch (Throwable t) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
index e76b06b55d4..e1415bed575 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/PredicateDoc.java
@@ -20,9 +20,9 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 
 public class PredicateDoc {
 
@@ -38,7 +38,7 @@ public class PredicateDoc {
         }
     }
 
-    private static final List<DocInfo> PREDICATES = new 
Plugins(Collections.emptyMap()).predicates().stream()
+    private static final List<DocInfo> PREDICATES = new 
Plugins(Map.of()).predicates().stream()
         .map(p -> {
             try {
                 String overviewDoc = (String) 
p.pluginClass().getDeclaredField("OVERVIEW_DOC").get(null);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 95eeed2e0f4..e3e9ad063d2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -34,7 +34,6 @@ import org.apache.kafka.connect.transforms.TimestampConverter;
 import org.apache.kafka.connect.transforms.TimestampRouter;
 import org.apache.kafka.connect.transforms.ValueToKey;
 
-import java.util.Arrays;
 import java.util.List;
 
 public class TransformationDoc {
@@ -42,7 +41,7 @@ public class TransformationDoc {
     private record DocInfo(String transformationName, String overview, 
ConfigDef configDef) {
     }
 
-    private static final List<DocInfo> TRANSFORMATIONS = Arrays.asList(
+    private static final List<DocInfo> TRANSFORMATIONS = List.of(
             new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, 
Cast.CONFIG_DEF),
             new DocInfo(DropHeaders.class.getName(), DropHeaders.OVERVIEW_DOC, 
DropHeaders.CONFIG_DEF),
             new DocInfo(ExtractField.class.getName(), 
ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF),
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index e36df1b7dbc..5452ee9e1ee 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -185,8 +184,8 @@ public class KafkaBasedLog<K, V> {
         Objects.requireNonNull(topicAdmin);
         Objects.requireNonNull(readTopicPartition);
         return new KafkaBasedLog<>(topic,
-                Collections.emptyMap(),
-                Collections.emptyMap(),
+                Map.of(),
+                Map.of(),
                 () -> topicAdmin,
                 consumedCallback,
                 time,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
index fa6a0b9cccd..a83c515e73a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/LoggingContext.java
@@ -19,9 +19,9 @@ package org.apache.kafka.connect.util;
 import org.slf4j.MDC;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * A utility for defining Mapped Diagnostic Context (MDC) for SLF4J logs.
@@ -49,7 +49,7 @@ public final class LoggingContext implements AutoCloseable {
      */
     public static final String CONNECTOR_CONTEXT = "connector.context";
 
-    public static final Collection<String> ALL_CONTEXTS = 
Collections.singleton(CONNECTOR_CONTEXT);
+    public static final Collection<String> ALL_CONTEXTS = 
Set.of(CONNECTOR_CONTEXT);
 
     /**
      * The Scope values used by Connect when specifying the context.
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
index 70bcf8c427e..620eec2f139 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/SinkUtils.java
@@ -23,7 +23,6 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +47,7 @@ public final class SinkUtils {
             partition.put(KAFKA_TOPIC_KEY, 
topicPartitionOffset.getKey().topic());
             partition.put(KAFKA_PARTITION_KEY, 
topicPartitionOffset.getKey().partition());
             connectorOffsets.add(new ConnectorOffset(partition,
-                    Collections.singletonMap(KAFKA_OFFSET_KEY, 
topicPartitionOffset.getValue().offset())));
+                   Map.of(KAFKA_OFFSET_KEY, 
topicPartitionOffset.getValue().offset())));
         }
 
         return new ConnectorOffsets(connectorOffsets);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
index 82c216b4121..a8f783a5709 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.util;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -54,8 +53,8 @@ public class Table<R, C, V> {
     public Map<C, V> row(R row) {
         Map<C, V> columns = table.get(row);
         if (columns == null)
-            return Collections.emptyMap();
-        return Collections.unmodifiableMap(columns);
+            return Map.of();
+        return Map.copyOf(columns);
     }
 
     public boolean isEmpty() {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 348beb00233..67285c1c197 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -73,8 +72,8 @@ import java.util.stream.Collectors;
  */
 public class TopicAdmin implements AutoCloseable {
 
-    public static final TopicCreationResponse EMPTY_CREATION = new 
TopicCreationResponse(Collections.emptySet(), Collections.emptySet());
-    private static final List<Class<? extends Exception>> 
CAUSES_TO_RETRY_TOPIC_CREATION = Arrays.asList(
+    public static final TopicCreationResponse EMPTY_CREATION = new 
TopicCreationResponse(Set.of(), Set.of());
+    private static final List<Class<? extends Exception>> 
CAUSES_TO_RETRY_TOPIC_CREATION = List.of(
             InvalidReplicationFactorException.class,
             TimeoutException.class);
 
@@ -84,8 +83,8 @@ public class TopicAdmin implements AutoCloseable {
         private final Set<String> existing;
 
         public TopicCreationResponse(Set<String> createdTopicNames, 
Set<String> existingTopicNames) {
-            this.created = Collections.unmodifiableSet(createdTopicNames);
-            this.existing = Collections.unmodifiableSet(existingTopicNames);
+            this.created = Set.copyOf(createdTopicNames);
+            this.existing = Set.copyOf(existingTopicNames);
         }
 
         public Set<String> createdTopics() {
@@ -473,12 +472,12 @@ public class TopicAdmin implements AutoCloseable {
      */
     public Map<String, TopicDescription> describeTopics(String... topics) {
         if (topics == null) {
-            return Collections.emptyMap();
+            return Map.of();
         }
         String topicNameList = String.join(", ", topics);
 
         Map<String, KafkaFuture<TopicDescription>> newResults =
-                admin.describeTopics(Arrays.asList(topics), new 
DescribeTopicsOptions()).topicNameValues();
+                admin.describeTopics(List.of(topics), new 
DescribeTopicsOptions()).topicNameValues();
 
         // Iterate over each future so that we can handle individual failures 
like when some topics don't exist
         Map<String, TopicDescription> existingTopics = new HashMap<>();
@@ -536,7 +535,7 @@ public class TopicAdmin implements AutoCloseable {
                       + "describe topic configurations.", topic, 
TopicConfig.CLEANUP_POLICY_COMPACT);
             return false;
         }
-        Set<String> expectedPolicies = 
Collections.singleton(TopicConfig.CLEANUP_POLICY_COMPACT);
+        Set<String> expectedPolicies = 
Set.of(TopicConfig.CLEANUP_POLICY_COMPACT);
         if (!cleanupPolicies.equals(expectedPolicies)) {
             String expectedPolicyStr = String.join(",", expectedPolicies);
             String cleanupPolicyStr = String.join(",", cleanupPolicies);
@@ -566,7 +565,7 @@ public class TopicAdmin implements AutoCloseable {
         if (topicConfig == null) {
             // The topic must not exist
             log.debug("Unable to find topic '{}' when getting cleanup policy", 
topic);
-            return Collections.emptySet();
+            return Set.of();
         }
         ConfigEntry entry = topicConfig.get(CLEANUP_POLICY_CONFIG);
         if (entry != null && entry.value() != null) {
@@ -581,7 +580,7 @@ public class TopicAdmin implements AutoCloseable {
         // This is unexpected, as the topic config should include the 
cleanup.policy even if
         // the topic settings don't override the broker's log.cleanup.policy. 
But just to be safe.
         log.debug("Found no cleanup.policy for topic '{}'", topic);
-        return Collections.emptySet();
+        return Set.of();
     }
 
     /**
@@ -620,7 +619,7 @@ public class TopicAdmin implements AutoCloseable {
      */
     public Map<String, Config> describeTopicConfigs(String... topicNames) {
         if (topicNames == null) {
-            return Collections.emptyMap();
+            return Map.of();
         }
         Collection<String> topics = Arrays.stream(topicNames)
                                           .filter(Objects::nonNull)
@@ -628,7 +627,7 @@ public class TopicAdmin implements AutoCloseable {
                                           .filter(s -> !s.isEmpty())
                                           .collect(Collectors.toList());
         if (topics.isEmpty()) {
-            return Collections.emptyMap();
+            return Map.of();
         }
         String topicNameList = String.join(", ", topics);
         Collection<ConfigResource> resources = topics.stream()
@@ -686,7 +685,7 @@ public class TopicAdmin implements AutoCloseable {
      */
     public Map<TopicPartition, Long> endOffsets(Set<TopicPartition> 
partitions) {
         if (partitions == null || partitions.isEmpty()) {
-            return Collections.emptyMap();
+            return Map.of();
         }
         Map<TopicPartition, OffsetSpec> offsetSpecMap = 
partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> 
OffsetSpec.latest()));
         ListOffsetsResult resultFuture = admin.listOffsets(offsetSpecMap, new 
ListOffsetsOptions(IsolationLevel.READ_UNCOMMITTED));
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
index 45c12aa292a..f98d1afa5b2 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreation.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.util;
 
 import org.apache.kafka.connect.runtime.WorkerConfig;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -32,7 +31,7 @@ import static 
org.apache.kafka.connect.runtime.TopicCreationConfig.DEFAULT_TOPIC
  */
 public class TopicCreation {
     private static final TopicCreation EMPTY =
-            new TopicCreation(false, null, Collections.emptyMap(), 
Collections.emptySet());
+            new TopicCreation(false, null, Map.of(), Set.of());
 
     private final boolean isTopicCreationEnabled;
     private final TopicCreationGroup defaultTopicGroup;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
index 5393fd2a013..e5694c944d4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicCreationGroup.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TopicCreationConfig;
 
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +63,7 @@ public class TopicCreationGroup {
      */
     public static Map<String, TopicCreationGroup> 
configuredGroups(SourceConnectorConfig config) {
         if (!config.usesTopicCreation()) {
-            return Collections.emptyMap();
+            return Map.of();
         }
         List<String> groupNames = config.getList(TOPIC_CREATION_GROUPS_CONFIG);
         Map<String, TopicCreationGroup> groups = new LinkedHashMap<>();

Reply via email to