This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 84d4f35387f MINOR: Share group tidying (#21711)
84d4f35387f is described below

commit 84d4f35387f7cc029bf487aeaa17aa78cc917c9b
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Mar 13 12:19:56 2026 +0000

    MINOR: Share group tidying (#21711)
    
    Minor tidying up of share group code, mainly fixing compiler warnings.
    Also fixed three flaky tests, all of which were failing occasionally
    because they did not wait for earlier asynchronous operations to take
    effect before proceeding.
    
    Reviewers: Manikumar Reddy <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  |  30 ++-
 .../kafka/clients/consumer/MockShareConsumer.java  |   3 +-
 .../consumer/internals/ShareCompletedFetch.java    |   4 +-
 .../internals/ShareConsumeRequestManager.java      |   2 +-
 .../consumer/internals/ShareConsumerImpl.java      |  19 +-
 .../clients/consumer/internals/ShareFetch.java     |   3 +-
 .../consumer/internals/ShareFetchCollector.java    |   4 +-
 .../consumer/internals/ShareSessionHandler.java    |   8 +-
 .../events/ShareAcknowledgementEvent.java          |   2 +-
 .../consumer/KafkaShareConsumerMetricsTest.java    |  27 +--
 .../clients/consumer/MockShareConsumerTest.java    |   4 +-
 .../AcknowledgementCommitCallbackHandlerTest.java  |   7 +-
 .../internals/ShareConsumeRequestManagerTest.java  | 211 ++++++++++-----------
 .../consumer/internals/ShareConsumerImplTest.java  |  34 ++--
 .../internals/ShareFetchCollectorTest.java         |   5 +-
 .../ShareHeartbeatRequestManagerTest.java          |  25 +--
 .../internals/ShareMembershipManagerTest.java      | 135 +++++++------
 .../internals/ShareSessionHandlerTest.java         |  11 +-
 18 files changed, 269 insertions(+), 265 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index d915e70b831..837db109d94 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -1281,9 +1281,8 @@ public class ShareConsumerTest {
         broker.awaitShutdown();
 
         // Assert that close completes in less than 5 seconds, not the full 
30-second timeout.
-        assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
-            shareConsumer.close();
-        }, "Consumer close should not wait for full timeout when broker is 
already shutdown");
+        assertTimeoutPreemptively(Duration.ofSeconds(5), () -> 
shareConsumer.close(),
+            "Consumer close should not wait for full timeout when broker is 
already shut down");
     }
 
     @ClusterTest
@@ -1871,7 +1870,7 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
-            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(5L)));
+            assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(5L))).all().get(), "Failed to delete records");
 
             int messageCount = consumeMessages(new AtomicInteger(0), 5, 
groupId, 1, 10, true);
             // The records returned belong to offsets 5-9.
@@ -1883,14 +1882,14 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 14, so the LSO should move to 
14.
-            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(14L)));
+            assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(14L))).all().get(), "Failed to delete records");
 
             int consumeMessagesCount = consumeMessages(new AtomicInteger(0), 
1, groupId, 1, 10, true);
             // The record returned belong to offset 14.
             assertEquals(1, consumeMessagesCount);
 
             // We delete records before offset 15, so the LSO should move to 
15 and now no records should be returned.
-            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(15L)));
+            assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(15L))).all().get(), "Failed to delete records");
 
             messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 
1, 5, true);
             assertEquals(0, messageCount);
@@ -1964,7 +1963,7 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
-            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(5L)));
+            assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(5L))).all().get(), "Failed to delete records");
 
             int consumedMessageCount = consumeMessages(new AtomicInteger(0), 
5, "group1", 1, 10, true);
             // The records returned belong to offsets 5-9.
@@ -4379,6 +4378,23 @@ public class ShareConsumerTest {
 
     private void alterShareDeliveryCountLimit(String groupId, String newValue) 
{
         alterShareGroupConfig(groupId, 
GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, newValue);
+
+        // This config is changed dynamically in tests, and we need it to have 
propagated before the test proceeds.
+        // Describing the config with a new admin client is not totally 
foolproof, but it's better than just
+        // altering the config and continuing.
+        try (Admin adminClient = createAdminClient()) {
+            ConfigResource groupConfigResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
+            assertDoesNotThrow(() ->
+                TestUtils.waitForCondition(() -> {
+                    try {
+                        Config config = 
adminClient.describeConfigs(List.of(groupConfigResource)).all().get().get(groupConfigResource);
+                        ConfigEntry entry = 
config.get(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
+                        return entry != null && entry.value().equals(newValue);
+                    } catch (Exception e) {
+                        return false;
+                    }
+                }, 10000L, 100L, () -> "New config value did not propagate"), 
"Failed to describe configs");
+        }
     }
 
     private void alterShareIsolationLevel(String groupId, String newValue) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
index 8aeab59a2b4..049732bb2ea 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.utils.LogContext;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -142,7 +141,7 @@ public class MockShareConsumer<K, V> implements 
ShareConsumer<K, V> {
     @Override
     public synchronized Map<MetricName, ? extends Metric> metrics() {
         ensureNotClosed();
-        return Collections.emptyMap();
+        return Map.of();
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 7fc39c5d5a1..7a6c2291c93 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -179,7 +179,7 @@ public class ShareCompletedFetch {
         ShareInFlightBatch<K, V> inFlightBatch = new 
ShareInFlightBatch<>(nodeId, partition, acquisitionLockTimeoutMs);
 
         if (cachedBatchException != null) {
-            // If the event that a CRC check fails, reject the entire record 
batch because it is corrupt.
+            // In the event that a CRC check fails, reject the entire record 
batch because it is corrupt.
             Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
             inFlightBatch.setException(new 
ShareInFlightBatchException(cachedBatchException, offsets));
             cachedBatchException = null;
@@ -249,7 +249,7 @@ public class ShareCompletedFetch {
             }
         } catch (CorruptRecordException e) {
             if (inFlightBatch.isEmpty()) {
-                // If the event that a CRC check fails, reject the entire 
record batch because it is corrupt.
+                // In the event that a CRC check fails, reject the entire 
record batch because it is corrupt.
                 Set<Long> offsets = rejectRecordBatch(inFlightBatch, 
currentBatch);
                 inFlightBatch.setException(new ShareInFlightBatchException(e, 
offsets));
             } else {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index c30073d4104..dd1b829bcb3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -1237,7 +1237,7 @@ public class ShareConsumeRequestManager implements 
RequestManager, MemberStateLi
                 sessionHandler.addPartitionToFetch(entry.getKey(), 
entry.getValue());
             }
 
-            ShareAcknowledgeRequest.Builder requestBuilder = 
sessionHandler.newShareAcknowledgeBuilder(groupId, shareFetchConfig);
+            ShareAcknowledgeRequest.Builder requestBuilder = 
sessionHandler.newShareAcknowledgeBuilder(groupId);
 
             isProcessed = false;
             Node nodeToSend = metadata.fetch().nodeById(nodeId);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 3a7eb81df60..bc7e591d78b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -81,7 +81,6 @@ import java.net.InetSocketAddress;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -262,7 +261,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             this.metrics = createMetrics(config, time, reporters);
             this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
 
-            this.acknowledgementMode = initializeAcknowledgementMode(config, 
log);
+            this.acknowledgementMode = initializeAcknowledgementMode(config);
             this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer, metrics);
             this.currentFetch = ShareFetch.empty();
             this.subscriptions = createSubscriptionState(config, logContext);
@@ -379,7 +378,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.metadata = metadata;
         this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-        this.acknowledgementMode = initializeAcknowledgementMode(config, log);
+        this.acknowledgementMode = initializeAcknowledgementMode(config);
         this.fetchBuffer = new ShareFetchBuffer(logContext);
         this.completedAcknowledgements = new LinkedList<>();
 
@@ -490,7 +489,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.applicationEventHandler = applicationEventHandler;
         this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics);
         this.clientTelemetryReporter = Optional.empty();
-        this.completedAcknowledgements = Collections.emptyList();
+        this.completedAcknowledgements = List.of();
         this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
         this.acknowledgementEventHandler = new 
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
         this.backgroundEventHandler = new BackgroundEventHandler(
@@ -532,7 +531,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     public Set<String> subscription() {
         acquireAndEnsureOpen();
         try {
-            return Collections.unmodifiableSet(subscriptions.subscription());
+            return Set.copyOf(subscriptions.subscription());
         } finally {
             release();
         }
@@ -675,7 +674,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             wakeupTrigger.clearTask();
         }
 
-        return collect(Collections.emptyMap());
+        return collect(Map.of());
     }
 
     private ShareFetch<K, V> collect(Map<TopicIdPartition, 
NodeAcknowledgements> acknowledgementsMap) {
@@ -797,7 +796,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             Timer requestTimer = time.timer(timeout.toMillis());
             Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = 
acknowledgementsToSend();
             if (acknowledgementsMap.isEmpty()) {
-                return Collections.emptyMap();
+                return Map.of();
             } else {
                 ShareAcknowledgeSyncEvent event = new 
ShareAcknowledgeSyncEvent(acknowledgementsMap, 
calculateDeadlineMs(requestTimer));
                 applicationEventHandler.add(event);
@@ -908,7 +907,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
      */
     @Override
     public Map<MetricName, ? extends Metric> metrics() {
-        return Collections.unmodifiableMap(metrics.metrics());
+        return Map.copyOf(metrics.metrics());
     }
 
     /**
@@ -1180,7 +1179,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     /**
      * Initializes the acknowledgement mode based on the configuration.
      */
-    private static ShareAcknowledgementMode 
initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
+    private static ShareAcknowledgementMode 
initializeAcknowledgementMode(ConsumerConfig config) {
         String s = 
config.getString(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
         return ShareAcknowledgementMode.fromString(s);
     }
@@ -1223,8 +1222,6 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
      * It is possible that {@link ErrorEvent an error} could occur when 
processing the events. In such
      * cases, the processor will take a reference to the first error, continue 
to process the remaining
      * events, and then throw the first error that occurred.
-     *
-     * Visible for testing.
      */
     boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index 83a43cb69ae..a15231512fc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 
 import java.time.Duration;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -83,7 +82,7 @@ public class ShareFetch<K, V> {
     public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() {
         final LinkedHashMap<TopicPartition, List<ConsumerRecord<K, V>>> result 
= new LinkedHashMap<>();
         batches.forEach((tip, batch) -> result.put(tip.topicPartition(), 
batch.getInFlightRecords()));
-        return Collections.unmodifiableMap(result);
+        return Map.copyOf(result);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
index 8b4a103e23d..109fe3da48c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
 
-import java.util.Collections;
+import java.util.Set;
 
 import static 
org.apache.kafka.clients.consumer.internals.FetchUtils.requestMetadataUpdate;
 
@@ -172,7 +172,7 @@ public class ShareFetchCollector<K, V> {
         } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
             // Log the actual partition and not just the topic to help with 
ACL propagation issues in large clusters
             log.warn("Not authorized to read from partition {}.", 
tp.topicPartition());
-            throw new 
TopicAuthorizationException(Collections.singleton(tp.topic()));
+            throw new TopicAuthorizationException(Set.of(tp.topic()));
         } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
             log.debug("Received unknown leader epoch error in fetch for 
partition {}.", tp);
         } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 0b6cdf0a6db..e4e16d1506f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -35,13 +35,13 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -95,7 +95,7 @@ public class ShareSessionHandler {
     }
 
     public Collection<TopicIdPartition> sessionPartitions() {
-        return Collections.unmodifiableCollection(sessionPartitions.values());
+        return Set.copyOf(sessionPartitions.values());
     }
 
     public void addPartitionToFetch(TopicIdPartition topicIdPartition, 
Acknowledgements partitionAcknowledgements) {
@@ -218,9 +218,9 @@ public class ShareSessionHandler {
         }
     }
 
-    public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String 
groupId, ShareFetchConfig shareFetchConfig) {
+    public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String 
groupId) {
         if (nextMetadata.isNewSession()) {
-            // A share session cannot be started with a ShareAcknowledge 
request
+            // A share session cannot be started with a ShareAcknowledge 
request. The caller handles completing the acks.
             nextPartitions.clear();
             nextAcknowledgements.clear();
             return null;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
index ff7f7598f68..f9e47a32d8a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
@@ -36,7 +36,7 @@ public class ShareAcknowledgementEvent {
     public ShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements> 
acknowledgementsMap,
                                      boolean checkForRenewAcknowledgements,
                                      Optional<Integer> 
acquisitionLockTimeoutMs) {
-        this.acknowledgementsMap = acknowledgementsMap;
+        this.acknowledgementsMap = Map.copyOf(acknowledgementsMap);
         this.checkForRenewAcknowledgements = checkForRenewAcknowledgements;
         this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
index 3dbe0a01cbb..502cee9ae62 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
@@ -49,10 +49,11 @@ import 
org.mockito.internal.stubbing.answers.CallsRealMethods;
 
 import java.time.Duration;
 import java.util.AbstractMap;
-import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -82,10 +83,10 @@ public class KafkaShareConsumerMetricsTest {
     public void testPollTimeMetrics() {
         ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
-        initMetadata(client, Collections.singletonMap(topic, 1));
+        initMetadata(client, Map.of(topic, 1));
 
         KafkaShareConsumer<String, String> consumer = newShareConsumer(time, 
client, subscription, metadata);
-        consumer.subscribe(Collections.singletonList(topic));
+        consumer.subscribe(Set.of(topic));
         // MetricName objects to check
         Metrics metrics = consumer.metricsRegistry();
         MetricName lastPollSecondsAgoName = 
metrics.metricName("last-poll-seconds-ago", CONSUMER_SHARE_METRIC_GROUP_PREFIX 
+ "-metrics");
@@ -127,7 +128,7 @@ public class KafkaShareConsumerMetricsTest {
     public void testPollIdleRatio() {
         ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
-        initMetadata(client, Collections.singletonMap(topic, 1));
+        initMetadata(client, Map.of(topic, 1));
 
         KafkaShareConsumer<String, String> consumer = newShareConsumer(time, 
client, subscription, metadata);
         // MetricName object to check
@@ -165,7 +166,7 @@ public class KafkaShareConsumerMetricsTest {
     }
 
     private static boolean consumerMetricPresent(KafkaShareConsumer<String, 
String> consumer, String name) {
-        MetricName metricName = new MetricName(name, 
CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics", "", Collections.emptyMap());
+        MetricName metricName = new MetricName(name, 
CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics", "", Map.of());
         return consumer.metricsRegistry().metrics().containsKey(metricName);
     }
 
@@ -174,10 +175,10 @@ public class KafkaShareConsumerMetricsTest {
         Time time = new MockTime(1L);
         ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
-        initMetadata(client, Collections.singletonMap(topic, 1));
+        initMetadata(client, Map.of(topic, 1));
 
         KafkaShareConsumer<String, String> consumer = newShareConsumer(time, 
client, subscription, metadata);
-        consumer.subscribe(Collections.singletonList(topic));
+        consumer.subscribe(List.of(topic));
         assertTrue(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
         assertTrue(consumerMetricPresent(consumer, "time-between-poll-avg"));
         assertTrue(consumerMetricPresent(consumer, "time-between-poll-max"));
@@ -192,7 +193,7 @@ public class KafkaShareConsumerMetricsTest {
         Time time = new MockTime(1L);
         ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
-        initMetadata(client, Collections.singletonMap(topic, 1));
+        initMetadata(client, Map.of(topic, 1));
 
         KafkaShareConsumer<String, String> consumer = newShareConsumer(time, 
client, subscription, metadata);
         Map<MetricName, KafkaMetric> customMetrics = customMetrics();
@@ -209,7 +210,7 @@ public class KafkaShareConsumerMetricsTest {
             Time time = new MockTime(1L);
             ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
-            initMetadata(client, Collections.singletonMap(topic, 1));
+            initMetadata(client, Map.of(topic, 1));
 
             KafkaShareConsumer<String, String> consumer = 
newShareConsumer(time, client, subscription, metadata);
             KafkaMetric existingMetricToAdd = (KafkaMetric) 
consumer.metrics().entrySet().iterator().next().getValue();
@@ -226,7 +227,7 @@ public class KafkaShareConsumerMetricsTest {
             Time time = new MockTime(1L);
             ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
-            initMetadata(client, Collections.singletonMap(topic, 1));
+            initMetadata(client, Map.of(topic, 1));
 
             KafkaShareConsumer<String, String> consumer = 
newShareConsumer(time, client, subscription, metadata);
             KafkaMetric existingMetricToRemove = (KafkaMetric) 
consumer.metrics().entrySet().iterator().next().getValue();
@@ -246,7 +247,7 @@ public class KafkaShareConsumerMetricsTest {
             Time time = new MockTime(1L);
             ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
-            initMetadata(client, Collections.singletonMap(topic, 1));
+            initMetadata(client, Map.of(topic, 1));
 
             KafkaShareConsumer<String, String> consumer = 
newShareConsumer(time, client, subscription, metadata);
 
@@ -267,7 +268,7 @@ public class KafkaShareConsumerMetricsTest {
             Time time = new MockTime(1L);
             ShareConsumerMetadata metadata = createMetadata(subscription);
             MockClient client = new MockClient(time, metadata);
-            initMetadata(client, Collections.singletonMap(topic, 1));
+            initMetadata(client, Map.of(topic, 1));
 
             KafkaShareConsumer<String, String> consumer = 
newShareConsumer(time, client, subscription, metadata);
 
@@ -282,7 +283,7 @@ public class KafkaShareConsumerMetricsTest {
         Time time = new MockTime(1L);
         ShareConsumerMetadata metadata = createMetadata(subscription);
         MockClient client = new MockClient(time, metadata);
-        initMetadata(client, Collections.singletonMap(topic, 1));
+        initMetadata(client, Map.of(topic, 1));
 
         KafkaShareConsumer<String, String> consumer = newShareConsumer(time, 
client, subscription, metadata);
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
index 458da8ccc94..701547a5c1b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.record.TimestampType;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -34,7 +34,7 @@ public class MockShareConsumerTest {
 
     @Test
     public void testSimpleMock() {
-        consumer.subscribe(Collections.singleton("test"));
+        consumer.subscribe(Set.of("test"));
         assertEquals(0, consumer.poll(Duration.ZERO).count());
         ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 
0, 0L, TimestampType.CREATE_TIME,
                 0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
index c6e10040d32..d06e495dde3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -72,7 +71,7 @@ class AcknowledgementCommitCallbackHandlerTest {
         acknowledgements.add(1L, AcknowledgeType.REJECT);
         acknowledgementsMap.put(tip0, acknowledgements);
 
-        
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
+        
acknowledgementCommitCallbackHandler.onComplete(List.of(acknowledgementsMap));
 
         TestUtils.retryOnExceptionWithTimeout(() -> {
             assertNull(exceptionMap.get(tpo00));
@@ -88,7 +87,7 @@ class AcknowledgementCommitCallbackHandlerTest {
         acknowledgements.complete(Errors.INVALID_RECORD_STATE.exception());
         acknowledgementsMap.put(tip0, acknowledgements);
 
-        
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
+        
acknowledgementCommitCallbackHandler.onComplete(List.of(acknowledgementsMap));
         TestUtils.retryOnExceptionWithTimeout(() -> {
             assertInstanceOf(InvalidRecordStateException.class, 
exceptionMap.get(tpo00));
             assertInstanceOf(InvalidRecordStateException.class, 
exceptionMap.get(tpo01));
@@ -104,7 +103,7 @@ class AcknowledgementCommitCallbackHandlerTest {
         
acknowledgements.complete(Errors.TOPIC_AUTHORIZATION_FAILED.exception());
         acknowledgementsMap.put(tip0, acknowledgements);
 
-        
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
+        
acknowledgementCommitCallbackHandler.onComplete(List.of(acknowledgementsMap));
         TestUtils.retryOnExceptionWithTimeout(() -> {
             assertInstanceOf(TopicAuthorizationException.class, 
exceptionMap.get(tpo00));
             assertInstanceOf(TopicAuthorizationException.class, 
exceptionMap.get(tpo01));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index fe249d72895..f3b2063c136 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -90,7 +90,6 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -109,8 +108,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
@@ -193,7 +190,7 @@ public class ShareConsumeRequestManagerTest {
 
         // A dummy metadata update to ensure valid leader epoch.
         metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("kafka-cluster", 1,
-                Collections.emptyMap(), topicPartitionCounts,
+                Map.of(), topicPartitionCounts,
                 tp -> validLeaderEpoch, topicIds), false, 0L);
     }
 
@@ -213,7 +210,7 @@ public class ShareConsumeRequestManagerTest {
     public void testFetchNormal() {
         buildRequestManager();
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
@@ -227,7 +224,7 @@ public class ShareConsumeRequestManagerTest {
     public void testFetchWithAcquiredRecords() {
         buildRequestManager();
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
@@ -243,7 +240,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         // Enabling the config so that background event is sent when the 
acknowledgement response is received.
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         sendFetchAndVerifyResponse(records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);
 
@@ -272,7 +269,7 @@ public class ShareConsumeRequestManagerTest {
         shareConsumeRequestManager.fetch(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements2)));
 
         // Preparing a response with an acknowledgement error.
-        sendFetchAndVerifyResponse(records, Collections.emptyList(), Errors.NONE, Errors.INVALID_RECORD_STATE);
+        sendFetchAndVerifyResponse(records, List.of(), Errors.NONE, Errors.INVALID_RECORD_STATE);
 
         assertEquals(2.0,
                 
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
@@ -290,7 +287,7 @@ public class ShareConsumeRequestManagerTest {
         // Enabling the config so that background event is sent when the 
acknowledgement response is received.
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -313,7 +310,7 @@ public class ShareConsumeRequestManagerTest {
         // Enabling the config so that background event is sent when the 
acknowledgement response is received.
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -336,7 +333,7 @@ public class ShareConsumeRequestManagerTest {
         // Enabling the config so that background event is sent when the 
acknowledgement response is received.
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         fetchRecords();
@@ -389,7 +386,7 @@ public class ShareConsumeRequestManagerTest {
         // Enabling the config so that background event is sent when the 
acknowledgement response is received.
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = Acknowledgements.empty();
@@ -424,7 +421,7 @@ public class ShareConsumeRequestManagerTest {
     @Test
     public void testCloseFutureCompletedWhenMemberIdIsNull() {
         buildRequestManager(new MetricConfig(), new ByteArrayDeserializer(), 
new ByteArrayDeserializer(), null, ShareAcquireMode.BATCH_OPTIMIZED);
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         CompletableFuture<Void> closeFuture = 
shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
                 calculateDeadlineMs(time.timer(100)));
@@ -442,14 +439,14 @@ public class ShareConsumeRequestManagerTest {
         // Enabling the config so that background event is sent when the 
acknowledgement response is received.
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
         shareConsumeRequestManager.commitAsync(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)),
                 calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
-        shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
+        shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
                 calculateDeadlineMs(time.timer(100)));
 
         assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@@ -470,14 +467,14 @@ public class ShareConsumeRequestManagerTest {
         // Enabling the config so that background event is sent when the 
acknowledgement response is received.
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
         shareConsumeRequestManager.commitSync(Map.of(tip0, new 
NodeAcknowledgements(0, acknowledgements)),
                 calculateDeadlineMs(time.timer(100)));
-        shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
+        shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
                 calculateDeadlineMs(time.timer(100)));
 
         assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@@ -573,7 +570,7 @@ public class ShareConsumeRequestManagerTest {
     public void testBatchingAcknowledgeRequestStates() {
         buildRequestManager();
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
                 ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
 
@@ -605,7 +602,7 @@ public class ShareConsumeRequestManagerTest {
     public void testPendingCommitAsyncBeforeCommitSync() {
         buildRequestManager();
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
                 ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
 
@@ -649,7 +646,7 @@ public class ShareConsumeRequestManagerTest {
     public void testRetryAcknowledgements() throws InterruptedException {
         buildRequestManager();
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
                 ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
 
@@ -691,7 +688,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -715,7 +712,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
                 ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
 
@@ -771,7 +768,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
                 ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
 
@@ -814,7 +811,7 @@ public class ShareConsumeRequestManagerTest {
     public void testPiggybackAcknowledgementsInFlight() {
         buildRequestManager();
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1,
@@ -853,7 +850,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         fetchRecords();
@@ -898,13 +895,13 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
-        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName2));
+        subscriptions.assignFromSubscribed(Set.of(t2p0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 
1),
@@ -934,13 +931,13 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
-        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName2));
+        subscriptions.assignFromSubscribed(Set.of(t2p0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 
1),
@@ -970,13 +967,13 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
-        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName2));
+        subscriptions.assignFromSubscribed(Set.of(t2p0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 
1),
@@ -1001,7 +998,7 @@ public class ShareConsumeRequestManagerTest {
     public void testShareFetchWithSubscriptionChange() {
         buildRequestManager();
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
@@ -1011,8 +1008,8 @@ public class ShareConsumeRequestManagerTest {
         fetchRecords();
 
         // Subscription changes.
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
-        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName2));
+        subscriptions.assignFromSubscribed(Set.of(t2p0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 
1),
@@ -1028,8 +1025,8 @@ public class ShareConsumeRequestManagerTest {
     public void testShareFetchWithSubscriptionChangeMultipleNodes() {
         buildRequestManager();
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
-        subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
+        subscriptions.assignFromSubscribed(List.of(tp0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
@@ -1051,7 +1048,7 @@ public class ShareConsumeRequestManagerTest {
         fetchRecords();
 
         // Subscription changes.
-        subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
+        subscriptions.assignFromSubscribed(List.of(tp1));
 
         NetworkClientDelegate.PollResult pollResult = 
shareConsumeRequestManager.sendFetchesReturnPollResult();
         assertEquals(2, pollResult.unsentRequests.size());
@@ -1096,8 +1093,8 @@ public class ShareConsumeRequestManagerTest {
     public void 
testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgements() {
         buildRequestManager();
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
-        subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
+        subscriptions.assignFromSubscribed(List.of(tp0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
@@ -1116,7 +1113,7 @@ public class ShareConsumeRequestManagerTest {
         fetchRecords();
 
         // Change the subscription.
-        subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
+        subscriptions.assignFromSubscribed(List.of(tp1));
 
         // Now we will be sending the request to node1 only as leader for tip1 
is node1.
         // We do not build the request for tip0 as there are no 
acknowledgements to send.
@@ -1139,7 +1136,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         subscriptions.assignFromSubscribed(List.of(tp0, tp1));
 
         client.updateMetadata(
@@ -1180,7 +1177,7 @@ public class ShareConsumeRequestManagerTest {
     public void testRetryAcknowledgementsWithLeaderChange() {
         buildRequestManager();
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(tp0);
         subscriptions.assignFromSubscribed(partitions);
@@ -1224,7 +1221,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
@@ -1282,7 +1279,7 @@ public class ShareConsumeRequestManagerTest {
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap =
                 buildPartitionDataMap(tip0, records, acquiredRecords, Errors.NONE, Errors.NONE);
         partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, acquiredRecords, Errors.NONE, Errors.NONE));
-        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0));
+        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, List.of(), 0));
 
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1328,7 +1325,7 @@ public class ShareConsumeRequestManagerTest {
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap =
                 buildPartitionDataMap(tip0, records, acquiredRecords, Errors.NONE, Errors.NONE);
         partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
-        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0));
+        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, List.of(), 0));
 
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1357,7 +1354,7 @@ public class ShareConsumeRequestManagerTest {
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap =
                 buildPartitionDataMap(t2ip0, records, emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE);
         partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, acquiredRecords, Errors.NONE, Errors.NONE));
-        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0));
+        client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, List.of(), 0));
 
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1379,8 +1376,8 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
-        subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
+        subscriptions.assignFromSubscribed(Set.of(tp0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
@@ -1399,8 +1396,8 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
-        subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
+        subscriptions.assignFromSubscribed(Set.of(tp0));
 
         client.updateMetadata(
                 RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
@@ -1467,7 +1464,7 @@ public class ShareConsumeRequestManagerTest {
     public void testFetchError() {
         buildRequestManager();
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, emptyAcquiredRecords, 
Errors.NOT_LEADER_OR_FOLLOWER);
 
         Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> 
partitionRecords = fetchRecords();
@@ -1479,7 +1476,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         Acknowledgements acknowledgements = getAcknowledgements(1, 
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
 
@@ -1501,7 +1498,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         fetchRecords();
@@ -1514,7 +1511,7 @@ public class ShareConsumeRequestManagerTest {
 
         // Simulate a metadata update with no topics in the response.
         client.updateMetadata(
-                RequestTestUtils.metadataUpdateWithIds(1, Collections.emptyMap(),
+                RequestTestUtils.metadataUpdateWithIds(1, Map.of(),
                         tp -> validLeaderEpoch, null, false));
 
         // The acknowledgements for the initial fetch from tip0 are processed 
now and sent to the background thread.
@@ -1537,7 +1534,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         fetchRecords();
@@ -1586,7 +1583,7 @@ public class ShareConsumeRequestManagerTest {
         buffer.put("beef".getBytes());
         buffer.position(0);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         // normal fetch
         assertEquals(1, sendFetches());
@@ -1617,7 +1614,7 @@ public class ShareConsumeRequestManagerTest {
         // flip some bits to fail the crc
         buffer.putInt(32, buffer.get(32) ^ 87238423);
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         // normal fetch
         assertEquals(1, sendFetches());
@@ -1649,7 +1646,7 @@ public class ShareConsumeRequestManagerTest {
         MemoryRecords memoryRecords = builder.build();
 
         List<ConsumerRecord<byte[], byte[]>> records;
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         client.prepareResponse(fullFetchResponse(tip0,
                 memoryRecords,
@@ -1681,7 +1678,7 @@ public class ShareConsumeRequestManagerTest {
     public void testUnauthorizedTopic() {
         buildRequestManager();
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         assertEquals(1, sendFetches());
         client.prepareResponse(fullFetchResponse(tip0, records, 
emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED));
@@ -1690,14 +1687,14 @@ public class ShareConsumeRequestManagerTest {
             collectFetch();
             fail("collectFetch should have thrown a 
TopicAuthorizationException");
         } catch (TopicAuthorizationException e) {
-            assertEquals(singleton(topicName), e.unauthorizedTopics());
+            assertEquals(Set.of(topicName), e.unauthorizedTopics());
         }
     }
 
     @Test
     public void testUnknownTopicIdError() {
         buildRequestManager();
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         assertEquals(1, sendFetches());
         client.prepareResponse(fetchResponseWithTopLevelError(tip0, 
Errors.UNKNOWN_TOPIC_ID));
@@ -1712,7 +1709,7 @@ public class ShareConsumeRequestManagerTest {
                                              boolean hasTopLevelError,
                                              boolean 
shouldRequestMetadataUpdate) {
         buildRequestManager();
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         assertEquals(1, sendFetches());
 
@@ -1754,7 +1751,7 @@ public class ShareConsumeRequestManagerTest {
     public void testFetchDisconnected() {
         buildRequestManager();
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         assertEquals(1, sendFetches());
         client.prepareResponse(fullFetchResponse(tip0, records, 
acquiredRecords, Errors.NONE), true);
@@ -1787,7 +1784,7 @@ public class ShareConsumeRequestManagerTest {
         result.outputBuffer().flip();
         MemoryRecords compactedRecords = 
MemoryRecords.readableRecords(result.outputBuffer());
 
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         assertEquals(1, sendFetches());
         client.prepareResponse(fullFetchResponse(tip0,
                 compactedRecords,
@@ -1816,7 +1813,7 @@ public class ShareConsumeRequestManagerTest {
     @Test
     public void testCorruptMessageError() {
         buildRequestManager();
-        assignFromSubscribed(singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
 
         assertEquals(1, sendFetches());
         assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -1843,7 +1840,7 @@ public class ShareConsumeRequestManagerTest {
     public void 
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeaderInformation(Errors
 error) {
         buildRequestManager();
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(tp0);
         partitions.add(tp1);
@@ -1865,13 +1862,13 @@ public class ShareConsumeRequestManagerTest {
 
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData =
                 buildPartitionDataMap(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition())
                 .setErrorCode(error.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -1908,7 +1905,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -1931,7 +1928,7 @@ public class ShareConsumeRequestManagerTest {
     public void 
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderInformation(Errors
 error) {
         buildRequestManager();
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(tp0);
         partitions.add(tp1);
@@ -1952,7 +1949,7 @@ public class ShareConsumeRequestManagerTest {
 
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData =
                 buildPartitionDataMap(tip0, records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0);
         partitionData = new LinkedHashMap<>();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
@@ -1961,7 +1958,7 @@ public class ShareConsumeRequestManagerTest {
                 .setCurrentLeader(new ShareFetchResponseData.LeaderIdAndEpoch()
                     .setLeaderId(tp0Leader.id())
                     .setLeaderEpoch(validLeaderEpoch + 1)));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, singletonList(tp0Leader), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(tp0Leader), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -1993,7 +1990,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2017,7 +2014,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(tp0);
         partitions.add(tp1);
@@ -2037,13 +2034,13 @@ public class ShareConsumeRequestManagerTest {
 
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData =
                 buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
         partitionData = new LinkedHashMap<>();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2078,7 +2075,7 @@ public class ShareConsumeRequestManagerTest {
                 .setPartitionIndex(tip0.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code())
                 .setAcknowledgeErrorCode(error.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
         partitionData = buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
@@ -2086,7 +2083,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2105,7 +2102,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(tp0);
         partitions.add(tp1);
@@ -2125,9 +2122,9 @@ public class ShareConsumeRequestManagerTest {
 
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData =
                 buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
         partitionData = buildPartitionDataMap(tip1, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2178,7 +2175,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         subscriptions.assignFromSubscribed(List.of(tp0, tp1));
 
         client.updateMetadata(
@@ -2195,9 +2192,9 @@ public class ShareConsumeRequestManagerTest {
 
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData =
                 buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
         partitionData = buildPartitionDataMap(tip1, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2250,7 +2247,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(tp0);
         partitions.add(tp1);
@@ -2270,9 +2267,9 @@ public class ShareConsumeRequestManagerTest {
 
         LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> 
partitionData =
                 buildPartitionDataMap(tip0, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
         partitionData = buildPartitionDataMap(tip1, records, 
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2328,7 +2325,7 @@ public class ShareConsumeRequestManagerTest {
         buildRequestManager();
         
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(tp0);
         partitions.add(tp1);
@@ -2354,13 +2351,13 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2390,7 +2387,7 @@ public class ShareConsumeRequestManagerTest {
                 .setPartitionIndex(tip0.topicPartition().partition())
                 .setErrorCode(Errors.NONE.code())
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId0, true);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId0, true);
         partitionData.clear();
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
@@ -2398,7 +2395,7 @@ public class ShareConsumeRequestManagerTest {
                 .setRecords(records)
                 
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
                 .setAcknowledgeErrorCode(Errors.NONE.code()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2434,7 +2431,7 @@ public class ShareConsumeRequestManagerTest {
         partitionData.put(tip1,
             new ShareFetchResponseData.PartitionData()
                 .setPartitionIndex(tip1.topicPartition().partition()));
-        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, Collections.emptyList(), 0), nodeId1);
+        client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, 
partitionData, List.of(), 0), nodeId1);
         networkClientDelegate.poll(time.timer(0));
         assertTrue(shareConsumeRequestManager.hasCompletedFetches());
 
@@ -2453,7 +2450,7 @@ public class ShareConsumeRequestManagerTest {
         // We will simulate two nodes, each with one partition. The first node 
will have more records
         buildRequestManager(ShareAcquireMode.RECORD_LIMIT);
 
-        subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+        subscriptions.subscribeToShareGroup(Set.of(topicName));
         Set<TopicPartition> partitions = new LinkedHashSet<>();
         partitions.add(tp0);
         partitions.add(tp1);
@@ -2566,7 +2563,7 @@ public class ShareConsumeRequestManagerTest {
     public void testShareFetchWithRenewAcknowledgement() {
         buildRequestManager();
 
-        assignFromSubscribed(Collections.singleton(tp0));
+        assignFromSubscribed(Set.of(tp0));
         sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
 
         Acknowledgements acknowledgements = getAcknowledgements(1,
@@ -2624,7 +2621,7 @@ public class ShareConsumeRequestManagerTest {
                 new ShareFetchResponseData.PartitionData()
                         .setPartitionIndex(tp.topicPartition().partition())
                         .setErrorCode(error.code()));
-        return ShareFetchResponse.of(error, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+        return ShareFetchResponse.of(error, 0, new 
LinkedHashMap<>(partitions), List.of(), 0);
     }
 
     private ShareFetchResponse fullFetchResponse(TopicIdPartition tp,
@@ -2641,30 +2638,30 @@ public class ShareConsumeRequestManagerTest {
                                                  Errors acknowledgeError) {
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions 
= Map.of(tp,
                 partitionDataForFetch(tp, records, acquiredRecords, error, 
acknowledgeError));
-        return ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+        return ShareFetchResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), List.of(), 0);
     }
 
     private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
-        Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Collections.emptyMap();
-        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+        Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of();
+        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), List.of(), 0);
     }
 
     private ShareAcknowledgeResponse 
acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of(tp,
                 partitionDataForAcknowledge(tp, Errors.NONE));
-        return ShareAcknowledgeResponse.of(error, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+        return ShareAcknowledgeResponse.of(error, 0, new 
LinkedHashMap<>(partitions), List.of(), 0);
     }
 
     private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition 
tp, Errors error) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = Map.of(tp,
                 partitionDataForAcknowledge(tp, error));
-        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), List.of(), 0);
     }
 
     private ShareAcknowledgeResponse 
fullAcknowledgeResponse(Map<TopicIdPartition, Errors> partitionErrorsMap) {
         Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> 
partitions = new HashMap<>();
         partitionErrorsMap.forEach((tip, error) -> partitions.put(tip, 
partitionDataForAcknowledge(tip, error)));
-        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+        return ShareAcknowledgeResponse.of(Errors.NONE, 0, new 
LinkedHashMap<>(partitions), List.of(), 0);
     }
 
     private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition 
tp,
@@ -2712,7 +2709,7 @@ public class ShareConsumeRequestManagerTest {
      */
     private void assertEmptyFetch(String reason) {
         ShareFetch<?, ?> fetch = collectFetch();
-        assertEquals(Collections.emptyMap(), fetch.records(), reason);
+        assertEquals(Map.of(), fetch.records(), reason);
         assertTrue(fetch.isEmpty(), reason);
     }
 
@@ -2728,7 +2725,7 @@ public class ShareConsumeRequestManagerTest {
     private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> 
fetchRecords() {
         ShareFetch<K, V> fetch = collectFetch();
         if (fetch.isEmpty()) {
-            return Collections.emptyMap();
+            return Map.of();
         }
         return fetch.records();
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6943c39f060..24baa003d5a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -88,8 +88,6 @@ import java.util.function.Predicate;
 
 import javax.security.auth.login.LoginException;
 
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -254,7 +252,7 @@ public class ShareConsumerImplTest {
         final String topicName = "foo";
         
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
 
-        final List<String> subscriptionTopic = 
Collections.singletonList(topicName);
+        final List<String> subscriptionTopic = List.of(topicName);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
subscriptionTopic);
         consumer.subscribe(subscriptionTopic);
 
@@ -271,7 +269,7 @@ public class ShareConsumerImplTest {
 
         // Set up subscription
         final String topicName = "foo";
-        final List<String> subscriptionTopic = 
Collections.singletonList(topicName);
+        final List<String> subscriptionTopic = List.of(topicName);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
subscriptionTopic);
         consumer.subscribe(subscriptionTopic);
 
@@ -312,7 +310,7 @@ public class ShareConsumerImplTest {
             return ShareFetch.empty();
         }).doAnswer(invocation -> 
ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
 
-        final List<String> subscriptionTopic = 
Collections.singletonList(topicName);
+        final List<String> subscriptionTopic = List.of(topicName);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
subscriptionTopic);
         consumer.subscribe(subscriptionTopic);
 
@@ -337,7 +335,7 @@ public class ShareConsumerImplTest {
             return fetch;
         }).when(fetchCollector).collect(Mockito.any(ShareFetchBuffer.class));
 
-        final List<String> subscriptionTopic = 
Collections.singletonList(topicName);
+        final List<String> subscriptionTopic = List.of(topicName);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
subscriptionTopic);
         consumer.subscribe(subscriptionTopic);
 
@@ -377,7 +375,7 @@ public class ShareConsumerImplTest {
                 .collect(any(ShareFetchBuffer.class));
 
         // Set up subscription
-        List<String> topics = Collections.singletonList(topic);
+        List<String> topics = List.of(topic);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
         consumer.subscribe(topics);
 
@@ -436,7 +434,7 @@ public class ShareConsumerImplTest {
             .collect(any(ShareFetchBuffer.class));
 
         // Set up subscription
-        List<String> topics = Collections.singletonList(topic);
+        List<String> topics = List.of(topic);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
         consumer.subscribe(topics);
         assertEquals(Optional.empty(), consumer.acquisitionLockTimeoutMs());
@@ -516,7 +514,7 @@ public class ShareConsumerImplTest {
             .collect(any(ShareFetchBuffer.class));
 
         // Set up subscription
-        List<String> topics = Collections.singletonList(topic);
+        List<String> topics = List.of(topic);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
         consumer.subscribe(topics);
 
@@ -674,10 +672,10 @@ public class ShareConsumerImplTest {
         consumer = newConsumer(subscriptions);
 
         String topic = "topic1";
-        final List<String> subscriptionTopic = singletonList(topic);
+        final List<String> subscriptionTopic = List.of(topic);
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
subscriptionTopic);
         consumer.subscribe(subscriptionTopic);
-        assertEquals(singleton(topic), consumer.subscription());
+        assertEquals(Set.of(topic), consumer.subscription());
         
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ShareSubscriptionChangeEvent.class));
     }
 
@@ -700,7 +698,7 @@ public class ShareConsumerImplTest {
 
         completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
 
-        consumer.subscribe(Collections.emptyList());
+        consumer.subscribe(List.of());
 
         
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
     }
@@ -714,14 +712,14 @@ public class ShareConsumerImplTest {
     @Test
     public void testSubscriptionOnNullTopic() {
         consumer = newConsumer();
-        assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(singletonList(null)));
+        assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(Collections.singletonList(null)));
     }
 
     @Test
     public void testSubscriptionOnEmptyTopic() {
         consumer = newConsumer();
         String emptyTopic = "  ";
-        assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(singletonList(emptyTopic)));
+        assertThrows(IllegalArgumentException.class, () -> 
consumer.subscribe(Collections.singletonList(emptyTopic)));
     }
 
     @Test
@@ -733,7 +731,7 @@ public class ShareConsumerImplTest {
         final KafkaException expectedException = new KafkaException("Nobody 
expects the Spanish Inquisition");
         final ErrorEvent errorBackgroundEvent = new 
ErrorEvent(expectedException);
         backgroundEventQueue.add(errorBackgroundEvent);
-        consumer.subscribe(Collections.singletonList("t1"));
+        consumer.subscribe(List.of("t1"));
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
         assertEquals(expectedException.getMessage(), exception.getMessage());
@@ -751,7 +749,7 @@ public class ShareConsumerImplTest {
         final KafkaException expectedException2 = new KafkaException("Spam, 
Spam, Spam");
         final ErrorEvent errorBackgroundEvent2 = new 
ErrorEvent(expectedException2);
         backgroundEventQueue.add(errorBackgroundEvent2);
-        consumer.subscribe(Collections.singletonList("t1"));
+        consumer.subscribe(List.of("t1"));
         final KafkaException exception = assertThrows(KafkaException.class, () 
-> consumer.poll(Duration.ZERO));
 
         assertEquals(expectedException1.getMessage(), exception.getMessage());
@@ -808,7 +806,7 @@ public class ShareConsumerImplTest {
                 .when(fetchCollector)
                 .collect(Mockito.any(ShareFetchBuffer.class));
 
-        final List<String> subscriptionTopic = singletonList("topic");
+        final List<String> subscriptionTopic = List.of("topic");
         
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
subscriptionTopic);
         consumer.subscribe(subscriptionTopic);
 
@@ -958,7 +956,7 @@ public class ShareConsumerImplTest {
     /**
      * This test ensures that the {@link ShareConsumer} implementation fails 
on creation when the underlying
      * {@link NetworkClient} fails creation.
-     *
+     * <p>
      * The logic to check for this case is admittedly a bit awkward because 
the constructor can fail for all
      * manner of reasons. So a failure case is created by specifying an invalid
      * {@link javax.security.auth.spi.LoginModule} class name, which in turn 
causes the {@link NetworkClient}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index c144ee4093e..492a8b857a1 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -48,7 +48,6 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -336,8 +335,8 @@ public class ShareFetchCollectorTest {
     }
 
     private void subscribeAndAssign(TopicIdPartition tp) {
-        subscriptions.subscribe(Collections.singleton(tp.topic()), 
Optional.empty());
-        
subscriptions.assignFromSubscribed(Collections.singleton(tp.topicPartition()));
+        subscriptions.subscribe(Set.of(tp.topic()), Optional.empty());
+        subscriptions.assignFromSubscribed(Set.of(tp.topicPartition()));
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 20528e775de..ce457a3a028 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -52,7 +52,8 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
@@ -216,7 +217,7 @@ public class ShareHeartbeatRequestManagerTest {
         createHeartbeatRequestStateWithZeroHeartbeatInterval();
         time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
         String topic = "topic1";
-        Set<String> set = Collections.singleton(topic);
+        Set<String> set = Set.of(topic);
         when(subscriptions.subscription()).thenReturn(set);
         subscriptions.subscribeToShareGroup(set);
 
@@ -236,7 +237,7 @@ public class ShareHeartbeatRequestManagerTest {
         assertEquals(0, heartbeatRequest.data().memberEpoch());
 
         // Should include subscription and group basic info to start getting 
assignments.
-        assertEquals(Collections.singletonList(topic), 
heartbeatRequest.data().subscribedTopicNames());
+        assertEquals(List.of(topic), 
heartbeatRequest.data().subscribedTopicNames());
         assertEquals(DEFAULT_GROUP_ID, heartbeatRequest.data().groupId());
     }
 
@@ -532,7 +533,7 @@ public class ShareHeartbeatRequestManagerTest {
         assertEquals(DEFAULT_GROUP_ID, data.groupId());
         assertEquals("", data.memberId());
         assertEquals(0, data.memberEpoch());
-        assertEquals(Collections.emptyList(), data.subscribedTopicNames());
+        assertEquals(List.of(), data.subscribedTopicNames());
         membershipManager.onHeartbeatRequestGenerated();
 
         // Mock a response from the group coordinator, that supplies the 
member ID and a new epoch
@@ -549,37 +550,37 @@ public class ShareHeartbeatRequestManagerTest {
 
         // Join the group and subscribe to a topic, but the response has not 
yet been received
         String topic = "topic1";
-        subscriptions.subscribe(Collections.singleton(topic), 
Optional.empty());
-        
when(subscriptions.subscription()).thenReturn(Collections.singleton(topic));
+        subscriptions.subscribe(Set.of(topic), Optional.empty());
+        when(subscriptions.subscription()).thenReturn(Set.of(topic));
         mockRejoiningMemberData();
         data = heartbeatState.buildRequestData();
         assertEquals(DEFAULT_GROUP_ID, data.groupId());
         assertEquals(DEFAULT_MEMBER_ID, data.memberId());
         assertEquals(0, data.memberEpoch());
-        assertEquals(Collections.singletonList(topic), 
data.subscribedTopicNames());
+        assertEquals(List.of(topic), data.subscribedTopicNames());
         membershipManager.onHeartbeatRequestGenerated();
 
         data = heartbeatState.buildRequestData();
         assertEquals(DEFAULT_GROUP_ID, data.groupId());
         assertEquals(DEFAULT_MEMBER_ID, data.memberId());
         assertEquals(0, data.memberEpoch());
-        assertEquals(Collections.singletonList(topic), 
data.subscribedTopicNames());
+        assertEquals(List.of(topic), data.subscribedTopicNames());
 
         // Mock the response from the group coordinator which returns an 
assignment
         ShareGroupHeartbeatResponseData.TopicPartitions tpTopic1 =
                 new ShareGroupHeartbeatResponseData.TopicPartitions();
         Uuid topicId = Uuid.randomUuid();
         tpTopic1.setTopicId(topicId);
-        tpTopic1.setPartitions(Collections.singletonList(0));
+        tpTopic1.setPartitions(List.of(0));
         ShareGroupHeartbeatResponseData.Assignment assignmentTopic1 =
                 new ShareGroupHeartbeatResponseData.Assignment();
-        
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
+        assignmentTopic1.setTopicPartitions(List.of(tpTopic1));
         ShareGroupHeartbeatResponse rs1 = new ShareGroupHeartbeatResponse(new 
ShareGroupHeartbeatResponseData()
                 .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
                 .setMemberId(DEFAULT_MEMBER_ID)
                 .setMemberEpoch(DEFAULT_MEMBER_EPOCH)
                 .setAssignment(assignmentTopic1));
-        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
"topic1"));
+        when(metadata.topicNames()).thenReturn(Map.of(topicId, "topic1"));
         membershipManager.onHeartbeatSuccess(rs1);
     }
 
@@ -811,7 +812,7 @@ public class ShareHeartbeatRequestManagerTest {
     }
 
     private void mockStableMemberData() {
-        when(membershipManager.currentAssignment()).thenReturn(new 
AbstractMembershipManager.LocalAssignment(0, Collections.emptyMap()));
+        when(membershipManager.currentAssignment()).thenReturn(new 
AbstractMembershipManager.LocalAssignment(0, Map.of()));
         when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
         when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
         when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
index 346f7f4f7d9..bc782045577 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
@@ -42,7 +42,6 @@ import org.junit.jupiter.params.provider.ValueSource;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -186,7 +185,7 @@ public class ShareMembershipManagerTest {
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         membershipManager.transitionToFatal();
         assertEquals(MemberState.FATAL, membershipManager.state());
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
 
         membershipManager.leaveGroup();
         verify(subscriptionState).unsubscribe();
@@ -208,7 +207,7 @@ public class ShareMembershipManagerTest {
     public void testFencingWhenStateIsStable() {
         ShareMembershipManager membershipManager = createMemberInStableState();
         testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
     }
 
     @Test
@@ -280,7 +279,7 @@ public class ShareMembershipManagerTest {
         assertEquals(MemberState.RECONCILING, membershipManager.state());
 
         testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
     }
 
     @Test
@@ -340,12 +339,12 @@ public class ShareMembershipManagerTest {
         Uuid topic1 = Uuid.randomUuid();
         final ShareGroupHeartbeatResponseData.Assignment assignment1 = new ShareGroupHeartbeatResponseData.Assignment();
         final ShareGroupHeartbeatResponseData.Assignment assignment2 = new ShareGroupHeartbeatResponseData.Assignment()
-                .setTopicPartitions(Collections.singletonList(
+                .setTopicPartitions(List.of(
                         new ShareGroupHeartbeatResponseData.TopicPartitions()
                                 .setTopicId(topic1)
                                 .setPartitions(Arrays.asList(0, 1, 2))
                 ));
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+        when(metadata.topicNames()).thenReturn(Map.of(topic1, "topic1"));
         assertEquals(toTopicIdPartitionMap(assignment1), membershipManager.currentAssignment().partitions);
 
         // Receive assignment, wait on commit
@@ -382,7 +381,7 @@ public class ShareMembershipManagerTest {
      * This is the case where we receive a new assignment while reconciling an existing one. The intermediate assignment
      * is not applied, and a new assignment containing the same partitions is received and reconciled. In all assignments,
      * one topic is not resolvable.
-     *
+     * <p>
      * We need to make sure that the last assignment is acked and applied, even though the set of partitions does not change.
      * In this case, no rebalance listeners are run.
      */
@@ -393,15 +392,15 @@ public class ShareMembershipManagerTest {
         Uuid topic2 = Uuid.randomUuid();
         final ShareGroupHeartbeatResponseData.Assignment assignment1 = new ShareGroupHeartbeatResponseData.Assignment()
                 .setTopicPartitions(Arrays.asList(
-                        new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
-                        new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+                        new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic1).setPartitions(List.of(0)),
+                        new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(List.of(0))
                 ));
         final ShareGroupHeartbeatResponseData.Assignment assignment2 = new ShareGroupHeartbeatResponseData.Assignment()
                 .setTopicPartitions(Arrays.asList(
                         new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0, 1)),
-                        new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+                        new ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(List.of(0))
                 ));
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, "topic1"));
+        when(metadata.topicNames()).thenReturn(Map.of(topic1, "topic1"));
 
         // Receive assignment - full reconciliation triggered
         // stay in RECONCILING state, since an unresolved topic is assigned
@@ -410,7 +409,7 @@ public class ShareMembershipManagerTest {
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         membershipManager.poll(time.milliseconds());
         verifyReconciliationTriggeredAndCompleted(membershipManager,
-                Collections.singletonList(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
+                List.of(new TopicIdPartition(topic1, new TopicPartition("topic1", 0)))
         );
         membershipManager.onHeartbeatRequestGenerated();
         assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -427,7 +426,7 @@ public class ShareMembershipManagerTest {
         membershipManager.poll(time.milliseconds());
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         verifyReconciliationNotTriggered(membershipManager);
-        assertEquals(Collections.singletonMap(topic1, mkSortedSet(0)), membershipManager.currentAssignment().partitions);
+        assertEquals(Map.of(topic1, mkSortedSet(0)), membershipManager.currentAssignment().partitions);
         assertEquals(Set.of(topic2), membershipManager.topicsAwaitingReconciliation());
     }
 
@@ -454,12 +453,12 @@ public class ShareMembershipManagerTest {
 
         // Receive assignment with only topic1-0, entering STABLE state.
         ShareMembershipManager membershipManager =
-                mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, Collections.singletonList(0));
+                mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, List.of(0));
 
         membershipManager.onHeartbeatRequestGenerated();
 
         assertEquals(MemberState.STABLE, membershipManager.state());
-        when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0)));
+        when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Set.of(topicId1Partition0)));
         clearInvocations(membershipManager, subscriptionState);
 
         // New assignment adding a new topic2-0 (not in metadata).
@@ -482,7 +481,7 @@ public class ShareMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        assertEquals(Collections.singleton(topicId2), membershipManager.topicsAwaitingReconciliation());
+        assertEquals(Set.of(topicId2), membershipManager.topicsAwaitingReconciliation());
         verify(metadata).requestUpdate(anyBoolean());
         clearInvocations(membershipManager);
 
@@ -504,7 +503,7 @@ public class ShareMembershipManagerTest {
     public void testLeaveGroupWhenStateIsStable() {
         ShareMembershipManager membershipManager = createMemberInStableState();
         testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
     }
 
     @Test
@@ -637,7 +636,7 @@ public class ShareMembershipManagerTest {
         Uuid topicId = Uuid.randomUuid();
         String topicName = "topic1";
         ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
-        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
+        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, List.of());
 
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
@@ -659,13 +658,13 @@ public class ShareMembershipManagerTest {
         ShareMembershipManager membershipManager = createMemberInStableState();
 
         // Clear the assignment
-        when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+        when(subscriptionState.assignedPartitions()).thenReturn(Set.of());
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false);
 
         membershipManager.transitionToFenced();
 
         // Make sure to never call `assignFromSubscribed` again
-        verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState, never()).assignFromSubscribed(Set.of());
     }
 
     @Test
@@ -678,7 +677,7 @@ public class ShareMembershipManagerTest {
         verify(subscriptionState).unsubscribe();
         assertFalse(leaveResult1.isDone());
         assertEquals(MemberState.LEAVING, membershipManager.state());
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
         clearInvocations(subscriptionState);
 
         // Second leave attempt while the first one has not completed yet.
@@ -693,7 +692,7 @@ public class ShareMembershipManagerTest {
         assertFalse(leaveResult2.isCompletedExceptionally());
 
         // Subscription should have been updated only once with the first leave group.
-        verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState, never()).assignFromSubscribed(Set.of());
     }
 
     @Test
@@ -705,7 +704,7 @@ public class ShareMembershipManagerTest {
         CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
         assertEquals(MemberState.LEAVING, membershipManager.state());
         assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLeave(membershipManager, leaveResult1);
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
         clearInvocations(subscriptionState);
 
         // Call to leave group again, when member already left. Should be no-op (no assignment updated)
@@ -715,7 +714,7 @@ public class ShareMembershipManagerTest {
         assertTrue(leaveResult2.isDone());
         assertFalse(leaveResult2.isCompletedExceptionally());
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
-        verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState, never()).assignFromSubscribed(Set.of());
     }
 
     @Test
@@ -731,7 +730,7 @@ public class ShareMembershipManagerTest {
     }
 
     @Test
-    public void testFatalFailureWhenStateIsUnjoined() {
+    public void testFatalFailureWhenStateIsJoining() {
         ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
         assertEquals(MemberState.JOINING, membershipManager.state());
 
@@ -819,13 +818,13 @@ public class ShareMembershipManagerTest {
         // reconcile the new assignment.
         Uuid topicId = Uuid.randomUuid();
         String topicName = "topic1";
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
-        receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
+        when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
+        receiveAssignment(topicId, List.of(0), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.poll(time.milliseconds());
 
-        Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0));
+        Set<TopicPartition> expectedAssignment = Set.of(new TopicPartition(topicName, 0));
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAssignment, expectedAssignment);
 
@@ -860,14 +859,14 @@ public class ShareMembershipManagerTest {
         // reconcile the new assignment.
         Uuid topicId = Uuid.randomUuid();
         String topicName = "topic1";
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
+        when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
         receiveEmptyAssignment(membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
 
         membershipManager.poll(time.milliseconds());
 
-        verifyReconciliationTriggeredAndCompleted(membershipManager, Collections.emptyList());
+        verifyReconciliationTriggeredAndCompleted(membershipManager, List.of());
 
         membershipManager.onHeartbeatRequestGenerated();
 
@@ -885,8 +884,8 @@ public class ShareMembershipManagerTest {
         // one that is waiting for metadata, so the member will discard the topics that were
         // waiting for metadata, and just keep the new one as unresolved.
         Uuid topicId = Uuid.randomUuid();
-        when(metadata.topicNames()).thenReturn(Collections.emptyMap());
-        receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
+        when(metadata.topicNames()).thenReturn(Map.of());
+        receiveAssignment(topicId, List.of(0), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.poll(time.milliseconds());
@@ -907,19 +906,19 @@ public class ShareMembershipManagerTest {
         // Assignment not in metadata. Member cannot reconcile it yet, but keeps it to be
         // reconciled when metadata is discovered.
         Uuid topicId = Uuid.randomUuid();
-        receiveAssignment(topicId, Collections.singletonList(1), membershipManager);
+        receiveAssignment(topicId, List.of(1), membershipManager);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
         assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());
 
         // Metadata update received, including the missing topic name.
         String topicName = "topic1";
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
+        when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
 
         membershipManager.poll(time.milliseconds());
 
         // Assignment should have been reconciled.
-        Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 1));
+        Set<TopicPartition> expectedAssignment = Set.of(new TopicPartition(topicName, 1));
         verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAssignment, expectedAssignment);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
@@ -940,19 +939,19 @@ public class ShareMembershipManagerTest {
                 .setTopicPartitions(Arrays.asList(
                         new ShareGroupHeartbeatResponseData.TopicPartitions()
                                 .setTopicId(topic1)
-                                .setPartitions(Collections.singletonList(0)),
+                                .setPartitions(List.of(0)),
                         new ShareGroupHeartbeatResponseData.TopicPartitions()
                                 .setTopicId(topic2)
                                 .setPartitions(Arrays.asList(1, 3))
                 ));
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1, topic1Name));
+        when(metadata.topicNames()).thenReturn(Map.of(topic1, topic1Name));
 
         // Receive assignment partly in metadata - reconcile+ack what's in metadata, keep the
         // unresolved and request metadata update.
         ShareMembershipManager membershipManager = mockJoinAndReceiveAssignment(true, assignment);
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
         verify(metadata).requestUpdate(anyBoolean());
-        assertEquals(Collections.singleton(topic2), membershipManager.topicsAwaitingReconciliation());
+        assertEquals(Set.of(topic2), membershipManager.topicsAwaitingReconciliation());
 
         // When the ack is sent the member should go back to RECONCILING because it still has
         // unresolved assignment to be reconciled.
@@ -964,7 +963,7 @@ public class ShareMembershipManagerTest {
         clearInvocations(subscriptionState);
         membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment, membershipManager.memberId()));
         assertEquals(MemberState.RECONCILING, membershipManager.state());
-        assertEquals(Collections.singleton(topic2), membershipManager.topicsAwaitingReconciliation());
+        assertEquals(Set.of(topic2), membershipManager.topicsAwaitingReconciliation());
         verify(subscriptionState, never()).assignFromSubscribed(anyCollection());
     }
 
@@ -973,7 +972,7 @@ public class ShareMembershipManagerTest {
         Uuid topicId = Uuid.randomUuid();
         String topicName = "topic1";
         ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
-        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
+        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, List.of());
 
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
@@ -991,7 +990,7 @@ public class ShareMembershipManagerTest {
         TopicIdPartition ownedPartition = new TopicIdPartition(topicId, new TopicPartition(topicName, 0));
         ShareMembershipManager membershipManager = createMemberInStableState();
         mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
-                Collections.singletonList(ownedPartition));
+                List.of(ownedPartition));
 
         // New assignment received, adding partitions 1 and 2 to the previously owned partition 0.
         receiveAssignment(topicId, Arrays.asList(0, 1, 2), membershipManager);
@@ -1013,7 +1012,7 @@ public class ShareMembershipManagerTest {
         String topicName = "topic1";
 
         // Receive assignment different from what the member owns - should reconcile
-        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
+        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, List.of());
         List<TopicIdPartition> expectedAssignmentReconciled = topicIdPartitions(topicId, topicName, 0, 1);
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
@@ -1048,7 +1047,7 @@ public class ShareMembershipManagerTest {
                 new TopicPartition(topicName, 0));
         ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
         mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
-                Collections.singletonList(ownedPartition));
+                List.of(ownedPartition));
 
         mockRevocation();
 
@@ -1074,7 +1073,7 @@ public class ShareMembershipManagerTest {
 
         // Assignment not in metadata
         ShareGroupHeartbeatResponseData.Assignment targetAssignment = new ShareGroupHeartbeatResponseData.Assignment()
-                .setTopicPartitions(Collections.singletonList(
+                .setTopicPartitions(List.of(
                         new ShareGroupHeartbeatResponseData.TopicPartitions()
                                 .setTopicId(topicId)
                                 .setPartitions(Arrays.asList(0, 1))));
@@ -1084,11 +1083,11 @@ public class ShareMembershipManagerTest {
 
         // Should not trigger reconciliation, and request a metadata update.
         verifyReconciliationNotTriggered(membershipManager);
-        assertEquals(Collections.singleton(topicId), membershipManager.topicsAwaitingReconciliation());
+        assertEquals(Set.of(topicId), membershipManager.topicsAwaitingReconciliation());
         verify(metadata).requestUpdate(anyBoolean());
 
         String topicName = "topic1";
-        mockTopicNameInMetadataCache(Collections.singletonMap(topicId, topicName), true);
+        mockTopicNameInMetadataCache(Map.of(topicId, topicName), true);
 
         // When metadata is updated, the member should re-trigger reconciliation
         membershipManager.poll(time.milliseconds());
@@ -1104,7 +1103,7 @@ public class ShareMembershipManagerTest {
 
         // Assignment not in metadata
         ShareGroupHeartbeatResponseData.Assignment targetAssignment = new ShareGroupHeartbeatResponseData.Assignment()
-                .setTopicPartitions(Collections.singletonList(
+                .setTopicPartitions(List.of(
                         new ShareGroupHeartbeatResponseData.TopicPartitions()
                                 .setTopicId(topicId)
                                 .setPartitions(Arrays.asList(0, 1))));
@@ -1114,15 +1113,15 @@ public class ShareMembershipManagerTest {
 
         // Should not trigger reconciliation, and request a metadata update.
         verifyReconciliationNotTriggered(membershipManager);
-        assertEquals(Collections.singleton(topicId), membershipManager.topicsAwaitingReconciliation());
+        assertEquals(Set.of(topicId), membershipManager.topicsAwaitingReconciliation());
         verify(metadata).requestUpdate(anyBoolean());
 
         // Next poll is run, but metadata still without the unresolved topic in it. Should keep
         // the unresolved and request update again.
-        when(metadata.topicNames()).thenReturn(Collections.emptyMap());
+        when(metadata.topicNames()).thenReturn(Map.of());
         membershipManager.poll(time.milliseconds());
         verifyReconciliationNotTriggered(membershipManager);
-        assertEquals(Collections.singleton(topicId), membershipManager.topicsAwaitingReconciliation());
+        assertEquals(Set.of(topicId), membershipManager.topicsAwaitingReconciliation());
         verify(metadata, times(2)).requestUpdate(anyBoolean());
     }
 
@@ -1132,7 +1131,7 @@ public class ShareMembershipManagerTest {
         String topicName = "topic1";
 
         ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
-        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
+        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, List.of());
 
         // Member received assignment to reconcile;
 
@@ -1147,7 +1146,7 @@ public class ShareMembershipManagerTest {
         List<Integer> partitions = Arrays.asList(0, 1);
         Set<TopicPartition> assignedPartitions =
                 partitions.stream().map(p -> new TopicPartition(topicName, p)).collect(Collectors.toSet());
-        Map<Uuid, SortedSet<Integer>> assignedTopicIdPartitions = Collections.singletonMap(topicId,
+        Map<Uuid, SortedSet<Integer>> assignedTopicIdPartitions = Map.of(topicId,
                 new TreeSet<>(partitions));
         assertEquals(assignedTopicIdPartitions, membershipManager.currentAssignment().partitions);
         assertFalse(membershipManager.reconciliationInProgress());
@@ -1158,10 +1157,10 @@ public class ShareMembershipManagerTest {
         // Revocation of topic not found in metadata cache
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         mockRevocation();
-        mockTopicNameInMetadataCache(Collections.singletonMap(topicId, topicName), false);
+        mockTopicNameInMetadataCache(Map.of(topicId, topicName), false);
 
         // Revoke one of the 2 partitions
-        receiveAssignment(topicId, Collections.singletonList(1), membershipManager);
+        receiveAssignment(topicId, List.of(1), membershipManager);
 
         membershipManager.poll(time.milliseconds());
         verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));
@@ -1287,7 +1286,7 @@ public class ShareMembershipManagerTest {
         for (int partition : partitions)
             topicIdPartitions.add(partition);
 
-        return Collections.singletonMap(topicId, topicIdPartitions);
+        return Map.of(topicId, topicIdPartitions);
     }
 
     private void testFenceIsNoOp(ShareMembershipManager membershipManager) {
@@ -1300,7 +1299,7 @@ public class ShareMembershipManagerTest {
 
         // Should reset epoch to leave the group and release the assignment (right away because
         // there is no onPartitionsLost callback defined)
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
         assertTrue(membershipManager.currentAssignment().partitions.isEmpty());
         assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
         assertEquals(ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, membershipManager.memberEpoch());
@@ -1387,7 +1386,7 @@ public class ShareMembershipManagerTest {
             Uuid topicId, String topicName, List<Integer> partitions) {
         ShareMembershipManager membershipManager = createMembershipManagerJoiningGroup();
         mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName,
-                Collections.emptyList());
+                List.of());
 
         receiveAssignment(topicId, partitions, membershipManager);
 
@@ -1432,7 +1431,7 @@ public class ShareMembershipManagerTest {
         if (isPresent) {
             when(metadata.topicNames()).thenReturn(topicNames);
         } else {
-            when(metadata.topicNames()).thenReturn(Collections.emptyMap());
+            when(metadata.topicNames()).thenReturn(Map.of());
         }
     }
 
@@ -1445,7 +1444,7 @@ public class ShareMembershipManagerTest {
     private void mockMemberHasAutoAssignedPartition() {
         String topicName = "topic1";
         TopicPartition ownedPartition = new TopicPartition(topicName, 0);
-        when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
+        when(subscriptionState.assignedPartitions()).thenReturn(Set.of(ownedPartition));
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
     }
 
@@ -1459,7 +1458,7 @@ public class ShareMembershipManagerTest {
         List<TopicPartition> expectedTopicPartitionAssignment =
                 buildTopicPartitions(expectedCurrentAssignment);
         HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
-        verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedSet, Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedSet, Set.of());
     }
 
     private Map<Uuid, SortedSet<Integer>> assignmentByTopicId(List<TopicIdPartition> topicIdPartitions) {
@@ -1479,7 +1478,7 @@ public class ShareMembershipManagerTest {
         HashMap<Uuid, SortedSet<Integer>> partitionsByTopicId = new HashMap<>();
         partitionsByTopicId.put(topicId, new TreeSet<>(previouslyOwned.stream().map(TopicIdPartition::partition).collect(Collectors.toSet())));
         membershipManager.updateAssignment(partitionsByTopicId);
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName));
+        when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
     }
 
@@ -1492,8 +1491,8 @@ public class ShareMembershipManagerTest {
     private void mockOwnedPartition(ShareMembershipManager membershipManager, Uuid topicId, String topic) {
         int partition = 0;
         TopicPartition previouslyOwned = new TopicPartition(topic, partition);
-        membershipManager.updateAssignment(mkMap(mkEntry(topicId, new TreeSet<>(Collections.singletonList(partition)))));
-        when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(previouslyOwned));
+        membershipManager.updateAssignment(mkMap(mkEntry(topicId, new TreeSet<>(List.of(partition)))));
+        when(subscriptionState.assignedPartitions()).thenReturn(Set.of(previouslyOwned));
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
     }
 
@@ -1548,7 +1547,7 @@ public class ShareMembershipManagerTest {
 
     private void receiveAssignment(Uuid topicId, List<Integer> partitions, ShareMembershipManager membershipManager) {
         ShareGroupHeartbeatResponseData.Assignment targetAssignment = new ShareGroupHeartbeatResponseData.Assignment()
-                .setTopicPartitions(Collections.singletonList(
+                .setTopicPartitions(List.of(
                         new ShareGroupHeartbeatResponseData.TopicPartitions()
                                 .setTopicId(topicId)
                                 .setPartitions(partitions)));
@@ -1559,7 +1558,7 @@ public class ShareMembershipManagerTest {
     private void receiveEmptyAssignment(ShareMembershipManager membershipManager) {
         // New empty assignment received, revoking owned partition.
         ShareGroupHeartbeatResponseData.Assignment targetAssignment = new ShareGroupHeartbeatResponseData.Assignment()
-                .setTopicPartitions(Collections.emptyList());
+                .setTopicPartitions(List.of());
         ShareGroupHeartbeatResponse heartbeatResponse = createShareGroupHeartbeatResponse(targetAssignment, membershipManager.memberId());
         membershipManager.onHeartbeatSuccess(heartbeatResponse);
     }
@@ -1596,7 +1595,7 @@ public class ShareMembershipManagerTest {
         assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLeave(membershipManager, leaveResult);
         assertEquals(-1, membershipManager.memberEpoch());
         assertTrue(membershipManager.currentAssignment().isNone());
-        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        verify(subscriptionState).assignFromSubscribed(Set.of());
     }
 
     private void mockLeaveGroup() {
@@ -1672,8 +1671,8 @@ public class ShareMembershipManagerTest {
         Uuid topicId = Uuid.randomUuid();
         ShareMembershipManager membershipManager = mockJoinAndReceiveAssignment(true);
         membershipManager.onHeartbeatRequestGenerated();
-        when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic"));
-        receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
+        when(metadata.topicNames()).thenReturn(Map.of(topicId, "topic"));
+        receiveAssignment(topicId, List.of(0), membershipManager);
         membershipManager.onHeartbeatRequestGenerated();
         assertFalse(membershipManager.currentAssignment().isNone());
         return membershipManager;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 826978aab5a..b508b388010 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -338,10 +337,10 @@ public class ShareSessionHandlerTest {
 
         // If we started with an ID, only a new ID will count towards replaced.
         // The old topic ID partition should be forgotten, and the new one should be fetched.
-        assertEquals(Collections.singletonList(tp), reqForgetList(requestData2, topicNames));
+        assertEquals(List.of(tp), reqForgetList(requestData2, topicNames));
         assertMapsEqual(reqMap(new TopicIdPartition(topicId2, 0, "foo")),
                 handler.sessionPartitionMap());
-        assertListEquals(Collections.singletonList(tp2), reqFetchList(requestData2, topicNames));
+        assertListEquals(List.of(tp2), reqFetchList(requestData2, topicNames));
 
         // Should have the same session ID, and next epoch and can use topic IDs if it ended with topic IDs.
         assertEquals(memberId.toString(), requestData2.memberId(), "Did not use same session");
@@ -376,7 +375,7 @@ public class ShareSessionHandlerTest {
         // Remove the topic from the session by setting acknowledgements only - this is not asking to fetch records
         ShareFetchRequestData requestData2 = handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
         handler.addPartitionToAcknowledgeOnly(foo0, Acknowledgements.empty());
-        assertEquals(Collections.singletonList(foo0), reqForgetList(requestData2, topicNames));
+        assertEquals(List.of(foo0), reqForgetList(requestData2, topicNames));
 
         // Should have the same session ID, next epoch, and same ID usage
         assertEquals(memberId.toString(), requestData2.memberId(), "Did not use same session");
@@ -410,7 +409,7 @@ public class ShareSessionHandlerTest {
 
         // Remove the topic from the session
         ShareFetchRequestData requestData2 = handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
-        assertEquals(Collections.singletonList(foo0), reqForgetList(requestData2, topicNames));
+        assertEquals(List.of(foo0), reqForgetList(requestData2, topicNames));
 
         // Should have the same session ID, next epoch, and same ID usage
         assertEquals(memberId.toString(), requestData2.memberId(), "Did not use same session");
@@ -475,7 +474,7 @@ public class ShareSessionHandlerTest {
         handler.addPartitionToFetch(foo0, acknowledgements);
 
         // As we start with a ShareAcknowledge on epoch 0, we expect a null response.
-        assertNull(handler.newShareAcknowledgeBuilder(groupId, shareFetchConfig));
+        assertNull(handler.newShareAcknowledgeBuilder(groupId));
 
         // Attempt a new ShareFetch
         TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");

Reply via email to