This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f6ca0f69d69 MINOR: replace mkMap/mkEntry with Map.of/Map.entry in
clients module (#21726)
f6ca0f69d69 is described below
commit f6ca0f69d69ecb8bae1dcbfb3e9f76e5d50fa5e2
Author: Lan Ding <[email protected]>
AuthorDate: Thu Mar 19 00:53:03 2026 +0800
MINOR: replace mkMap/mkEntry with Map.of/Map.entry in clients module
(#21726)
see https://github.com/apache/kafka/pull/21573#discussion_r2867553866
Replace `Utils.mkMap()` calls with `Map.of()` equivalents throughout
the clients module. Where iteration order matters (toString tests,
partition assignor logic), use an order-preserving map instead:
`LinkedHashMap`, `TreeMap`, or the existing `Utils.mkMap()`.
Reviewers: Christo Lolov <[email protected]>, Mickael Maison
<[email protected]>, Nilesh Kumar
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/internals/Fetch.java | 9 +-
.../consumer/internals/FetchMetricsManager.java | 3 +
.../kafka/common/feature/BaseVersionRange.java | 2 +
.../common/serialization/ListDeserializer.java | 16 ++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 24 ++---
.../kafka/clients/consumer/KafkaConsumerTest.java | 8 +-
.../kafka/clients/consumer/RangeAssignorTest.java | 18 ++--
.../internals/AbstractPartitionAssignorTest.java | 5 +-
.../internals/AbstractStickyAssignorTest.java | 18 ++--
.../consumer/internals/AsyncKafkaConsumerTest.java | 4 +-
.../internals/ConsumerCoordinatorTest.java | 16 ++-
.../internals/ConsumerMembershipManagerTest.java | 28 +----
.../consumer/internals/OffsetFetcherTest.java | 11 +-
.../internals/ShareMembershipManagerTest.java | 19 ++--
.../internals/StreamsMembershipManagerTest.java | 114 ++++++++++-----------
.../apache/kafka/common/feature/FeaturesTest.java | 35 +++----
.../common/feature/SupportedVersionRangeTest.java | 16 ++-
.../common/requests/ApiVersionsResponseTest.java | 11 +-
.../kafka/common/utils/FixedOrderMapTest.java | 7 +-
.../org/apache/kafka/common/utils/UtilsTest.java | 23 ++---
20 files changed, 170 insertions(+), 217 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
index fa45de7e2cb..1c680a7ee7a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
@@ -27,9 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-
public class Fetch<K, V> {
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata;
@@ -47,9 +44,9 @@ public class Fetch<K, V> {
OffsetAndMetadata nextOffsetAndMetadata
) {
Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap =
records.isEmpty()
- ? new HashMap<>()
- : mkMap(mkEntry(partition, records));
- Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadataMap =
mkMap(mkEntry(partition, nextOffsetAndMetadata));
+ ? Map.of()
+ : Map.of(partition, records);
+ Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadataMap =
Map.of(partition, nextOffsetAndMetadata);
return new Fetch<>(recordsMap, positionAdvanced, records.size(),
nextOffsetAndMetadataMap);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
index 98644180e8b..b9a1af69359 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
@@ -34,6 +34,9 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
* The {@link FetchMetricsManager} class provides wrapper methods to record
lag, lead, latency, and fetch metrics.
* It keeps an internal ID of the assigned set of partitions which is updated
to ensure the set of metrics it
* records matches up with the topic-partitions in use.
+ *
+ * <p>Note: metric tag maps use {@code Utils.mkMap} to preserve insertion
order; do not replace
+ * with {@code Map.of} as tag order affects JMX MBean names.
*/
public class FetchMetricsManager {
diff --git
a/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
b/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
index bc1121ba395..e73e63cd917 100644
---
a/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
+++
b/clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
@@ -94,6 +94,8 @@ class BaseVersionRange {
mapToString(toMap()));
}
+ // Uses Utils.mkMap to preserve insertion order so that min version appears
+ // before max version when converted to a String via toString().
public Map<String, Short> toMap() {
return Utils.mkMap(Utils.mkEntry(minKeyLabel, min()),
Utils.mkEntry(maxKeyLabel, max()));
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
index 77ff3e68419..c2cfce62c38 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
@@ -38,20 +38,18 @@ import java.util.List;
import java.util.Map;
import static
org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
public class ListDeserializer<Inner> implements Deserializer<List<Inner>> {
final Logger log = LoggerFactory.getLogger(ListDeserializer.class);
- private static final Map<Class<? extends Deserializer<?>>, Integer>
FIXED_LENGTH_DESERIALIZERS = mkMap(
- mkEntry(ShortDeserializer.class, Short.BYTES),
- mkEntry(IntegerDeserializer.class, Integer.BYTES),
- mkEntry(FloatDeserializer.class, Float.BYTES),
- mkEntry(LongDeserializer.class, Long.BYTES),
- mkEntry(DoubleDeserializer.class, Double.BYTES),
- mkEntry(UUIDDeserializer.class, 36)
+ private static final Map<Class<? extends Deserializer<?>>, Integer>
FIXED_LENGTH_DESERIALIZERS = Map.of(
+ ShortDeserializer.class, Short.BYTES,
+ IntegerDeserializer.class, Integer.BYTES,
+ FloatDeserializer.class, Float.BYTES,
+ LongDeserializer.class, Long.BYTES,
+ DoubleDeserializer.class, Double.BYTES,
+ UUIDDeserializer.class, 36
);
private Deserializer<Inner> inner;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 89d1890ab68..1bb7e2e171e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -767,9 +767,9 @@ public class KafkaAdminClientTest {
private static FeatureMetadata defaultFeatureMetadata() {
return new FeatureMetadata(
- Utils.mkMap(Utils.mkEntry("test_feature_1", new
FinalizedVersionRange((short) 2, (short) 2))),
+ Map.of("test_feature_1", new FinalizedVersionRange((short) 2,
(short) 2)),
Optional.of(1L),
- Utils.mkMap(Utils.mkEntry("test_feature_1", new
SupportedVersionRange((short) 1, (short) 5))));
+ Map.of("test_feature_1", new SupportedVersionRange((short) 1,
(short) 5)));
}
private static
Features<org.apache.kafka.common.feature.SupportedVersionRange>
convertSupportedFeaturesMap(Map<String, SupportedVersionRange> features) {
@@ -5259,7 +5259,7 @@ public class KafkaAdminClientTest {
ListConsumerGroupOffsetsSpec groupASpec = new
ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions);
ListConsumerGroupOffsetsSpec groupBSpec = new
ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions);
- return Utils.mkMap(Utils.mkEntry("groupA", groupASpec),
Utils.mkEntry("groupB", groupBSpec));
+ return Map.of("groupA", groupASpec, "groupB", groupBSpec);
}
private Map<String, ListStreamsGroupOffsetsSpec>
batchedListStreamsGroupOffsetsSpec() {
@@ -5268,7 +5268,7 @@ public class KafkaAdminClientTest {
ListStreamsGroupOffsetsSpec groupASpec = new
ListStreamsGroupOffsetsSpec().topicPartitions(groupAPartitions);
ListStreamsGroupOffsetsSpec groupBSpec = new
ListStreamsGroupOffsetsSpec().topicPartitions(groupBPartitions);
- return Utils.mkMap(Utils.mkEntry("groupA", groupASpec),
Utils.mkEntry("groupB", groupBSpec));
+ return Map.of("groupA", groupASpec, "groupB", groupBSpec);
}
private void waitForRequest(MockClient mockClient, ApiKeys apiKeys) throws
Exception {
@@ -8801,9 +8801,9 @@ public class KafkaAdminClientTest {
}
private Map<String, FeatureUpdate> makeTestFeatureUpdates() {
- return Utils.mkMap(
- Utils.mkEntry("test_feature_1", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE)),
- Utils.mkEntry("test_feature_2", new FeatureUpdate((short) 3,
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)));
+ return Map.of(
+ "test_feature_1", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE),
+ "test_feature_2", new FeatureUpdate((short) 3,
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE));
}
private void testUpdateFeatures(Map<String, FeatureUpdate> featureUpdates,
@@ -8871,9 +8871,9 @@ public class KafkaAdminClientTest {
0),
env.cluster().nodeById(controllerId));
final KafkaFuture<Void> future = env.adminClient().updateFeatures(
- Utils.mkMap(
- Utils.mkEntry("test_feature_1", new FeatureUpdate((short)
2, FeatureUpdate.UpgradeType.UPGRADE)),
- Utils.mkEntry("test_feature_2", new FeatureUpdate((short)
3, FeatureUpdate.UpgradeType.SAFE_DOWNGRADE))),
+ Map.of(
+ "test_feature_1", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE),
+ "test_feature_2", new FeatureUpdate((short) 3,
FeatureUpdate.UpgradeType.SAFE_DOWNGRADE)),
new UpdateFeaturesOptions().timeoutMs(10000)
).all();
future.get();
@@ -8895,8 +8895,8 @@ public class KafkaAdminClientTest {
assertThrows(
IllegalArgumentException.class,
() -> env.adminClient().updateFeatures(
- Utils.mkMap(Utils.mkEntry("feature", new
FeatureUpdate((short) 2, FeatureUpdate.UpgradeType.UPGRADE)),
- Utils.mkEntry("", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE)))));
+ Map.of("feature", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE),
+ "", new FeatureUpdate((short) 2,
FeatureUpdate.UpgradeType.UPGRADE))));
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9d1011601ab..f70a618bb65 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1284,7 +1284,7 @@ public class KafkaConsumerTest {
Node coordinator = new Node(Integer.MAX_VALUE - node.id(),
node.host(), node.port());
// fetch offset for one topic
-
client.prepareResponseFrom(offsetResponse(Utils.mkMap(Utils.mkEntry(tp0,
offset1), Utils.mkEntry(tp1, -1L)), Errors.NONE), coordinator);
+ client.prepareResponseFrom(offsetResponse(Map.of(tp0, offset1, tp1,
-1L), Errors.NONE), coordinator);
final Map<TopicPartition, OffsetAndMetadata> committed =
consumer.committed(Set.of(tp0, tp1));
assertEquals(2, committed.size());
assertEquals(offset1, committed.get(tp0).offset());
@@ -2467,7 +2467,7 @@ public class KafkaConsumerTest {
ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol,
time, client, subscription, metadata, assignor, true, groupInstanceId);
- initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1),
Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+ initMetadata(client, Map.of(topic, 1, topic2, 1, topic3, 1));
consumer.subscribe(Arrays.asList(topic, topic2),
getConsumerRebalanceListener(consumer));
@@ -3512,7 +3512,7 @@ public void testPollIdleRatio(GroupProtocol
groupProtocol) {
MockClient client = new MockClient(time, metadata);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol,
time, client, subscription, metadata, assignor, true, groupInstanceId);
MockRebalanceListener countingRebalanceListener = new
MockRebalanceListener();
- initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1),
Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));
+ initMetadata(client, Map.of(topic, 1, topic2, 1, topic3, 1));
consumer.subscribe(Arrays.asList(topic, topic2),
countingRebalanceListener);
Node node = metadata.fetch().nodes().get(0);
@@ -3542,7 +3542,7 @@ public void testPollIdleRatio(GroupProtocol
groupProtocol) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
- initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1)));
+ initMetadata(client, Map.of(topic, 1));
Node node = metadata.fetch().nodes().get(0);
consumer = newConsumer(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
index eb45d027977..5737553dc1a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -46,8 +46,6 @@ import static
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssig
import static
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignorTest.nullRacks;
import static
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignorTest.racks;
import static
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignorTest.verifyRackAssignment;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -388,7 +386,7 @@ public class RangeAssignorTest {
@Test
public void testRackAwareAssignmentWithUniformSubscription() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2",
7), mkEntry("t3", 2));
+ Map<String, Integer> topics = Map.of("t1", 6, "t2", 7, "t3", 2);
List<String> allTopics = asList("t1", "t2", "t3");
List<List<String>> consumerTopics = asList(allTopics, allTopics,
allTopics);
@@ -409,7 +407,7 @@ public class RangeAssignorTest {
@Test
public void testRackAwareAssignmentWithNonEqualSubscription() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2",
7), mkEntry("t3", 2));
+ Map<String, Integer> topics = Map.of("t1", 6, "t2", 7, "t3", 2);
List<String> allTopics = asList("t1", "t2", "t3");
List<List<String>> consumerTopics = asList(allTopics, allTopics,
asList("t1", "t3"));
@@ -430,7 +428,7 @@ public class RangeAssignorTest {
@Test
public void testRackAwareAssignmentWithUniformPartitions() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 5), mkEntry("t2",
5), mkEntry("t3", 5));
+ Map<String, Integer> topics = Map.of("t1", 5, "t2", 5, "t3", 5);
List<String> allTopics = asList("t1", "t2", "t3");
List<List<String>> consumerTopics = asList(allTopics, allTopics,
allTopics);
List<String> nonRackAwareAssignment = asList(
@@ -450,7 +448,7 @@ public class RangeAssignorTest {
@Test
public void
testRackAwareAssignmentWithUniformPartitionsNonEqualSubscription() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 5), mkEntry("t2",
5), mkEntry("t3", 5));
+ Map<String, Integer> topics = Map.of("t1", 5, "t2", 5, "t3", 5);
List<String> allTopics = asList("t1", "t2", "t3");
List<List<String>> consumerTopics = asList(allTopics, allTopics,
asList("t1", "t3"));
@@ -471,7 +469,7 @@ public class RangeAssignorTest {
@Test
public void testRackAwareAssignmentWithCoPartitioning() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2",
6), mkEntry("t3", 2), mkEntry("t4", 2));
+ Map<String, Integer> topics = Map.of("t1", 6, "t2", 6, "t3", 2, "t4",
2);
List<List<String>> consumerTopics = asList(asList("t1", "t2"),
asList("t1", "t2"), asList("t3", "t4"), asList("t3", "t4"));
List<String> consumerRacks = asList(ALL_RACKS[0], ALL_RACKS[1],
ALL_RACKS[1], ALL_RACKS[0]);
List<String> nonRackAwareAssignment = asList(
@@ -506,9 +504,9 @@ public class RangeAssignorTest {
@Test
public void testCoPartitionedAssignmentWithSameSubscription() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2", 6),
- mkEntry("t3", 2), mkEntry("t4", 2),
- mkEntry("t5", 4), mkEntry("t6", 4));
+ Map<String, Integer> topics = Map.of("t1", 6, "t2", 6,
+ "t3", 2, "t4", 2,
+ "t5", 4, "t6", 4);
List<String> topicList = asList("t1", "t2", "t3", "t4", "t5", "t6",
"t7", "t8", "t9");
List<List<String>> consumerTopics = asList(topicList, topicList,
topicList);
List<String> consumerRacks = asList(ALL_RACKS[0], ALL_RACKS[1],
ALL_RACKS[2]);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java
index 1ea9fcdf545..3f5352ee570 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignorTest.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -258,7 +259,9 @@ public class AbstractPartitionAssignorTest {
List<String> brokerRacks) {
Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
int nextIndex = 0;
- for (Map.Entry<String, Integer> entry :
numPartitionsPerTopic.entrySet()) {
+ // Wrap with TreeMap to ensure deterministic iteration order, since
nextIndex
+ // accumulates across topics. This allows callers to use Map.of()
safely.
+ for (Map.Entry<String, Integer> entry : new
TreeMap<>(numPartitionsPerTopic).entrySet()) {
String topic = entry.getKey();
int numPartitions = entry.getValue();
partitionsPerTopic.put(topic, partitionInfos(topic, numPartitions,
replicationFactor, brokerRacks, nextIndex));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 4e9525264a0..81132564593 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -54,8 +54,6 @@ import static
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssig
import static
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignorTest.racks;
import static
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignorTest.verifyRackAssignment;
import static
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -184,13 +182,11 @@ public abstract class AbstractStickyAssignorTest {
initializeRacks(rackConfig);
Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, partitionInfos(topic, 2));
- subscriptions = mkMap(
- mkEntry(consumerId, buildSubscriptionV2Above(
- topics(topic),
- Arrays.asList(tp(topic, 0), tp(topic, 1),
tp(otherTopic, 0), tp(otherTopic, 1)),
- generationId, 0)
- )
- );
+ subscriptions = new HashMap<>();
+ subscriptions.put(consumerId, buildSubscriptionV2Above(
+ topics(topic),
+ List.of(tp(topic, 0), tp(topic, 1), tp(otherTopic, 0),
tp(otherTopic, 1)),
+ generationId, 0));
Map<String, List<TopicPartition>> assignment =
assignor.assignPartitions(partitionsPerTopic, subscriptions);
assertEquals(partitions(tp(topic, 0), tp(topic, 1)),
assignment.get(consumerId));
@@ -1272,7 +1268,7 @@ public abstract class AbstractStickyAssignorTest {
@Test
public void testRackAwareAssignmentWithUniformSubscription() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2",
7), mkEntry("t3", 2));
+ Map<String, Integer> topics = Map.of("t1", 6, "t2", 7, "t3", 2);
List<String> allTopics = asList("t1", "t2", "t3");
List<List<String>> consumerTopics = asList(allTopics, allTopics,
allTopics);
List<String> nonRackAwareAssignment = asList(
@@ -1339,7 +1335,7 @@ public abstract class AbstractStickyAssignorTest {
@Test
public void testRackAwareAssignmentWithNonEqualSubscription() {
- Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2",
7), mkEntry("t3", 2));
+ Map<String, Integer> topics = Map.of("t1", 6, "t2", 7, "t3", 2);
List<String> allTopics = asList("t1", "t2", "t3");
List<List<String>> consumerTopics = asList(allTopics, allTopics,
asList("t1", "t3"));
List<String> nonRackAwareAssignment = asList(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index ef939d9f7c2..c8efa6bb69f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -135,8 +135,6 @@ import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListe
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.TestUtils.requiredConsumerConfig;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -488,7 +486,7 @@ public class AsyncKafkaConsumerTest {
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition>
partitions) {
- assertDoesNotThrow(() -> consumer.commitSync(mkMap(mkEntry(tp,
new OffsetAndMetadata(0)))));
+ assertDoesNotThrow(() -> consumer.commitSync(Map.of(tp, new
OffsetAndMetadata(0))));
callbackExecuted.set(true);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c07127a77d6..4ce2b2c7787 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -124,8 +124,6 @@ import static
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Rebala
import static
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol.EAGER;
import static
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME;
import static
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.DEFAULT_GENERATION;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -189,9 +187,9 @@ public abstract class ConsumerCoordinatorTest {
new IllegalStateException("Illegal state for assignment!"),
"throw-fatal-error-on-assignment-assignor");
this.assignors = Arrays.asList(partitionAssignor,
throwOnAssignmentAssignor, throwFatalErrorOnAssignmentAssignor);
- this.assignorMap = mkMap(mkEntry(partitionAssignor.name(),
partitionAssignor),
- mkEntry(throwOnAssignmentAssignor.name(),
throwOnAssignmentAssignor),
- mkEntry(throwFatalErrorOnAssignmentAssignor.name(),
throwFatalErrorOnAssignmentAssignor));
+ this.assignorMap = Map.of(partitionAssignor.name(), partitionAssignor,
+ throwOnAssignmentAssignor.name(), throwOnAssignmentAssignor,
+ throwFatalErrorOnAssignmentAssignor.name(),
throwFatalErrorOnAssignmentAssignor);
}
@BeforeEach
@@ -1478,7 +1476,7 @@ public abstract class ConsumerCoordinatorTest {
@Test
public void testRebalanceWithMetadataChange() {
MetadataResponse metadataResponse1 =
RequestTestUtils.metadataUpdateWith(1,
- Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2,
1)));
+ Map.of(topic1, 1, topic2, 1));
MetadataResponse metadataResponse2 =
RequestTestUtils.metadataUpdateWith(1, singletonMap(topic1, 1));
verifyRebalanceWithMetadataChange(Optional.empty(), partitionAssignor,
metadataResponse1, metadataResponse2, true);
}
@@ -2005,9 +2003,9 @@ public abstract class ConsumerCoordinatorTest {
// the leader is responsible for picking up metadata changes and
forcing a group rebalance.
// note that `MockPartitionAssignor.prepare` is not called therefore
calling `MockPartitionAssignor.assign`
// will throw a IllegalStateException. this indirectly verifies that
`assign` is correctly skipped.
- Map<String, List<String>> memberSubscriptions = mkMap(
- mkEntry(consumerId, singletonList(topic1)),
- mkEntry(consumerId2, singletonList(topic2))
+ Map<String, List<String>> memberSubscriptions = Map.of(
+ consumerId, List.of(topic1),
+ consumerId2, List.of(topic2)
);
client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
memberSubscriptions, true, Errors.NONE, Optional.empty()));
client.prepareResponse(syncGroupResponse(singletonList(t1p),
Errors.NONE));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 15ac6513576..3a659970cdd 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -75,8 +75,6 @@ import static
org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.inv
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -858,12 +856,7 @@ public class ConsumerMembershipManagerTest {
ConsumerMembershipManager membershipManager =
createMembershipManagerJoiningGroup();
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
- when(metadata.topicNames()).thenReturn(
- mkMap(
- mkEntry(topicId1, topic1),
- mkEntry(topicId2, topic2)
- )
- );
+ when(metadata.topicNames()).thenReturn(Map.of(topicId1, topic1,
topicId2, topic2));
// Receive assignment with only topic1-0, getting stuck during commit.
final CompletableFuture<Void> commitFuture =
mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, topicId1,
@@ -871,11 +864,7 @@ public class ConsumerMembershipManagerTest {
// New assignment adding a new topic2-0 (not in metadata).
// No reconciliation triggered, because another reconciliation is in
progress.
- Map<Uuid, SortedSet<Integer>> newAssignment =
- mkMap(
- mkEntry(topicId1, mkSortedSet(0)),
- mkEntry(topicId2, mkSortedSet(0))
- );
+ Map<Uuid, SortedSet<Integer>> newAssignment = Map.of(topicId1,
mkSortedSet(0), topicId2, mkSortedSet(0));
receiveAssignment(newAssignment, membershipManager);
membershipManager.maybeReconcile(false);
@@ -933,11 +922,7 @@ public class ConsumerMembershipManagerTest {
// New assignment adding a new topic2-0 (not in metadata).
// No reconciliation triggered, because new topic in assignment is
waiting for metadata.
- Map<Uuid, SortedSet<Integer>> newAssignment =
- mkMap(
- mkEntry(topicId1, mkSortedSet(0)),
- mkEntry(topicId2, mkSortedSet(0))
- );
+ Map<Uuid, SortedSet<Integer>> newAssignment = Map.of(topicId1,
mkSortedSet(0), topicId2, mkSortedSet(0));
receiveAssignment(newAssignment, membershipManager);
membershipManager.maybeReconcile(false);
@@ -957,10 +942,7 @@ public class ConsumerMembershipManagerTest {
// Metadata discovered for topic2. Should trigger reconciliation to
complete the assignment,
// with membership manager entering ACKNOWLEDGING state.
- Map<Uuid, String> fullTopicMetadata = mkMap(
- mkEntry(topicId1, topic1),
- mkEntry(topicId2, topic2)
- );
+ Map<Uuid, String> fullTopicMetadata = Map.of(topicId1, topic1,
topicId2, topic2);
when(metadata.topicNames()).thenReturn(fullTopicMetadata);
membershipManager.maybeReconcile(true);
@@ -2812,7 +2794,7 @@ public class ConsumerMembershipManagerTest {
private void mockOwnedPartition(ConsumerMembershipManager
membershipManager, Uuid topicId, String topic) {
int partition = 0;
TopicPartition previouslyOwned = new TopicPartition(topic, partition);
- membershipManager.updateAssignment(mkMap(mkEntry(topicId, new
TreeSet<>(Collections.singletonList(partition)))));
+ membershipManager.updateAssignment(Map.of(topicId, new
TreeSet<>(Collections.singletonList(partition))));
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(previouslyOwned));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 182900c0207..4f917b66167 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -58,7 +58,6 @@ import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -851,12 +850,12 @@ public class OffsetFetcherTest {
Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
offsetFetcher.offsetsForTimes(
- Utils.mkMap(Utils.mkEntry(tp0, fetchTimestamp),
- Utils.mkEntry(tp1, fetchTimestamp)),
time.timer(Integer.MAX_VALUE));
+ Map.of(tp0, fetchTimestamp, tp1, fetchTimestamp),
+ time.timer(Integer.MAX_VALUE));
- assertEquals(Utils.mkMap(
- Utils.mkEntry(tp0, new OffsetAndTimestamp(4L, fetchTimestamp)),
- Utils.mkEntry(tp1, new OffsetAndTimestamp(5L,
fetchTimestamp))), offsetAndTimestampMap);
+ assertEquals(Map.of(
+ tp0, new OffsetAndTimestamp(4L, fetchTimestamp),
+ tp1, new OffsetAndTimestamp(5L, fetchTimestamp)),
offsetAndTimestampMap);
// The NOT_LEADER exception future should not be cleared as we already refreshed the metadata before
// first retry, thus never hitting.
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
index bc782045577..f65b7fc301d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
@@ -56,8 +56,6 @@ import java.util.stream.Stream;
import static
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
import static
org.apache.kafka.common.requests.ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -464,11 +462,10 @@ public class ShareMembershipManagerTest {
// New assignment adding a new topic2-0 (not in metadata).
// No reconciliation triggered, because new topic in assignment is waiting for metadata.
- Map<Uuid, SortedSet<Integer>> newAssignment =
- mkMap(
- mkEntry(topicId1, mkSortedSet(0)),
- mkEntry(topicId2, mkSortedSet(0))
- );
+ Map<Uuid, SortedSet<Integer>> newAssignment = Map.of(
+ topicId1, mkSortedSet(0),
+ topicId2, mkSortedSet(0)
+ );
receiveAssignment(newAssignment, membershipManager);
membershipManager.poll(time.milliseconds());
@@ -488,9 +485,9 @@ public class ShareMembershipManagerTest {
// Metadata discovered for topic2. Should trigger reconciliation to complete the assignment,
// with membership manager entering ACKNOWLEDGING state.
- Map<Uuid, String> fullTopicMetadata = mkMap(
- mkEntry(topicId1, topic1),
- mkEntry(topicId2, topic2)
+ Map<Uuid, String> fullTopicMetadata = Map.of(
+ topicId1, topic1,
+ topicId2, topic2
);
when(metadata.topicNames()).thenReturn(fullTopicMetadata);
@@ -1491,7 +1488,7 @@ public class ShareMembershipManagerTest {
private void mockOwnedPartition(ShareMembershipManager membershipManager,
Uuid topicId, String topic) {
int partition = 0;
TopicPartition previouslyOwned = new TopicPartition(topic, partition);
- membershipManager.updateAssignment(mkMap(mkEntry(topicId, new TreeSet<>(List.of(partition)))));
+ membershipManager.updateAssignment(Map.of(topicId, new TreeSet<>(List.of(partition))));
when(subscriptionState.assignedPartitions()).thenReturn(Set.of(previouslyOwned));
when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
index 50e582f5cc4..4b6ee98090e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java
@@ -47,8 +47,8 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
@@ -61,8 +61,6 @@ import java.util.stream.Collectors;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
import static
org.apache.kafka.common.requests.ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -165,17 +163,17 @@ public class StreamsMembershipManagerTest {
@Test
public void testActiveTasksAreNullInHeartbeatResponse() {
- testTasksAreNullInHeartbeatResponse(null, Collections.emptyList(), Collections.emptyList());
+ testTasksAreNullInHeartbeatResponse(null, List.of(), List.of());
}
@Test
public void testStandbyTasksAreNullInHeartbeatResponse() {
- testTasksAreNullInHeartbeatResponse(Collections.emptyList(), null, Collections.emptyList());
+ testTasksAreNullInHeartbeatResponse(List.of(), null, List.of());
}
@Test
public void testWarmupTasksAreNullInHeartbeatResponse() {
- testTasksAreNullInHeartbeatResponse(Collections.emptyList(), Collections.emptyList(), null);
+ testTasksAreNullInHeartbeatResponse(List.of(), List.of(), null);
}
private void testTasksAreNullInHeartbeatResponse(final
List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks,
@@ -1950,9 +1948,9 @@ public class StreamsMembershipManagerTest {
);
final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
MEMBER_EPOCH,
statuses
);
@@ -1982,9 +1980,9 @@ public class StreamsMembershipManagerTest {
);
final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
MEMBER_EPOCH,
statuses
);
@@ -2014,9 +2012,9 @@ public class StreamsMembershipManagerTest {
);
final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
MEMBER_EPOCH,
statuses
);
@@ -2046,9 +2044,9 @@ public class StreamsMembershipManagerTest {
);
final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
MEMBER_EPOCH,
statuses
);
@@ -2072,9 +2070,9 @@ public class StreamsMembershipManagerTest {
joining();
final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
MEMBER_EPOCH,
null
);
@@ -2104,9 +2102,9 @@ public class StreamsMembershipManagerTest {
);
final StreamsGroupHeartbeatResponse response = makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
MEMBER_EPOCH,
statuses
);
@@ -2130,9 +2128,9 @@ public class StreamsMembershipManagerTest {
joining();
final StreamsGroupHeartbeatResponse responseWithTasks =
makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
+ List.of(),
MEMBER_EPOCH,
null
);
@@ -2317,16 +2315,14 @@ public class StreamsMembershipManagerTest {
private void
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(final String
subtopologyId,
final String topicName) {
lenient().when(streamsRebalanceData.subtopologies()).thenReturn(
- mkMap(
- mkEntry(
- subtopologyId,
- new StreamsRebalanceData.Subtopology(
- Set.of(topicName),
- Set.of(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyList()
- )
+ Map.of(
+ subtopologyId,
+ new StreamsRebalanceData.Subtopology(
+ Set.of(topicName),
+ Set.of(),
+ Map.of(),
+ Map.of(),
+ List.of()
)
)
);
@@ -2337,26 +2333,22 @@ public class StreamsMembershipManagerTest {
final String
subtopologyId2,
final String
topicName2) {
lenient().when(streamsRebalanceData.subtopologies()).thenReturn(
- mkMap(
- mkEntry(
- subtopologyId1,
- new StreamsRebalanceData.Subtopology(
- Set.of(topicName1),
- Set.of(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyList()
- )
+ Map.of(
+ subtopologyId1,
+ new StreamsRebalanceData.Subtopology(
+ Set.of(topicName1),
+ Set.of(),
+ Map.of(),
+ Map.of(),
+ List.of()
),
- mkEntry(
- subtopologyId2,
- new StreamsRebalanceData.Subtopology(
- Set.of(topicName2),
- Set.of(),
- Collections.emptyMap(),
- Collections.emptyMap(),
- Collections.emptyList()
- )
+ subtopologyId2,
+ new StreamsRebalanceData.Subtopology(
+ Set.of(topicName2),
+ Set.of(),
+ Map.of(),
+ Map.of(),
+ List.of()
)
)
);
@@ -2388,13 +2380,13 @@ public class StreamsMembershipManagerTest {
private StreamsGroupHeartbeatResponse
makeHeartbeatResponseWithStandbyTasks(final String subtopologyId,
final List<Integer> partitions) {
return makeHeartbeatResponse(
- Collections.emptyList(),
+ List.of(),
List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
.setSubtopologyId(subtopologyId)
.setPartitions(partitions)
),
- Collections.emptyList(),
+ List.of(),
MEMBER_EPOCH
);
}
@@ -2402,8 +2394,8 @@ public class StreamsMembershipManagerTest {
private StreamsGroupHeartbeatResponse
makeHeartbeatResponseWithWarmupTasks(final String subtopologyId,
final List<Integer> partitions) {
return makeHeartbeatResponse(
- Collections.emptyList(),
- Collections.emptyList(),
+ List.of(),
+ List.of(),
List.of(
new StreamsGroupHeartbeatResponseData.TaskIds()
.setSubtopologyId(subtopologyId)
@@ -2430,7 +2422,7 @@ public class StreamsMembershipManagerTest {
private StreamsGroupHeartbeatResponse
makeHeartbeatResponseWithActiveTasks(final
List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks,
final int memberEpoch) {
- return makeHeartbeatResponse(activeTasks, Collections.emptyList(), Collections.emptyList(), memberEpoch);
+ return makeHeartbeatResponse(activeTasks, List.of(), List.of(), memberEpoch);
}
private StreamsGroupHeartbeatResponse makeHeartbeatResponse(final
List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks,
diff --git
a/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
b/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
index eb66625a2f7..cd2aa221c49 100644
--- a/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/feature/FeaturesTest.java
@@ -22,8 +22,6 @@ import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -53,8 +51,7 @@ public class FeaturesTest {
public void testGetAllFeaturesAPI() {
SupportedVersionRange v1 = new SupportedVersionRange((short) 1,
(short) 2);
SupportedVersionRange v2 = new SupportedVersionRange((short) 3,
(short) 4);
- Map<String, SupportedVersionRange> allFeatures =
- mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+ Map<String, SupportedVersionRange> allFeatures = Map.of("feature_1", v1, "feature_2", v2);
Features<SupportedVersionRange> features =
Features.supportedFeatures(allFeatures);
assertEquals(allFeatures, features.features());
}
@@ -63,7 +60,7 @@ public class FeaturesTest {
public void testGetAPI() {
SupportedVersionRange v1 = new SupportedVersionRange((short) 1,
(short) 2);
SupportedVersionRange v2 = new SupportedVersionRange((short) 3,
(short) 4);
- Map<String, SupportedVersionRange> allFeatures =
mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+ Map<String, SupportedVersionRange> allFeatures = Map.of("feature_1",
v1, "feature_2", v2);
Features<SupportedVersionRange> features =
Features.supportedFeatures(allFeatures);
assertEquals(v1, features.get("feature_1"));
assertEquals(v2, features.get("feature_2"));
@@ -74,13 +71,13 @@ public class FeaturesTest {
public void testFromFeaturesMapToFeaturesMap() {
SupportedVersionRange v1 = new SupportedVersionRange((short) 1,
(short) 2);
SupportedVersionRange v2 = new SupportedVersionRange((short) 3,
(short) 4);
- Map<String, SupportedVersionRange> allFeatures =
mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+ Map<String, SupportedVersionRange> allFeatures = Map.of("feature_1",
v1, "feature_2", v2);
Features<SupportedVersionRange> features =
Features.supportedFeatures(allFeatures);
- Map<String, Map<String, Short>> expected = mkMap(
- mkEntry("feature_1", mkMap(mkEntry("min_version", (short) 1), mkEntry("max_version", (short) 2))),
- mkEntry("feature_2", mkMap(mkEntry("min_version", (short) 3), mkEntry("max_version", (short) 4))));
+ Map<String, Map<String, Short>> expected = Map.of(
+ "feature_1", Map.of("min_version", (short) 1, "max_version", (short) 2),
+ "feature_2", Map.of("min_version", (short) 3, "max_version", (short) 4));
assertEquals(expected, features.toMap());
assertEquals(features, Features.fromSupportedFeaturesMap(expected));
}
@@ -89,21 +86,19 @@ public class FeaturesTest {
public void testToStringSupportedFeatures() {
SupportedVersionRange v1 = new SupportedVersionRange((short) 1,
(short) 2);
SupportedVersionRange v2 = new SupportedVersionRange((short) 3,
(short) 4);
- Map<String, SupportedVersionRange> allFeatures
- = mkMap(mkEntry("feature_1", v1), mkEntry("feature_2", v2));
+ Features<SupportedVersionRange> features = Features.supportedFeatures(
+ Map.of("feature_1", v1, "feature_2", v2));
- Features<SupportedVersionRange> features =
Features.supportedFeatures(allFeatures);
-
- assertEquals(
- "Features{(feature_1 -> SupportedVersionRange[min_version:1, max_version:2]), (feature_2 -> SupportedVersionRange[min_version:3, max_version:4])}",
- features.toString());
+ String result = features.toString();
+ assertTrue(result.startsWith("Features{"));
+ assertTrue(result.contains("(feature_1 -> SupportedVersionRange[min_version:1, max_version:2])"));
+ assertTrue(result.contains("(feature_2 -> SupportedVersionRange[min_version:3, max_version:4])"));
}
@Test
public void
testSupportedFeaturesFromMapFailureWithInvalidMissingMaxVersion() {
// This is invalid because 'max_version' key is missing.
- Map<String, Map<String, Short>> invalidFeatures = mkMap(
- mkEntry("feature_1", mkMap(mkEntry("min_version", (short) 1))));
+ Map<String, Map<String, Short>> invalidFeatures = Map.of("feature_1",
Map.of("min_version", (short) 1));
assertThrows(
IllegalArgumentException.class,
() -> Features.fromSupportedFeaturesMap(invalidFeatures));
@@ -112,13 +107,13 @@ public class FeaturesTest {
@Test
public void testEquals() {
SupportedVersionRange v1 = new SupportedVersionRange((short) 1,
(short) 2);
- Map<String, SupportedVersionRange> allFeatures =
mkMap(mkEntry("feature_1", v1));
+ Map<String, SupportedVersionRange> allFeatures = Map.of("feature_1",
v1);
Features<SupportedVersionRange> features =
Features.supportedFeatures(allFeatures);
Features<SupportedVersionRange> featuresClone =
Features.supportedFeatures(allFeatures);
assertEquals(features, featuresClone);
SupportedVersionRange v2 = new SupportedVersionRange((short) 1,
(short) 3);
- Map<String, SupportedVersionRange> allFeaturesDifferent =
mkMap(mkEntry("feature_1", v2));
+ Map<String, SupportedVersionRange> allFeaturesDifferent =
Map.of("feature_1", v2);
Features<SupportedVersionRange> featuresDifferent =
Features.supportedFeatures(allFeaturesDifferent);
assertNotEquals(features, featuresDifferent);
diff --git
a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
index 9bc6f05106e..0052769398f 100644
---
a/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/feature/SupportedVersionRangeTest.java
@@ -21,8 +21,6 @@ import org.junit.jupiter.api.Test;
import java.util.Map;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -62,7 +60,7 @@ public class SupportedVersionRangeTest {
Map<String, Short> versionRangeMap = versionRange.toMap();
assertEquals(
- mkMap(mkEntry("min_version", versionRange.min()),
mkEntry("max_version", versionRange.max())),
+ Map.of("min_version", versionRange.min(), "max_version",
versionRange.max()),
versionRangeMap);
SupportedVersionRange newVersionRange =
SupportedVersionRange.fromMap(versionRangeMap);
@@ -75,42 +73,42 @@ public class SupportedVersionRangeTest {
public void testFromMapFailure() {
// min_version can't be < 0.
Map<String, Short> invalidWithBadMinVersion =
- mkMap(mkEntry("min_version", (short) -1), mkEntry("max_version",
(short) 0));
+ Map.of("min_version", (short) -1, "max_version", (short) 0);
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithBadMinVersion));
// max_version can't be < 0.
Map<String, Short> invalidWithBadMaxVersion =
- mkMap(mkEntry("min_version", (short) 0), mkEntry("max_version",
(short) -1));
+ Map.of("min_version", (short) 0, "max_version", (short) -1);
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithBadMaxVersion));
// min_version and max_version can't be < 0.
Map<String, Short> invalidWithBadMinMaxVersion =
- mkMap(mkEntry("min_version", (short) -1), mkEntry("max_version",
(short) -1));
+ Map.of("min_version", (short) -1, "max_version", (short) -1);
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithBadMinMaxVersion));
// min_version can't be > max_version.
Map<String, Short> invalidWithLowerMaxVersion =
- mkMap(mkEntry("min_version", (short) 2), mkEntry("max_version",
(short) 1));
+ Map.of("min_version", (short) 2, "max_version", (short) 1);
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithLowerMaxVersion));
// min_version key missing.
Map<String, Short> invalidWithMinKeyMissing =
- mkMap(mkEntry("max_version", (short) 1));
+ Map.of("max_version", (short) 1);
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithMinKeyMissing));
// max_version key missing.
Map<String, Short> invalidWithMaxKeyMissing =
- mkMap(mkEntry("min_version", (short) 1));
+ Map.of("min_version", (short) 1);
assertThrows(
IllegalArgumentException.class,
() -> SupportedVersionRange.fromMap(invalidWithMaxKeyMissing));
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index c94d021ef21..ae1755a035c 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -25,7 +25,6 @@ import
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import
org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
import
org.apache.kafka.common.message.ApiVersionsResponseData.SupportedFeatureKey;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
@@ -95,15 +94,15 @@ public class ApiVersionsResponseTest {
final ApiKeys nonForwardableAPIKey = ApiKeys.JOIN_GROUP;
final short minVersion = 2;
final short maxVersion = 3;
- Map<ApiKeys, ApiVersion> activeControllerApiVersions = Utils.mkMap(
- Utils.mkEntry(forwardableAPIKey, new ApiVersion()
+ Map<ApiKeys, ApiVersion> activeControllerApiVersions = Map.of(
+ forwardableAPIKey, new ApiVersion()
.setApiKey(forwardableAPIKey.id)
.setMinVersion(minVersion)
- .setMaxVersion(maxVersion)),
- Utils.mkEntry(nonForwardableAPIKey, new ApiVersion()
+ .setMaxVersion(maxVersion),
+ nonForwardableAPIKey, new ApiVersion()
.setApiKey(nonForwardableAPIKey.id)
.setMinVersion(minVersion)
- .setMaxVersion(maxVersion))
+ .setMaxVersion(maxVersion)
);
ApiVersionCollection commonResponse =
ApiVersionsResponse.intersectForwardableApis(
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java
index e07be6ab64b..4365de3d497 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/FixedOrderMapTest.java
@@ -21,7 +21,6 @@ import org.junit.jupiter.api.Test;
import java.util.Iterator;
import java.util.Map;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,9 +35,9 @@ public class FixedOrderMapTest {
map.put("c", 2);
map.put("b", 3);
final Iterator<Map.Entry<String, Integer>> iterator =
map.entrySet().iterator();
- assertEquals(mkEntry("a", 0), iterator.next());
- assertEquals(mkEntry("b", 3), iterator.next());
- assertEquals(mkEntry("c", 2), iterator.next());
+ assertEquals(Map.entry("a", 0), iterator.next());
+ assertEquals(Map.entry("b", 3), iterator.next());
+ assertEquals(Map.entry("c", 2), iterator.next());
assertFalse(iterator.hasNext());
}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 2f325d7bcf0..8870e26a259 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -79,7 +79,6 @@ import static org.apache.kafka.common.utils.Utils.formatBytes;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.common.utils.Utils.intersection;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.murmur2;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.common.utils.Utils.validHostPattern;
@@ -1269,11 +1268,11 @@ public class UtilsTest {
recordingCallable(recorded, "valid-2", null),
recordingCallable(recorded, null, new TestException("exception-3"))
));
- Map<String, Object> expected = Utils.mkMap(
- mkEntry("valid-0", "valid-0"),
- mkEntry("exception-1", new TestException("exception-1")),
- mkEntry("valid-2", "valid-2"),
- mkEntry("exception-3", new TestException("exception-3"))
+ Map<String, Object> expected = Map.of(
+ "valid-0", "valid-0",
+ "exception-1", new TestException("exception-1"),
+ "valid-2", "valid-2",
+ "exception-3", new TestException("exception-3")
);
assertEquals(expected, recorded);
@@ -1282,9 +1281,9 @@ public class UtilsTest {
recordingCallable(recorded, "valid-0", null),
recordingCallable(recorded, "valid-1", null)
));
- expected = Utils.mkMap(
- mkEntry("valid-0", "valid-0"),
- mkEntry("valid-1", "valid-1")
+ expected = Map.of(
+ "valid-0", "valid-0",
+ "valid-1", "valid-1"
);
assertEquals(expected, recorded);
@@ -1293,9 +1292,9 @@ public class UtilsTest {
recordingCallable(recorded, null, new TestException("exception-0")),
recordingCallable(recorded, null, new TestException("exception-1")))
);
- expected = Utils.mkMap(
- mkEntry("exception-0", new TestException("exception-0")),
- mkEntry("exception-1", new TestException("exception-1"))
+ expected = Map.of(
+ "exception-0", new TestException("exception-0"),
+ "exception-1", new TestException("exception-1")
);
assertEquals(expected, recorded);
}