This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 84d4f35387f MINOR: Share group tidying (#21711)
84d4f35387f is described below
commit 84d4f35387f7cc029bf487aeaa17aa78cc917c9b
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Mar 13 12:19:56 2026 +0000
MINOR: Share group tidying (#21711)
Minor tidying up of share group code, mainly fixing compiler warnings.
Also fixed three flaky tests, all of which were failing occasionally
because of impatience.
Reviewers: Manikumar Reddy <[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 30 ++-
.../kafka/clients/consumer/MockShareConsumer.java | 3 +-
.../consumer/internals/ShareCompletedFetch.java | 4 +-
.../internals/ShareConsumeRequestManager.java | 2 +-
.../consumer/internals/ShareConsumerImpl.java | 19 +-
.../clients/consumer/internals/ShareFetch.java | 3 +-
.../consumer/internals/ShareFetchCollector.java | 4 +-
.../consumer/internals/ShareSessionHandler.java | 8 +-
.../events/ShareAcknowledgementEvent.java | 2 +-
.../consumer/KafkaShareConsumerMetricsTest.java | 27 +--
.../clients/consumer/MockShareConsumerTest.java | 4 +-
.../AcknowledgementCommitCallbackHandlerTest.java | 7 +-
.../internals/ShareConsumeRequestManagerTest.java | 211 ++++++++++-----------
.../consumer/internals/ShareConsumerImplTest.java | 34 ++--
.../internals/ShareFetchCollectorTest.java | 5 +-
.../ShareHeartbeatRequestManagerTest.java | 25 +--
.../internals/ShareMembershipManagerTest.java | 135 +++++++------
.../internals/ShareSessionHandlerTest.java | 11 +-
18 files changed, 269 insertions(+), 265 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index d915e70b831..837db109d94 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -1281,9 +1281,8 @@ public class ShareConsumerTest {
broker.awaitShutdown();
// Assert that close completes in less than 5 seconds, not the full
30-second timeout.
- assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
- shareConsumer.close();
- }, "Consumer close should not wait for full timeout when broker is
already shutdown");
+ assertTimeoutPreemptively(Duration.ofSeconds(5), () ->
shareConsumer.close(),
+ "Consumer close should not wait for full timeout when broker is
already shut down");
}
@ClusterTest
@@ -1871,7 +1870,7 @@ public class ShareConsumerTest {
}
// We delete records before offset 5, so the LSO should move to 5.
- adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(5L)));
+ assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(5L))).all().get(), "Failed to delete records");
int messageCount = consumeMessages(new AtomicInteger(0), 5,
groupId, 1, 10, true);
// The records returned belong to offsets 5-9.
@@ -1883,14 +1882,14 @@ public class ShareConsumerTest {
}
// We delete records before offset 14, so the LSO should move to
14.
- adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(14L)));
+ assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(14L))).all().get(), "Failed to delete records");
int consumeMessagesCount = consumeMessages(new AtomicInteger(0),
1, groupId, 1, 10, true);
// The record returned belong to offset 14.
assertEquals(1, consumeMessagesCount);
// We delete records before offset 15, so the LSO should move to
15 and now no records should be returned.
- adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(15L)));
+ assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(15L))).all().get(), "Failed to delete records");
messageCount = consumeMessages(new AtomicInteger(0), 0, groupId,
1, 5, true);
assertEquals(0, messageCount);
@@ -1964,7 +1963,7 @@ public class ShareConsumerTest {
}
// We delete records before offset 5, so the LSO should move to 5.
- adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(5L)));
+ assertDoesNotThrow(() -> adminClient.deleteRecords(Map.of(tp,
RecordsToDelete.beforeOffset(5L))).all().get(), "Failed to delete records");
int consumedMessageCount = consumeMessages(new AtomicInteger(0),
5, "group1", 1, 10, true);
// The records returned belong to offsets 5-9.
@@ -4379,6 +4378,23 @@ public class ShareConsumerTest {
private void alterShareDeliveryCountLimit(String groupId, String newValue)
{
alterShareGroupConfig(groupId,
GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, newValue);
+
+ // This config is changed dynamically in tests, and we need it to have
propagated before the test proceeds.
+ // Describing the config with a new admin client is not totally
foolproof, but it's better than just
+ // altering the config and continuing.
+ try (Admin adminClient = createAdminClient()) {
+ ConfigResource groupConfigResource = new
ConfigResource(ConfigResource.Type.GROUP, groupId);
+ assertDoesNotThrow(() ->
+ TestUtils.waitForCondition(() -> {
+ try {
+ Config config =
adminClient.describeConfigs(List.of(groupConfigResource)).all().get().get(groupConfigResource);
+ ConfigEntry entry =
config.get(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG);
+ return entry != null && entry.value().equals(newValue);
+ } catch (Exception e) {
+ return false;
+ }
+ }, 10000L, 100L, () -> "New config value did not propagate"),
"Failed to describe configs");
+ }
}
private void alterShareIsolationLevel(String groupId, String newValue) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
index 8aeab59a2b4..049732bb2ea 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.utils.LogContext;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -142,7 +141,7 @@ public class MockShareConsumer<K, V> implements
ShareConsumer<K, V> {
@Override
public synchronized Map<MetricName, ? extends Metric> metrics() {
ensureNotClosed();
- return Collections.emptyMap();
+ return Map.of();
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 7fc39c5d5a1..7a6c2291c93 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -179,7 +179,7 @@ public class ShareCompletedFetch {
ShareInFlightBatch<K, V> inFlightBatch = new
ShareInFlightBatch<>(nodeId, partition, acquisitionLockTimeoutMs);
if (cachedBatchException != null) {
- // If the event that a CRC check fails, reject the entire record
batch because it is corrupt.
+ // In the event that a CRC check fails, reject the entire record
batch because it is corrupt.
Set<Long> offsets = rejectRecordBatch(inFlightBatch, currentBatch);
inFlightBatch.setException(new
ShareInFlightBatchException(cachedBatchException, offsets));
cachedBatchException = null;
@@ -249,7 +249,7 @@ public class ShareCompletedFetch {
}
} catch (CorruptRecordException e) {
if (inFlightBatch.isEmpty()) {
- // If the event that a CRC check fails, reject the entire
record batch because it is corrupt.
+ // In the event that a CRC check fails, reject the entire
record batch because it is corrupt.
Set<Long> offsets = rejectRecordBatch(inFlightBatch,
currentBatch);
inFlightBatch.setException(new ShareInFlightBatchException(e,
offsets));
} else {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index c30073d4104..dd1b829bcb3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -1237,7 +1237,7 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
sessionHandler.addPartitionToFetch(entry.getKey(),
entry.getValue());
}
- ShareAcknowledgeRequest.Builder requestBuilder =
sessionHandler.newShareAcknowledgeBuilder(groupId, shareFetchConfig);
+ ShareAcknowledgeRequest.Builder requestBuilder =
sessionHandler.newShareAcknowledgeBuilder(groupId);
isProcessed = false;
Node nodeToSend = metadata.fetch().nodeById(nodeId);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 3a7eb81df60..bc7e591d78b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -81,7 +81,6 @@ import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.LinkedList;
@@ -262,7 +261,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.metrics = createMetrics(config, time, reporters);
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
- this.acknowledgementMode = initializeAcknowledgementMode(config,
log);
+ this.acknowledgementMode = initializeAcknowledgementMode(config);
this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.subscriptions = createSubscriptionState(config, logContext);
@@ -379,7 +378,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.metadata = metadata;
this.requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.defaultApiTimeoutMs =
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
- this.acknowledgementMode = initializeAcknowledgementMode(config, log);
+ this.acknowledgementMode = initializeAcknowledgementMode(config);
this.fetchBuffer = new ShareFetchBuffer(logContext);
this.completedAcknowledgements = new LinkedList<>();
@@ -490,7 +489,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.applicationEventHandler = applicationEventHandler;
this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics);
this.clientTelemetryReporter = Optional.empty();
- this.completedAcknowledgements = Collections.emptyList();
+ this.completedAcknowledgements = List.of();
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
this.acknowledgementEventHandler = new
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
this.backgroundEventHandler = new BackgroundEventHandler(
@@ -532,7 +531,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
public Set<String> subscription() {
acquireAndEnsureOpen();
try {
- return Collections.unmodifiableSet(subscriptions.subscription());
+ return Set.copyOf(subscriptions.subscription());
} finally {
release();
}
@@ -675,7 +674,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
wakeupTrigger.clearTask();
}
- return collect(Collections.emptyMap());
+ return collect(Map.of());
}
private ShareFetch<K, V> collect(Map<TopicIdPartition,
NodeAcknowledgements> acknowledgementsMap) {
@@ -797,7 +796,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
Timer requestTimer = time.timer(timeout.toMillis());
Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap =
acknowledgementsToSend();
if (acknowledgementsMap.isEmpty()) {
- return Collections.emptyMap();
+ return Map.of();
} else {
ShareAcknowledgeSyncEvent event = new
ShareAcknowledgeSyncEvent(acknowledgementsMap,
calculateDeadlineMs(requestTimer));
applicationEventHandler.add(event);
@@ -908,7 +907,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
*/
@Override
public Map<MetricName, ? extends Metric> metrics() {
- return Collections.unmodifiableMap(metrics.metrics());
+ return Map.copyOf(metrics.metrics());
}
/**
@@ -1180,7 +1179,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
/**
* Initializes the acknowledgement mode based on the configuration.
*/
- private static ShareAcknowledgementMode
initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
+ private static ShareAcknowledgementMode
initializeAcknowledgementMode(ConsumerConfig config) {
String s =
config.getString(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
return ShareAcknowledgementMode.fromString(s);
}
@@ -1223,8 +1222,6 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
* It is possible that {@link ErrorEvent an error} could occur when
processing the events. In such
* cases, the processor will take a reference to the first error, continue
to process the remaining
* events, and then throw the first error that occurred.
- *
- * Visible for testing.
*/
boolean processBackgroundEvents() {
AtomicReference<KafkaException> firstError = new AtomicReference<>();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
index 83a43cb69ae..a15231512fc 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetch.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -83,7 +82,7 @@ public class ShareFetch<K, V> {
public Map<TopicPartition, List<ConsumerRecord<K, V>>> records() {
final LinkedHashMap<TopicPartition, List<ConsumerRecord<K, V>>> result
= new LinkedHashMap<>();
batches.forEach((tip, batch) -> result.put(tip.topicPartition(),
batch.getInFlightRecords()));
- return Collections.unmodifiableMap(result);
+ return Map.copyOf(result);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
index 8b4a103e23d..109fe3da48c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollector.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-import java.util.Collections;
+import java.util.Set;
import static
org.apache.kafka.clients.consumer.internals.FetchUtils.requestMetadataUpdate;
@@ -172,7 +172,7 @@ public class ShareFetchCollector<K, V> {
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
// Log the actual partition and not just the topic to help with
ACL propagation issues in large clusters
log.warn("Not authorized to read from partition {}.",
tp.topicPartition());
- throw new
TopicAuthorizationException(Collections.singleton(tp.topic()));
+ throw new TopicAuthorizationException(Set.of(tp.topic()));
} else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
log.debug("Received unknown leader epoch error in fetch for
partition {}.", tp);
} else if (error == Errors.UNKNOWN_SERVER_ERROR) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
index 0b6cdf0a6db..e4e16d1506f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java
@@ -35,13 +35,13 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -95,7 +95,7 @@ public class ShareSessionHandler {
}
public Collection<TopicIdPartition> sessionPartitions() {
- return Collections.unmodifiableCollection(sessionPartitions.values());
+ return Set.copyOf(sessionPartitions.values());
}
public void addPartitionToFetch(TopicIdPartition topicIdPartition,
Acknowledgements partitionAcknowledgements) {
@@ -218,9 +218,9 @@ public class ShareSessionHandler {
}
}
- public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String
groupId, ShareFetchConfig shareFetchConfig) {
+ public ShareAcknowledgeRequest.Builder newShareAcknowledgeBuilder(String
groupId) {
if (nextMetadata.isNewSession()) {
- // A share session cannot be started with a ShareAcknowledge
request
+ // A share session cannot be started with a ShareAcknowledge
request. The caller handles completing the acks.
nextPartitions.clear();
nextAcknowledgements.clear();
return null;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
index ff7f7598f68..f9e47a32d8a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementEvent.java
@@ -36,7 +36,7 @@ public class ShareAcknowledgementEvent {
public ShareAcknowledgementEvent(Map<TopicIdPartition, Acknowledgements>
acknowledgementsMap,
boolean checkForRenewAcknowledgements,
Optional<Integer>
acquisitionLockTimeoutMs) {
- this.acknowledgementsMap = acknowledgementsMap;
+ this.acknowledgementsMap = Map.copyOf(acknowledgementsMap);
this.checkForRenewAcknowledgements = checkForRenewAcknowledgements;
this.acquisitionLockTimeoutMs = acquisitionLockTimeoutMs;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
index 3dbe0a01cbb..502cee9ae62 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerMetricsTest.java
@@ -49,10 +49,11 @@ import
org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.time.Duration;
import java.util.AbstractMap;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -82,10 +83,10 @@ public class KafkaShareConsumerMetricsTest {
public void testPollTimeMetrics() {
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer = newShareConsumer(time,
client, subscription, metadata);
- consumer.subscribe(Collections.singletonList(topic));
+ consumer.subscribe(Set.of(topic));
// MetricName objects to check
Metrics metrics = consumer.metricsRegistry();
MetricName lastPollSecondsAgoName =
metrics.metricName("last-poll-seconds-ago", CONSUMER_SHARE_METRIC_GROUP_PREFIX
+ "-metrics");
@@ -127,7 +128,7 @@ public class KafkaShareConsumerMetricsTest {
public void testPollIdleRatio() {
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer = newShareConsumer(time,
client, subscription, metadata);
// MetricName object to check
@@ -165,7 +166,7 @@ public class KafkaShareConsumerMetricsTest {
}
private static boolean consumerMetricPresent(KafkaShareConsumer<String,
String> consumer, String name) {
- MetricName metricName = new MetricName(name,
CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics", "", Collections.emptyMap());
+ MetricName metricName = new MetricName(name,
CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-metrics", "", Map.of());
return consumer.metricsRegistry().metrics().containsKey(metricName);
}
@@ -174,10 +175,10 @@ public class KafkaShareConsumerMetricsTest {
Time time = new MockTime(1L);
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer = newShareConsumer(time,
client, subscription, metadata);
- consumer.subscribe(Collections.singletonList(topic));
+ consumer.subscribe(List.of(topic));
assertTrue(consumerMetricPresent(consumer, "last-poll-seconds-ago"));
assertTrue(consumerMetricPresent(consumer, "time-between-poll-avg"));
assertTrue(consumerMetricPresent(consumer, "time-between-poll-max"));
@@ -192,7 +193,7 @@ public class KafkaShareConsumerMetricsTest {
Time time = new MockTime(1L);
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer = newShareConsumer(time,
client, subscription, metadata);
Map<MetricName, KafkaMetric> customMetrics = customMetrics();
@@ -209,7 +210,7 @@ public class KafkaShareConsumerMetricsTest {
Time time = new MockTime(1L);
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer =
newShareConsumer(time, client, subscription, metadata);
KafkaMetric existingMetricToAdd = (KafkaMetric)
consumer.metrics().entrySet().iterator().next().getValue();
@@ -226,7 +227,7 @@ public class KafkaShareConsumerMetricsTest {
Time time = new MockTime(1L);
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer =
newShareConsumer(time, client, subscription, metadata);
KafkaMetric existingMetricToRemove = (KafkaMetric)
consumer.metrics().entrySet().iterator().next().getValue();
@@ -246,7 +247,7 @@ public class KafkaShareConsumerMetricsTest {
Time time = new MockTime(1L);
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer =
newShareConsumer(time, client, subscription, metadata);
@@ -267,7 +268,7 @@ public class KafkaShareConsumerMetricsTest {
Time time = new MockTime(1L);
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer =
newShareConsumer(time, client, subscription, metadata);
@@ -282,7 +283,7 @@ public class KafkaShareConsumerMetricsTest {
Time time = new MockTime(1L);
ShareConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Collections.singletonMap(topic, 1));
+ initMetadata(client, Map.of(topic, 1));
KafkaShareConsumer<String, String> consumer = newShareConsumer(time,
client, subscription, metadata);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
index 458da8ccc94..701547a5c1b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockShareConsumerTest.java
@@ -22,9 +22,9 @@ import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
import java.time.Duration;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
+import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -34,7 +34,7 @@ public class MockShareConsumerTest {
@Test
public void testSimpleMock() {
- consumer.subscribe(Collections.singleton("test"));
+ consumer.subscribe(Set.of("test"));
assertEquals(0, consumer.poll(Duration.ZERO).count());
ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0,
0, 0L, TimestampType.CREATE_TIME,
0, 0, "key1", "value1", new RecordHeaders(), Optional.empty());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
index c6e10040d32..d06e495dde3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AcknowledgementCommitCallbackHandlerTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -72,7 +71,7 @@ class AcknowledgementCommitCallbackHandlerTest {
acknowledgements.add(1L, AcknowledgeType.REJECT);
acknowledgementsMap.put(tip0, acknowledgements);
-
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
+
acknowledgementCommitCallbackHandler.onComplete(List.of(acknowledgementsMap));
TestUtils.retryOnExceptionWithTimeout(() -> {
assertNull(exceptionMap.get(tpo00));
@@ -88,7 +87,7 @@ class AcknowledgementCommitCallbackHandlerTest {
acknowledgements.complete(Errors.INVALID_RECORD_STATE.exception());
acknowledgementsMap.put(tip0, acknowledgements);
-
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
+
acknowledgementCommitCallbackHandler.onComplete(List.of(acknowledgementsMap));
TestUtils.retryOnExceptionWithTimeout(() -> {
assertInstanceOf(InvalidRecordStateException.class,
exceptionMap.get(tpo00));
assertInstanceOf(InvalidRecordStateException.class,
exceptionMap.get(tpo01));
@@ -104,7 +103,7 @@ class AcknowledgementCommitCallbackHandlerTest {
acknowledgements.complete(Errors.TOPIC_AUTHORIZATION_FAILED.exception());
acknowledgementsMap.put(tip0, acknowledgements);
-
acknowledgementCommitCallbackHandler.onComplete(Collections.singletonList(acknowledgementsMap));
+
acknowledgementCommitCallbackHandler.onComplete(List.of(acknowledgementsMap));
TestUtils.retryOnExceptionWithTimeout(() -> {
assertInstanceOf(TopicAuthorizationException.class,
exceptionMap.get(tpo00));
assertInstanceOf(TopicAuthorizationException.class,
exceptionMap.get(tpo01));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index fe249d72895..f3b2063c136 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -90,7 +90,6 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -109,8 +108,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs;
@@ -193,7 +190,7 @@ public class ShareConsumeRequestManagerTest {
// A dummy metadata update to ensure valid leader epoch.
metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("kafka-cluster",
1,
- Collections.emptyMap(), topicPartitionCounts,
+ Map.of(), topicPartitionCounts,
tp -> validLeaderEpoch, topicIds), false, 0L);
}
@@ -213,7 +210,7 @@ public class ShareConsumeRequestManagerTest {
public void testFetchNormal() {
buildRequestManager();
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
@@ -227,7 +224,7 @@ public class ShareConsumeRequestManagerTest {
public void testFetchWithAcquiredRecords() {
buildRequestManager();
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
@@ -243,7 +240,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
// Enabling the config so that background event is sent when the
acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);
@@ -272,7 +269,7 @@ public class ShareConsumeRequestManagerTest {
shareConsumeRequestManager.fetch(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements2)));
// Preparing a response with an acknowledgement error.
- sendFetchAndVerifyResponse(records, Collections.emptyList(),
Errors.NONE, Errors.INVALID_RECORD_STATE);
+ sendFetchAndVerifyResponse(records, List.of(), Errors.NONE,
Errors.INVALID_RECORD_STATE);
assertEquals(2.0,
metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue());
@@ -290,7 +287,7 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the
acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -313,7 +310,7 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the
acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -336,7 +333,7 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the
acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
fetchRecords();
@@ -389,7 +386,7 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the
acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = Acknowledgements.empty();
@@ -424,7 +421,7 @@ public class ShareConsumeRequestManagerTest {
@Test
public void testCloseFutureCompletedWhenMemberIdIsNull() {
buildRequestManager(new MetricConfig(), new ByteArrayDeserializer(),
new ByteArrayDeserializer(), null, ShareAcquireMode.BATCH_OPTIMIZED);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
CompletableFuture<Void> closeFuture =
shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
calculateDeadlineMs(time.timer(100)));
@@ -442,14 +439,14 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the
acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitAsync(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(defaultApiTimeoutMs)));
- shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
+ shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
calculateDeadlineMs(time.timer(100)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@@ -470,14 +467,14 @@ public class ShareConsumeRequestManagerTest {
// Enabling the config so that background event is sent when the
acknowledgement response is received.
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
shareConsumeRequestManager.commitSync(Map.of(tip0, new
NodeAcknowledgements(0, acknowledgements)),
calculateDeadlineMs(time.timer(100)));
- shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(),
+ shareConsumeRequestManager.acknowledgeOnClose(Map.of(),
calculateDeadlineMs(time.timer(100)));
assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
@@ -573,7 +570,7 @@ public class ShareConsumeRequestManagerTest {
public void testBatchingAcknowledgeRequestStates() {
buildRequestManager();
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
@@ -605,7 +602,7 @@ public class ShareConsumeRequestManagerTest {
public void testPendingCommitAsyncBeforeCommitSync() {
buildRequestManager();
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
@@ -649,7 +646,7 @@ public class ShareConsumeRequestManagerTest {
public void testRetryAcknowledgements() throws InterruptedException {
buildRequestManager();
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
@@ -691,7 +688,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -715,7 +712,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
@@ -771,7 +768,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(buildRecords(1L, 6, 1),
ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
@@ -814,7 +811,7 @@ public class ShareConsumeRequestManagerTest {
public void testPiggybackAcknowledgementsInFlight() {
buildRequestManager();
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
@@ -853,7 +850,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
fetchRecords();
@@ -898,13 +895,13 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
- subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName2));
+ subscriptions.assignFromSubscribed(Set.of(t2p0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2,
1),
@@ -934,13 +931,13 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
- subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName2));
+ subscriptions.assignFromSubscribed(Set.of(t2p0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2,
1),
@@ -970,13 +967,13 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
- subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName2));
+ subscriptions.assignFromSubscribed(Set.of(t2p0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2,
1),
@@ -1001,7 +998,7 @@ public class ShareConsumeRequestManagerTest {
public void testShareFetchWithSubscriptionChange() {
buildRequestManager();
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
@@ -1011,8 +1008,8 @@ public class ShareConsumeRequestManagerTest {
fetchRecords();
// Subscription changes.
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName2));
- subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName2));
+ subscriptions.assignFromSubscribed(Set.of(t2p0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2,
1),
@@ -1028,8 +1025,8 @@ public class ShareConsumeRequestManagerTest {
public void testShareFetchWithSubscriptionChangeMultipleNodes() {
buildRequestManager();
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
- subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
+ subscriptions.assignFromSubscribed(List.of(tp0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
@@ -1051,7 +1048,7 @@ public class ShareConsumeRequestManagerTest {
fetchRecords();
// Subscription changes.
- subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
+ subscriptions.assignFromSubscribed(List.of(tp1));
NetworkClientDelegate.PollResult pollResult =
shareConsumeRequestManager.sendFetchesReturnPollResult();
assertEquals(2, pollResult.unsentRequests.size());
@@ -1096,8 +1093,8 @@ public class ShareConsumeRequestManagerTest {
public void
testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgements() {
buildRequestManager();
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
- subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
+ subscriptions.assignFromSubscribed(List.of(tp0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2),
@@ -1116,7 +1113,7 @@ public class ShareConsumeRequestManagerTest {
fetchRecords();
// Change the subscription.
- subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
+ subscriptions.assignFromSubscribed(List.of(tp1));
// Now we will be sending the request to node1 only as leader for tip1
is node1.
// We do not build the request for tip0 as there are no
acknowledgements to send.
@@ -1139,7 +1136,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
subscriptions.assignFromSubscribed(List.of(tp0, tp1));
client.updateMetadata(
@@ -1180,7 +1177,7 @@ public class ShareConsumeRequestManagerTest {
public void testRetryAcknowledgementsWithLeaderChange() {
buildRequestManager();
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
subscriptions.assignFromSubscribed(partitions);
@@ -1224,7 +1221,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
@@ -1282,7 +1279,7 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionDataMap =
buildPartitionDataMap(tip0, records, acquiredRecords,
Errors.NONE, Errors.NONE);
partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records,
acquiredRecords, Errors.NONE, Errors.NONE));
- client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList(), 0));
+ client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, List.of(), 0));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1328,7 +1325,7 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionDataMap =
buildPartitionDataMap(tip0, records, acquiredRecords,
Errors.NONE, Errors.NONE);
partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records,
emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
- client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList(), 0));
+ client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, List.of(), 0));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1357,7 +1354,7 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionDataMap =
buildPartitionDataMap(t2ip0, records, emptyAcquiredRecords,
Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE);
partitionDataMap.put(tip0, partitionDataForFetch(tip0, records,
acquiredRecords, Errors.NONE, Errors.NONE));
- client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, Collections.emptyList(), 0));
+ client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0,
partitionDataMap, List.of(), 0));
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1379,8 +1376,8 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
- subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
+ subscriptions.assignFromSubscribed(Set.of(tp0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
@@ -1399,8 +1396,8 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
- subscriptions.assignFromSubscribed(Collections.singleton(tp0));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
+ subscriptions.assignFromSubscribed(Set.of(tp0));
client.updateMetadata(
RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1),
@@ -1467,7 +1464,7 @@ public class ShareConsumeRequestManagerTest {
public void testFetchError() {
buildRequestManager();
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, emptyAcquiredRecords,
Errors.NOT_LEADER_OR_FOLLOWER);
Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
partitionRecords = fetchRecords();
@@ -1479,7 +1476,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
Acknowledgements acknowledgements = getAcknowledgements(1,
AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
@@ -1501,7 +1498,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
fetchRecords();
@@ -1514,7 +1511,7 @@ public class ShareConsumeRequestManagerTest {
// Simulate a metadata update with no topics in the response.
client.updateMetadata(
- RequestTestUtils.metadataUpdateWithIds(1,
Collections.emptyMap(),
+ RequestTestUtils.metadataUpdateWithIds(1, Map.of(),
tp -> validLeaderEpoch, null, false));
// The acknowledgements for the initial fetch from tip0 are processed
now and sent to the background thread.
@@ -1537,7 +1534,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
fetchRecords();
@@ -1586,7 +1583,7 @@ public class ShareConsumeRequestManagerTest {
buffer.put("beef".getBytes());
buffer.position(0);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
// normal fetch
assertEquals(1, sendFetches());
@@ -1617,7 +1614,7 @@ public class ShareConsumeRequestManagerTest {
// flip some bits to fail the crc
buffer.putInt(32, buffer.get(32) ^ 87238423);
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
// normal fetch
assertEquals(1, sendFetches());
@@ -1649,7 +1646,7 @@ public class ShareConsumeRequestManagerTest {
MemoryRecords memoryRecords = builder.build();
List<ConsumerRecord<byte[], byte[]>> records;
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
client.prepareResponse(fullFetchResponse(tip0,
memoryRecords,
@@ -1681,7 +1678,7 @@ public class ShareConsumeRequestManagerTest {
public void testUnauthorizedTopic() {
buildRequestManager();
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
assertEquals(1, sendFetches());
client.prepareResponse(fullFetchResponse(tip0, records,
emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED));
@@ -1690,14 +1687,14 @@ public class ShareConsumeRequestManagerTest {
collectFetch();
fail("collectFetch should have thrown a
TopicAuthorizationException");
} catch (TopicAuthorizationException e) {
- assertEquals(singleton(topicName), e.unauthorizedTopics());
+ assertEquals(Set.of(topicName), e.unauthorizedTopics());
}
}
@Test
public void testUnknownTopicIdError() {
buildRequestManager();
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
assertEquals(1, sendFetches());
client.prepareResponse(fetchResponseWithTopLevelError(tip0,
Errors.UNKNOWN_TOPIC_ID));
@@ -1712,7 +1709,7 @@ public class ShareConsumeRequestManagerTest {
boolean hasTopLevelError,
boolean
shouldRequestMetadataUpdate) {
buildRequestManager();
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
assertEquals(1, sendFetches());
@@ -1754,7 +1751,7 @@ public class ShareConsumeRequestManagerTest {
public void testFetchDisconnected() {
buildRequestManager();
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
assertEquals(1, sendFetches());
client.prepareResponse(fullFetchResponse(tip0, records,
acquiredRecords, Errors.NONE), true);
@@ -1787,7 +1784,7 @@ public class ShareConsumeRequestManagerTest {
result.outputBuffer().flip();
MemoryRecords compactedRecords =
MemoryRecords.readableRecords(result.outputBuffer());
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
assertEquals(1, sendFetches());
client.prepareResponse(fullFetchResponse(tip0,
compactedRecords,
@@ -1816,7 +1813,7 @@ public class ShareConsumeRequestManagerTest {
@Test
public void testCorruptMessageError() {
buildRequestManager();
- assignFromSubscribed(singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
assertEquals(1, sendFetches());
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
@@ -1843,7 +1840,7 @@ public class ShareConsumeRequestManagerTest {
public void
testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeaderInformation(Errors
error) {
buildRequestManager();
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
@@ -1865,13 +1862,13 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(error.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1908,7 +1905,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1931,7 +1928,7 @@ public class ShareConsumeRequestManagerTest {
public void
testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderInformation(Errors
error) {
buildRequestManager();
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
@@ -1952,7 +1949,7 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData = new LinkedHashMap<>();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -1961,7 +1958,7 @@ public class ShareConsumeRequestManagerTest {
.setCurrentLeader(new ShareFetchResponseData.LeaderIdAndEpoch()
.setLeaderId(tp0Leader.id())
.setLeaderEpoch(validLeaderEpoch + 1)));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, singletonList(tp0Leader), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(tp0Leader), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -1993,7 +1990,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2017,7 +2014,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
@@ -2037,13 +2034,13 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData = new LinkedHashMap<>();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2078,7 +2075,7 @@ public class ShareConsumeRequestManagerTest {
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(error.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData = buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -2086,7 +2083,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2105,7 +2102,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
@@ -2125,9 +2122,9 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData = buildPartitionDataMap(tip1, records,
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2178,7 +2175,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
subscriptions.assignFromSubscribed(List.of(tp0, tp1));
client.updateMetadata(
@@ -2195,9 +2192,9 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData = buildPartitionDataMap(tip1, records,
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2250,7 +2247,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
@@ -2270,9 +2267,9 @@ public class ShareConsumeRequestManagerTest {
LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>
partitionData =
buildPartitionDataMap(tip0, records,
ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData = buildPartitionDataMap(tip1, records,
ShareCompletedFetchTest.acquiredRecords(1L, 2), Errors.NONE, Errors.NONE);
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2328,7 +2325,7 @@ public class ShareConsumeRequestManagerTest {
buildRequestManager();
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new HashSet<>();
partitions.add(tp0);
partitions.add(tp1);
@@ -2354,13 +2351,13 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition())
.setErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2390,7 +2387,7 @@ public class ShareConsumeRequestManagerTest {
.setPartitionIndex(tip0.topicPartition().partition())
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId0, true);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId0, true);
partitionData.clear();
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
@@ -2398,7 +2395,7 @@ public class ShareConsumeRequestManagerTest {
.setRecords(records)
.setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))
.setAcknowledgeErrorCode(Errors.NONE.code()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2434,7 +2431,7 @@ public class ShareConsumeRequestManagerTest {
partitionData.put(tip1,
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tip1.topicPartition().partition()));
- client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, Collections.emptyList(), 0), nodeId1);
+ client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0,
partitionData, List.of(), 0), nodeId1);
networkClientDelegate.poll(time.timer(0));
assertTrue(shareConsumeRequestManager.hasCompletedFetches());
@@ -2453,7 +2450,7 @@ public class ShareConsumeRequestManagerTest {
// We will simulate two nodes, each with one partition. The first node
will have more records
buildRequestManager(ShareAcquireMode.RECORD_LIMIT);
- subscriptions.subscribeToShareGroup(Collections.singleton(topicName));
+ subscriptions.subscribeToShareGroup(Set.of(topicName));
Set<TopicPartition> partitions = new LinkedHashSet<>();
partitions.add(tp0);
partitions.add(tp1);
@@ -2566,7 +2563,7 @@ public class ShareConsumeRequestManagerTest {
public void testShareFetchWithRenewAcknowledgement() {
buildRequestManager();
- assignFromSubscribed(Collections.singleton(tp0));
+ assignFromSubscribed(Set.of(tp0));
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
Acknowledgements acknowledgements = getAcknowledgements(1,
@@ -2624,7 +2621,7 @@ public class ShareConsumeRequestManagerTest {
new ShareFetchResponseData.PartitionData()
.setPartitionIndex(tp.topicPartition().partition())
.setErrorCode(error.code()));
- return ShareFetchResponse.of(error, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+ return ShareFetchResponse.of(error, 0, new
LinkedHashMap<>(partitions), List.of(), 0);
}
private ShareFetchResponse fullFetchResponse(TopicIdPartition tp,
@@ -2641,30 +2638,30 @@ public class ShareConsumeRequestManagerTest {
Errors acknowledgeError) {
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions
= Map.of(tp,
partitionDataForFetch(tp, records, acquiredRecords, error,
acknowledgeError));
- return ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+ return ShareFetchResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), List.of(), 0);
}
private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
- Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Collections.emptyMap();
- return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+ Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of();
+ return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), List.of(), 0);
}
private ShareAcknowledgeResponse
acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of(tp,
partitionDataForAcknowledge(tp, Errors.NONE));
- return ShareAcknowledgeResponse.of(error, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+ return ShareAcknowledgeResponse.of(error, 0, new
LinkedHashMap<>(partitions), List.of(), 0);
}
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition
tp, Errors error) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = Map.of(tp,
partitionDataForAcknowledge(tp, error));
- return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+ return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), List.of(), 0);
}
private ShareAcknowledgeResponse
fullAcknowledgeResponse(Map<TopicIdPartition, Errors> partitionErrorsMap) {
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>
partitions = new HashMap<>();
partitionErrorsMap.forEach((tip, error) -> partitions.put(tip,
partitionDataForAcknowledge(tip, error)));
- return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), Collections.emptyList(), 0);
+ return ShareAcknowledgeResponse.of(Errors.NONE, 0, new
LinkedHashMap<>(partitions), List.of(), 0);
}
private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition
tp,
@@ -2712,7 +2709,7 @@ public class ShareConsumeRequestManagerTest {
*/
private void assertEmptyFetch(String reason) {
ShareFetch<?, ?> fetch = collectFetch();
- assertEquals(Collections.emptyMap(), fetch.records(), reason);
+ assertEquals(Map.of(), fetch.records(), reason);
assertTrue(fetch.isEmpty(), reason);
}
@@ -2728,7 +2725,7 @@ public class ShareConsumeRequestManagerTest {
private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>>
fetchRecords() {
ShareFetch<K, V> fetch = collectFetch();
if (fetch.isEmpty()) {
- return Collections.emptyMap();
+ return Map.of();
}
return fetch.records();
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 6943c39f060..24baa003d5a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -88,8 +88,6 @@ import java.util.function.Predicate;
import javax.security.auth.login.LoginException;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -254,7 +252,7 @@ public class ShareConsumerImplTest {
final String topicName = "foo";
doReturn(ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
- final List<String> subscriptionTopic =
Collections.singletonList(topicName);
+ final List<String> subscriptionTopic = List.of(topicName);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
subscriptionTopic);
consumer.subscribe(subscriptionTopic);
@@ -271,7 +269,7 @@ public class ShareConsumerImplTest {
// Set up subscription
final String topicName = "foo";
- final List<String> subscriptionTopic =
Collections.singletonList(topicName);
+ final List<String> subscriptionTopic = List.of(topicName);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
subscriptionTopic);
consumer.subscribe(subscriptionTopic);
@@ -312,7 +310,7 @@ public class ShareConsumerImplTest {
return ShareFetch.empty();
}).doAnswer(invocation ->
ShareFetch.empty()).when(fetchCollector).collect(any(ShareFetchBuffer.class));
- final List<String> subscriptionTopic =
Collections.singletonList(topicName);
+ final List<String> subscriptionTopic = List.of(topicName);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
subscriptionTopic);
consumer.subscribe(subscriptionTopic);
@@ -337,7 +335,7 @@ public class ShareConsumerImplTest {
return fetch;
}).when(fetchCollector).collect(Mockito.any(ShareFetchBuffer.class));
- final List<String> subscriptionTopic =
Collections.singletonList(topicName);
+ final List<String> subscriptionTopic = List.of(topicName);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
subscriptionTopic);
consumer.subscribe(subscriptionTopic);
@@ -377,7 +375,7 @@ public class ShareConsumerImplTest {
.collect(any(ShareFetchBuffer.class));
// Set up subscription
- List<String> topics = Collections.singletonList(topic);
+ List<String> topics = List.of(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
consumer.subscribe(topics);
@@ -436,7 +434,7 @@ public class ShareConsumerImplTest {
.collect(any(ShareFetchBuffer.class));
// Set up subscription
- List<String> topics = Collections.singletonList(topic);
+ List<String> topics = List.of(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
consumer.subscribe(topics);
assertEquals(Optional.empty(), consumer.acquisitionLockTimeoutMs());
@@ -516,7 +514,7 @@ public class ShareConsumerImplTest {
.collect(any(ShareFetchBuffer.class));
// Set up subscription
- List<String> topics = Collections.singletonList(topic);
+ List<String> topics = List.of(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
topics);
consumer.subscribe(topics);
@@ -674,10 +672,10 @@ public class ShareConsumerImplTest {
consumer = newConsumer(subscriptions);
String topic = "topic1";
- final List<String> subscriptionTopic = singletonList(topic);
+ final List<String> subscriptionTopic = List.of(topic);
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
subscriptionTopic);
consumer.subscribe(subscriptionTopic);
- assertEquals(singleton(topic), consumer.subscription());
+ assertEquals(Set.of(topic), consumer.subscription());
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ShareSubscriptionChangeEvent.class));
}
@@ -700,7 +698,7 @@ public class ShareConsumerImplTest {
completeShareUnsubscribeApplicationEventSuccessfully(subscriptions);
- consumer.subscribe(Collections.emptyList());
+ consumer.subscribe(List.of());
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(ShareUnsubscribeEvent.class));
}
@@ -714,14 +712,14 @@ public class ShareConsumerImplTest {
@Test
public void testSubscriptionOnNullTopic() {
consumer = newConsumer();
- assertThrows(IllegalArgumentException.class, () ->
consumer.subscribe(singletonList(null)));
+ assertThrows(IllegalArgumentException.class, () ->
consumer.subscribe(Collections.singletonList(null)));
}
@Test
public void testSubscriptionOnEmptyTopic() {
consumer = newConsumer();
String emptyTopic = " ";
- assertThrows(IllegalArgumentException.class, () ->
consumer.subscribe(singletonList(emptyTopic)));
+ assertThrows(IllegalArgumentException.class, () ->
consumer.subscribe(Collections.singletonList(emptyTopic)));
}
@Test
@@ -733,7 +731,7 @@ public class ShareConsumerImplTest {
final KafkaException expectedException = new KafkaException("Nobody
expects the Spanish Inquisition");
final ErrorEvent errorBackgroundEvent = new
ErrorEvent(expectedException);
backgroundEventQueue.add(errorBackgroundEvent);
- consumer.subscribe(Collections.singletonList("t1"));
+ consumer.subscribe(List.of("t1"));
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
assertEquals(expectedException.getMessage(), exception.getMessage());
@@ -751,7 +749,7 @@ public class ShareConsumerImplTest {
final KafkaException expectedException2 = new KafkaException("Spam,
Spam, Spam");
final ErrorEvent errorBackgroundEvent2 = new
ErrorEvent(expectedException2);
backgroundEventQueue.add(errorBackgroundEvent2);
- consumer.subscribe(Collections.singletonList("t1"));
+ consumer.subscribe(List.of("t1"));
final KafkaException exception = assertThrows(KafkaException.class, ()
-> consumer.poll(Duration.ZERO));
assertEquals(expectedException1.getMessage(), exception.getMessage());
@@ -808,7 +806,7 @@ public class ShareConsumerImplTest {
.when(fetchCollector)
.collect(Mockito.any(ShareFetchBuffer.class));
- final List<String> subscriptionTopic = singletonList("topic");
+ final List<String> subscriptionTopic = List.of("topic");
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions,
subscriptionTopic);
consumer.subscribe(subscriptionTopic);
@@ -958,7 +956,7 @@ public class ShareConsumerImplTest {
/**
* This test ensures that the {@link ShareConsumer} implementation fails
on creation when the underlying
* {@link NetworkClient} fails creation.
- *
+ * <p>
* The logic to check for this case is admittedly a bit awkward because
the constructor can fail for all
* manner of reasons. So a failure case is created by specifying an invalid
* {@link javax.security.auth.spi.LoginModule} class name, which in turn
causes the {@link NetworkClient}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index c144ee4093e..492a8b857a1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -48,7 +48,6 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -336,8 +335,8 @@ public class ShareFetchCollectorTest {
}
private void subscribeAndAssign(TopicIdPartition tp) {
- subscriptions.subscribe(Collections.singleton(tp.topic()),
Optional.empty());
-
subscriptions.assignFromSubscribed(Collections.singleton(tp.topicPartition()));
+ subscriptions.subscribe(Set.of(tp.topic()), Optional.empty());
+ subscriptions.assignFromSubscribed(Set.of(tp.topicPartition()));
}
/**
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 20528e775de..ce457a3a028 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -52,7 +52,8 @@ import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
@@ -216,7 +217,7 @@ public class ShareHeartbeatRequestManagerTest {
createHeartbeatRequestStateWithZeroHeartbeatInterval();
time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
String topic = "topic1";
- Set<String> set = Collections.singleton(topic);
+ Set<String> set = Set.of(topic);
when(subscriptions.subscription()).thenReturn(set);
subscriptions.subscribeToShareGroup(set);
@@ -236,7 +237,7 @@ public class ShareHeartbeatRequestManagerTest {
assertEquals(0, heartbeatRequest.data().memberEpoch());
// Should include subscription and group basic info to start getting
assignments.
- assertEquals(Collections.singletonList(topic),
heartbeatRequest.data().subscribedTopicNames());
+ assertEquals(List.of(topic),
heartbeatRequest.data().subscribedTopicNames());
assertEquals(DEFAULT_GROUP_ID, heartbeatRequest.data().groupId());
}
@@ -532,7 +533,7 @@ public class ShareHeartbeatRequestManagerTest {
assertEquals(DEFAULT_GROUP_ID, data.groupId());
assertEquals("", data.memberId());
assertEquals(0, data.memberEpoch());
- assertEquals(Collections.emptyList(), data.subscribedTopicNames());
+ assertEquals(List.of(), data.subscribedTopicNames());
membershipManager.onHeartbeatRequestGenerated();
// Mock a response from the group coordinator, that supplies the
member ID and a new epoch
@@ -549,37 +550,37 @@ public class ShareHeartbeatRequestManagerTest {
// Join the group and subscribe to a topic, but the response has not
yet been received
String topic = "topic1";
- subscriptions.subscribe(Collections.singleton(topic),
Optional.empty());
-
when(subscriptions.subscription()).thenReturn(Collections.singleton(topic));
+ subscriptions.subscribe(Set.of(topic), Optional.empty());
+ when(subscriptions.subscription()).thenReturn(Set.of(topic));
mockRejoiningMemberData();
data = heartbeatState.buildRequestData();
assertEquals(DEFAULT_GROUP_ID, data.groupId());
assertEquals(DEFAULT_MEMBER_ID, data.memberId());
assertEquals(0, data.memberEpoch());
- assertEquals(Collections.singletonList(topic),
data.subscribedTopicNames());
+ assertEquals(List.of(topic), data.subscribedTopicNames());
membershipManager.onHeartbeatRequestGenerated();
data = heartbeatState.buildRequestData();
assertEquals(DEFAULT_GROUP_ID, data.groupId());
assertEquals(DEFAULT_MEMBER_ID, data.memberId());
assertEquals(0, data.memberEpoch());
- assertEquals(Collections.singletonList(topic),
data.subscribedTopicNames());
+ assertEquals(List.of(topic), data.subscribedTopicNames());
// Mock the response from the group coordinator which returns an
assignment
ShareGroupHeartbeatResponseData.TopicPartitions tpTopic1 =
new ShareGroupHeartbeatResponseData.TopicPartitions();
Uuid topicId = Uuid.randomUuid();
tpTopic1.setTopicId(topicId);
- tpTopic1.setPartitions(Collections.singletonList(0));
+ tpTopic1.setPartitions(List.of(0));
ShareGroupHeartbeatResponseData.Assignment assignmentTopic1 =
new ShareGroupHeartbeatResponseData.Assignment();
-
assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
+ assignmentTopic1.setTopicPartitions(List.of(tpTopic1));
ShareGroupHeartbeatResponse rs1 = new ShareGroupHeartbeatResponse(new
ShareGroupHeartbeatResponseData()
.setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS)
.setMemberId(DEFAULT_MEMBER_ID)
.setMemberEpoch(DEFAULT_MEMBER_EPOCH)
.setAssignment(assignmentTopic1));
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
"topic1"));
+ when(metadata.topicNames()).thenReturn(Map.of(topicId, "topic1"));
membershipManager.onHeartbeatSuccess(rs1);
}
@@ -811,7 +812,7 @@ public class ShareHeartbeatRequestManagerTest {
}
private void mockStableMemberData() {
- when(membershipManager.currentAssignment()).thenReturn(new
AbstractMembershipManager.LocalAssignment(0, Collections.emptyMap()));
+ when(membershipManager.currentAssignment()).thenReturn(new
AbstractMembershipManager.LocalAssignment(0, Map.of()));
when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
index 346f7f4f7d9..bc782045577 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
@@ -42,7 +42,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -186,7 +185,7 @@ public class ShareMembershipManagerTest {
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.transitionToFatal();
assertEquals(MemberState.FATAL, membershipManager.state());
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
membershipManager.leaveGroup();
verify(subscriptionState).unsubscribe();
@@ -208,7 +207,7 @@ public class ShareMembershipManagerTest {
public void testFencingWhenStateIsStable() {
ShareMembershipManager membershipManager = createMemberInStableState();
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
}
@Test
@@ -280,7 +279,7 @@ public class ShareMembershipManagerTest {
assertEquals(MemberState.RECONCILING, membershipManager.state());
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
}
@Test
@@ -340,12 +339,12 @@ public class ShareMembershipManagerTest {
Uuid topic1 = Uuid.randomUuid();
final ShareGroupHeartbeatResponseData.Assignment assignment1 = new
ShareGroupHeartbeatResponseData.Assignment();
final ShareGroupHeartbeatResponseData.Assignment assignment2 = new
ShareGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(Collections.singletonList(
+ .setTopicPartitions(List.of(
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topic1)
.setPartitions(Arrays.asList(0, 1, 2))
));
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1,
"topic1"));
+ when(metadata.topicNames()).thenReturn(Map.of(topic1, "topic1"));
assertEquals(toTopicIdPartitionMap(assignment1),
membershipManager.currentAssignment().partitions);
// Receive assignment, wait on commit
@@ -382,7 +381,7 @@ public class ShareMembershipManagerTest {
* This is the case where we receive a new assignment while reconciling an
existing one. The intermediate assignment
* is not applied, and a new assignment containing the same partitions is
received and reconciled. In all assignments,
* one topic is not resolvable.
- *
+ * <p>
* We need to make sure that the last assignment is acked and applied,
even though the set of partitions does not change.
* In this case, no rebalance listeners are run.
*/
@@ -393,15 +392,15 @@ public class ShareMembershipManagerTest {
Uuid topic2 = Uuid.randomUuid();
final ShareGroupHeartbeatResponseData.Assignment assignment1 = new
ShareGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Arrays.asList(
- new
ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic1).setPartitions(Collections.singletonList(0)),
- new
ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+ new
ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic1).setPartitions(List.of(0)),
+ new
ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(List.of(0))
));
final ShareGroupHeartbeatResponseData.Assignment assignment2 = new
ShareGroupHeartbeatResponseData.Assignment()
.setTopicPartitions(Arrays.asList(
new
ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic1).setPartitions(Arrays.asList(0,
1)),
- new
ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(Collections.singletonList(0))
+ new
ShareGroupHeartbeatResponseData.TopicPartitions().setTopicId(topic2).setPartitions(List.of(0))
));
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1,
"topic1"));
+ when(metadata.topicNames()).thenReturn(Map.of(topic1, "topic1"));
// Receive assignment - full reconciliation triggered
// stay in RECONCILING state, since an unresolved topic is assigned
@@ -410,7 +409,7 @@ public class ShareMembershipManagerTest {
assertEquals(MemberState.RECONCILING, membershipManager.state());
membershipManager.poll(time.milliseconds());
verifyReconciliationTriggeredAndCompleted(membershipManager,
- Collections.singletonList(new TopicIdPartition(topic1, new
TopicPartition("topic1", 0)))
+ List.of(new TopicIdPartition(topic1, new
TopicPartition("topic1", 0)))
);
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.RECONCILING, membershipManager.state());
@@ -427,7 +426,7 @@ public class ShareMembershipManagerTest {
membershipManager.poll(time.milliseconds());
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
verifyReconciliationNotTriggered(membershipManager);
- assertEquals(Collections.singletonMap(topic1, mkSortedSet(0)),
membershipManager.currentAssignment().partitions);
+ assertEquals(Map.of(topic1, mkSortedSet(0)),
membershipManager.currentAssignment().partitions);
assertEquals(Set.of(topic2),
membershipManager.topicsAwaitingReconciliation());
}
@@ -454,12 +453,12 @@ public class ShareMembershipManagerTest {
// Receive assignment with only topic1-0, entering STABLE state.
ShareMembershipManager membershipManager =
- mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1,
topic1, Collections.singletonList(0));
+ mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1,
topic1, List.of(0));
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.STABLE, membershipManager.state());
-
when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Collections.singleton(topicId1Partition0)));
+
when(subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(Set.of(topicId1Partition0)));
clearInvocations(membershipManager, subscriptionState);
// New assignment adding a new topic2-0 (not in metadata).
@@ -482,7 +481,7 @@ public class ShareMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertEquals(Collections.singleton(topicId2),
membershipManager.topicsAwaitingReconciliation());
+ assertEquals(Set.of(topicId2),
membershipManager.topicsAwaitingReconciliation());
verify(metadata).requestUpdate(anyBoolean());
clearInvocations(membershipManager);
@@ -504,7 +503,7 @@ public class ShareMembershipManagerTest {
public void testLeaveGroupWhenStateIsStable() {
ShareMembershipManager membershipManager = createMemberInStableState();
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
}
@Test
@@ -637,7 +636,7 @@ public class ShareMembershipManagerTest {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic1";
ShareMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, Collections.emptyList());
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, List.of());
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
@@ -659,13 +658,13 @@ public class ShareMembershipManagerTest {
ShareMembershipManager membershipManager = createMemberInStableState();
// Clear the assignment
-
when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
+ when(subscriptionState.assignedPartitions()).thenReturn(Set.of());
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(false);
membershipManager.transitionToFenced();
// Make sure to never call `assignFromSubscribed` again
- verify(subscriptionState,
never()).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState, never()).assignFromSubscribed(Set.of());
}
@Test
@@ -678,7 +677,7 @@ public class ShareMembershipManagerTest {
verify(subscriptionState).unsubscribe();
assertFalse(leaveResult1.isDone());
assertEquals(MemberState.LEAVING, membershipManager.state());
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
clearInvocations(subscriptionState);
// Second leave attempt while the first one has not completed yet.
@@ -693,7 +692,7 @@ public class ShareMembershipManagerTest {
assertFalse(leaveResult2.isCompletedExceptionally());
// Subscription should have been updated only once with the first
leave group.
- verify(subscriptionState,
never()).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState, never()).assignFromSubscribed(Set.of());
}
@Test
@@ -705,7 +704,7 @@ public class ShareMembershipManagerTest {
CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
assertEquals(MemberState.LEAVING, membershipManager.state());
assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLeave(membershipManager,
leaveResult1);
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
clearInvocations(subscriptionState);
// Call to leave group again, when member already left. Should be
no-op (no assignment updated)
@@ -715,7 +714,7 @@ public class ShareMembershipManagerTest {
assertTrue(leaveResult2.isDone());
assertFalse(leaveResult2.isCompletedExceptionally());
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
- verify(subscriptionState,
never()).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState, never()).assignFromSubscribed(Set.of());
}
@Test
@@ -731,7 +730,7 @@ public class ShareMembershipManagerTest {
}
@Test
- public void testFatalFailureWhenStateIsUnjoined() {
+ public void testFatalFailureWhenStateIsJoining() {
ShareMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
assertEquals(MemberState.JOINING, membershipManager.state());
@@ -819,13 +818,13 @@ public class ShareMembershipManagerTest {
// reconcile the new assignment.
Uuid topicId = Uuid.randomUuid();
String topicName = "topic1";
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
- receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
+ when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
+ receiveAssignment(topicId, List.of(0), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
- Set<TopicPartition> expectedAssignment = Collections.singleton(new
TopicPartition(topicName, 0));
+ Set<TopicPartition> expectedAssignment = Set.of(new
TopicPartition(topicName, 0));
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAssignment,
expectedAssignment);
@@ -860,14 +859,14 @@ public class ShareMembershipManagerTest {
// reconcile the new assignment.
Uuid topicId = Uuid.randomUuid();
String topicName = "topic1";
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
receiveEmptyAssignment(membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
- verifyReconciliationTriggeredAndCompleted(membershipManager,
Collections.emptyList());
+ verifyReconciliationTriggeredAndCompleted(membershipManager,
List.of());
membershipManager.onHeartbeatRequestGenerated();
@@ -885,8 +884,8 @@ public class ShareMembershipManagerTest {
// one that is waiting for metadata, so the member will discard the
topics that were
// waiting for metadata, and just keep the new one as unresolved.
Uuid topicId = Uuid.randomUuid();
- when(metadata.topicNames()).thenReturn(Collections.emptyMap());
- receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
+ when(metadata.topicNames()).thenReturn(Map.of());
+ receiveAssignment(topicId, List.of(0), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
@@ -907,19 +906,19 @@ public class ShareMembershipManagerTest {
// Assignment not in metadata. Member cannot reconcile it yet, but
keeps it to be
// reconciled when metadata is discovered.
Uuid topicId = Uuid.randomUuid();
- receiveAssignment(topicId, Collections.singletonList(1),
membershipManager);
+ receiveAssignment(topicId, List.of(1), membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());
// Metadata update received, including the missing topic name.
String topicName = "topic1";
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
membershipManager.poll(time.milliseconds());
// Assignment should have been reconciled.
- Set<TopicPartition> expectedAssignment = Collections.singleton(new
TopicPartition(topicName, 1));
+ Set<TopicPartition> expectedAssignment = Set.of(new
TopicPartition(topicName, 1));
verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedAssignment,
expectedAssignment);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
@@ -940,19 +939,19 @@ public class ShareMembershipManagerTest {
.setTopicPartitions(Arrays.asList(
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topic1)
- .setPartitions(Collections.singletonList(0)),
+ .setPartitions(List.of(0)),
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topic2)
.setPartitions(Arrays.asList(1, 3))
));
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topic1,
topic1Name));
+ when(metadata.topicNames()).thenReturn(Map.of(topic1, topic1Name));
// Receive assignment partly in metadata - reconcile+ack what's in
metadata, keep the
// unresolved and request metadata update.
ShareMembershipManager membershipManager =
mockJoinAndReceiveAssignment(true, assignment);
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
verify(metadata).requestUpdate(anyBoolean());
- assertEquals(Collections.singleton(topic2),
membershipManager.topicsAwaitingReconciliation());
+ assertEquals(Set.of(topic2),
membershipManager.topicsAwaitingReconciliation());
// When the ack is sent the member should go back to RECONCILING
because it still has
// unresolved assignment to be reconciled.
@@ -964,7 +963,7 @@ public class ShareMembershipManagerTest {
clearInvocations(subscriptionState);
membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(assignment,
membershipManager.memberId()));
assertEquals(MemberState.RECONCILING, membershipManager.state());
- assertEquals(Collections.singleton(topic2),
membershipManager.topicsAwaitingReconciliation());
+ assertEquals(Set.of(topic2),
membershipManager.topicsAwaitingReconciliation());
verify(subscriptionState,
never()).assignFromSubscribed(anyCollection());
}
@@ -973,7 +972,7 @@ public class ShareMembershipManagerTest {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic1";
ShareMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, Collections.emptyList());
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, List.of());
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
@@ -991,7 +990,7 @@ public class ShareMembershipManagerTest {
TopicIdPartition ownedPartition = new TopicIdPartition(topicId, new
TopicPartition(topicName, 0));
ShareMembershipManager membershipManager = createMemberInStableState();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName,
- Collections.singletonList(ownedPartition));
+ List.of(ownedPartition));
// New assignment received, adding partitions 1 and 2 to the
previously owned partition 0.
receiveAssignment(topicId, Arrays.asList(0, 1, 2), membershipManager);
@@ -1013,7 +1012,7 @@ public class ShareMembershipManagerTest {
String topicName = "topic1";
// Receive assignment different from what the member owns - should
reconcile
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, Collections.emptyList());
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, List.of());
List<TopicIdPartition> expectedAssignmentReconciled =
topicIdPartitions(topicId, topicName, 0, 1);
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
@@ -1048,7 +1047,7 @@ public class ShareMembershipManagerTest {
new TopicPartition(topicName, 0));
ShareMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName,
- Collections.singletonList(ownedPartition));
+ List.of(ownedPartition));
mockRevocation();
@@ -1074,7 +1073,7 @@ public class ShareMembershipManagerTest {
// Assignment not in metadata
ShareGroupHeartbeatResponseData.Assignment targetAssignment = new
ShareGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(Collections.singletonList(
+ .setTopicPartitions(List.of(
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(Arrays.asList(0, 1))));
@@ -1084,11 +1083,11 @@ public class ShareMembershipManagerTest {
// Should not trigger reconciliation, and request a metadata update.
verifyReconciliationNotTriggered(membershipManager);
- assertEquals(Collections.singleton(topicId),
membershipManager.topicsAwaitingReconciliation());
+ assertEquals(Set.of(topicId),
membershipManager.topicsAwaitingReconciliation());
verify(metadata).requestUpdate(anyBoolean());
String topicName = "topic1";
- mockTopicNameInMetadataCache(Collections.singletonMap(topicId,
topicName), true);
+ mockTopicNameInMetadataCache(Map.of(topicId, topicName), true);
// When metadata is updated, the member should re-trigger
reconciliation
membershipManager.poll(time.milliseconds());
@@ -1104,7 +1103,7 @@ public class ShareMembershipManagerTest {
// Assignment not in metadata
ShareGroupHeartbeatResponseData.Assignment targetAssignment = new
ShareGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(Collections.singletonList(
+ .setTopicPartitions(List.of(
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(Arrays.asList(0, 1))));
@@ -1114,15 +1113,15 @@ public class ShareMembershipManagerTest {
// Should not trigger reconciliation, and request a metadata update.
verifyReconciliationNotTriggered(membershipManager);
- assertEquals(Collections.singleton(topicId),
membershipManager.topicsAwaitingReconciliation());
+ assertEquals(Set.of(topicId),
membershipManager.topicsAwaitingReconciliation());
verify(metadata).requestUpdate(anyBoolean());
// Next poll is run, but metadata still without the unresolved topic
in it. Should keep
// the unresolved and request update again.
- when(metadata.topicNames()).thenReturn(Collections.emptyMap());
+ when(metadata.topicNames()).thenReturn(Map.of());
membershipManager.poll(time.milliseconds());
verifyReconciliationNotTriggered(membershipManager);
- assertEquals(Collections.singleton(topicId),
membershipManager.topicsAwaitingReconciliation());
+ assertEquals(Set.of(topicId),
membershipManager.topicsAwaitingReconciliation());
verify(metadata, times(2)).requestUpdate(anyBoolean());
}
@@ -1132,7 +1131,7 @@ public class ShareMembershipManagerTest {
String topicName = "topic1";
ShareMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
- mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, Collections.emptyList());
+ mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, List.of());
// Member received assignment to reconcile;
@@ -1147,7 +1146,7 @@ public class ShareMembershipManagerTest {
List<Integer> partitions = Arrays.asList(0, 1);
Set<TopicPartition> assignedPartitions =
partitions.stream().map(p -> new TopicPartition(topicName,
p)).collect(Collectors.toSet());
- Map<Uuid, SortedSet<Integer>> assignedTopicIdPartitions =
Collections.singletonMap(topicId,
+ Map<Uuid, SortedSet<Integer>> assignedTopicIdPartitions =
Map.of(topicId,
new TreeSet<>(partitions));
assertEquals(assignedTopicIdPartitions,
membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());
@@ -1158,10 +1157,10 @@ public class ShareMembershipManagerTest {
// Revocation of topic not found in metadata cache
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
mockRevocation();
- mockTopicNameInMetadataCache(Collections.singletonMap(topicId,
topicName), false);
+ mockTopicNameInMetadataCache(Map.of(topicId, topicName), false);
// Revoke one of the 2 partitions
- receiveAssignment(topicId, Collections.singletonList(1),
membershipManager);
+ receiveAssignment(topicId, List.of(1), membershipManager);
membershipManager.poll(time.milliseconds());
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition(topicName, 0)));
@@ -1287,7 +1286,7 @@ public class ShareMembershipManagerTest {
for (int partition : partitions)
topicIdPartitions.add(partition);
- return Collections.singletonMap(topicId, topicIdPartitions);
+ return Map.of(topicId, topicIdPartitions);
}
private void testFenceIsNoOp(ShareMembershipManager membershipManager) {
@@ -1300,7 +1299,7 @@ public class ShareMembershipManagerTest {
// Should reset epoch to leave the group and release the assignment
(right away because
// there is no onPartitionsLost callback defined)
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
assertTrue(membershipManager.currentAssignment().partitions.isEmpty());
assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
assertEquals(ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
membershipManager.memberEpoch());
@@ -1387,7 +1386,7 @@ public class ShareMembershipManagerTest {
Uuid topicId, String topicName, List<Integer> partitions) {
ShareMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName,
- Collections.emptyList());
+ List.of());
receiveAssignment(topicId, partitions, membershipManager);
@@ -1432,7 +1431,7 @@ public class ShareMembershipManagerTest {
if (isPresent) {
when(metadata.topicNames()).thenReturn(topicNames);
} else {
- when(metadata.topicNames()).thenReturn(Collections.emptyMap());
+ when(metadata.topicNames()).thenReturn(Map.of());
}
}
@@ -1445,7 +1444,7 @@ public class ShareMembershipManagerTest {
private void mockMemberHasAutoAssignedPartition() {
String topicName = "topic1";
TopicPartition ownedPartition = new TopicPartition(topicName, 0);
-
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
+
when(subscriptionState.assignedPartitions()).thenReturn(Set.of(ownedPartition));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
}
@@ -1459,7 +1458,7 @@ public class ShareMembershipManagerTest {
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new
HashSet<>(expectedTopicPartitionAssignment);
-
verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedSet,
Collections.emptySet());
+
verify(subscriptionState).assignFromSubscribedAwaitingCallback(expectedSet,
Set.of());
}
private Map<Uuid, SortedSet<Integer>>
assignmentByTopicId(List<TopicIdPartition> topicIdPartitions) {
@@ -1479,7 +1478,7 @@ public class ShareMembershipManagerTest {
HashMap<Uuid, SortedSet<Integer>> partitionsByTopicId = new
HashMap<>();
partitionsByTopicId.put(topicId, new
TreeSet<>(previouslyOwned.stream().map(TopicIdPartition::partition).collect(Collectors.toSet())));
membershipManager.updateAssignment(partitionsByTopicId);
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
topicName));
+ when(metadata.topicNames()).thenReturn(Map.of(topicId, topicName));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
}
@@ -1492,8 +1491,8 @@ public class ShareMembershipManagerTest {
private void mockOwnedPartition(ShareMembershipManager membershipManager,
Uuid topicId, String topic) {
int partition = 0;
TopicPartition previouslyOwned = new TopicPartition(topic, partition);
- membershipManager.updateAssignment(mkMap(mkEntry(topicId, new
TreeSet<>(Collections.singletonList(partition)))));
-
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(previouslyOwned));
+ membershipManager.updateAssignment(mkMap(mkEntry(topicId, new
TreeSet<>(List.of(partition)))));
+
when(subscriptionState.assignedPartitions()).thenReturn(Set.of(previouslyOwned));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
}
@@ -1548,7 +1547,7 @@ public class ShareMembershipManagerTest {
private void receiveAssignment(Uuid topicId, List<Integer> partitions,
ShareMembershipManager membershipManager) {
ShareGroupHeartbeatResponseData.Assignment targetAssignment = new
ShareGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(Collections.singletonList(
+ .setTopicPartitions(List.of(
new ShareGroupHeartbeatResponseData.TopicPartitions()
.setTopicId(topicId)
.setPartitions(partitions)));
@@ -1559,7 +1558,7 @@ public class ShareMembershipManagerTest {
private void receiveEmptyAssignment(ShareMembershipManager
membershipManager) {
// New empty assignment received, revoking owned partition.
ShareGroupHeartbeatResponseData.Assignment targetAssignment = new
ShareGroupHeartbeatResponseData.Assignment()
- .setTopicPartitions(Collections.emptyList());
+ .setTopicPartitions(List.of());
ShareGroupHeartbeatResponse heartbeatResponse =
createShareGroupHeartbeatResponse(targetAssignment,
membershipManager.memberId());
membershipManager.onHeartbeatSuccess(heartbeatResponse);
}
@@ -1596,7 +1595,7 @@ public class ShareMembershipManagerTest {
assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLeave(membershipManager,
leaveResult);
assertEquals(-1, membershipManager.memberEpoch());
assertTrue(membershipManager.currentAssignment().isNone());
- verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+ verify(subscriptionState).assignFromSubscribed(Set.of());
}
private void mockLeaveGroup() {
@@ -1672,8 +1671,8 @@ public class ShareMembershipManagerTest {
Uuid topicId = Uuid.randomUuid();
ShareMembershipManager membershipManager =
mockJoinAndReceiveAssignment(true);
membershipManager.onHeartbeatRequestGenerated();
-
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId,
"topic"));
- receiveAssignment(topicId, Collections.singletonList(0),
membershipManager);
+ when(metadata.topicNames()).thenReturn(Map.of(topicId, "topic"));
+ receiveAssignment(topicId, List.of(0), membershipManager);
membershipManager.onHeartbeatRequestGenerated();
assertFalse(membershipManager.currentAssignment().isNone());
return membershipManager;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
index 826978aab5a..b508b388010 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java
@@ -37,7 +37,6 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -338,10 +337,10 @@ public class ShareSessionHandlerTest {
// If we started with an ID, only a new ID will count towards replaced.
// The old topic ID partition should be forgotten, and the new one
should be fetched.
- assertEquals(Collections.singletonList(tp),
reqForgetList(requestData2, topicNames));
+ assertEquals(List.of(tp), reqForgetList(requestData2, topicNames));
assertMapsEqual(reqMap(new TopicIdPartition(topicId2, 0, "foo")),
handler.sessionPartitionMap());
- assertListEquals(Collections.singletonList(tp2),
reqFetchList(requestData2, topicNames));
+ assertListEquals(List.of(tp2), reqFetchList(requestData2, topicNames));
// Should have the same session ID, and next epoch and can use topic
IDs if it ended with topic IDs.
assertEquals(memberId.toString(), requestData2.memberId(), "Did not
use same session");
@@ -376,7 +375,7 @@ public class ShareSessionHandlerTest {
// Remove the topic from the session by setting acknowledgements only
- this is not asking to fetch records
ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
handler.addPartitionToAcknowledgeOnly(foo0, Acknowledgements.empty());
- assertEquals(Collections.singletonList(foo0),
reqForgetList(requestData2, topicNames));
+ assertEquals(List.of(foo0), reqForgetList(requestData2, topicNames));
// Should have the same session ID, next epoch, and same ID usage
assertEquals(memberId.toString(), requestData2.memberId(), "Did not
use same session");
@@ -410,7 +409,7 @@ public class ShareSessionHandlerTest {
// Remove the topic from the session
ShareFetchRequestData requestData2 =
handler.newShareFetchBuilder(groupId, shareFetchConfig, false).build().data();
- assertEquals(Collections.singletonList(foo0),
reqForgetList(requestData2, topicNames));
+ assertEquals(List.of(foo0), reqForgetList(requestData2, topicNames));
// Should have the same session ID, next epoch, and same ID usage
assertEquals(memberId.toString(), requestData2.memberId(), "Did not
use same session");
@@ -475,7 +474,7 @@ public class ShareSessionHandlerTest {
handler.addPartitionToFetch(foo0, acknowledgements);
// As we start with a ShareAcknowledge on epoch 0, we expect a null
response.
- assertNull(handler.newShareAcknowledgeBuilder(groupId,
shareFetchConfig));
+ assertNull(handler.newShareAcknowledgeBuilder(groupId));
// Attempt a new ShareFetch
TopicIdPartition foo1 = new TopicIdPartition(fooId, 1, "foo");