This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a1c155ee283 KAFKA-20297 Cleanup
`org.apache.kafka.common.utils.CollectionUtils` (#21818)
a1c155ee283 is described below
commit a1c155ee2835b5e6a6fdd2db5da14f83af5a4da3
Author: Eric Chang <[email protected]>
AuthorDate: Mon Mar 30 17:40:21 2026 +0800
KAFKA-20297 Cleanup `org.apache.kafka.common.utils.CollectionUtils` (#21818)
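This PR removes `CollectionUtils` entirely (KAFKA-20297) by replacing
all usages with inline loops at each call site. The `Name` field of
`DescribeProducersRequest` topics is now a `mapKey`, so the topics field
becomes a generated `TopicRequestCollection` that supports lookup by
topic name.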
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../admin/internals/DescribeProducersHandler.java | 17 ++--
.../admin/internals/ListOffsetsHandler.java | 21 +++--
.../kafka/clients/consumer/StickyAssignor.java | 9 ++-
.../OAuthBearerExtensionsValidatorCallback.java | 11 ++-
.../apache/kafka/common/utils/CollectionUtils.java | 93 ----------------------
.../common/message/DescribeProducersRequest.json | 2 +-
.../internals/DescribeProducersHandlerTest.java | 31 ++------
.../kafka/clients/consumer/StickyAssignorTest.java | 7 +-
.../internals/AbstractStickyAssignorTest.java | 12 +--
.../kafka/common/utils/CollectionUtilsTest.java | 65 ---------------
.../kafka/api/AuthorizerIntegrationTest.scala | 4 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
13 files changed, 59 insertions(+), 221 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
index 3ae5638423c..526d0d227e2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
-import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -89,15 +88,17 @@ public class DescribeProducersHandler extends
AdminApiHandler.Batched<TopicParti
Set<TopicPartition> topicPartitions
) {
DescribeProducersRequestData request = new
DescribeProducersRequestData();
- DescribeProducersRequest.Builder builder = new
DescribeProducersRequest.Builder(request);
- CollectionUtils.groupPartitionsByTopic(
- topicPartitions,
- builder::addTopic,
- (topicRequest, partitionId) ->
topicRequest.partitionIndexes().add(partitionId)
- );
+ for (TopicPartition tp : topicPartitions) {
+ DescribeProducersRequestData.TopicRequest topicRequest =
request.topics().find(tp.topic());
+ if (topicRequest == null) {
+ topicRequest = new
DescribeProducersRequestData.TopicRequest().setName(tp.topic());
+ request.topics().add(topicRequest);
+ }
+ topicRequest.partitionIndexes().add(tp.partition());
+ }
- return builder;
+ return new DescribeProducersRequest.Builder(request);
}
private void handlePartitionError(
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index c03a6c5bee0..742c8a276f0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
-import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
@@ -80,17 +79,15 @@ public final class ListOffsetsHandler extends
Batched<TopicPartition, ListOffset
@Override
ListOffsetsRequest.Builder buildBatchedRequest(int brokerId,
Set<TopicPartition> keys) {
- Map<String, ListOffsetsTopic> topicsByName =
CollectionUtils.groupPartitionsByTopic(
- keys,
- topicName -> new ListOffsetsTopic().setName(topicName),
- (listOffsetsTopic, partitionId) -> {
- TopicPartition topicPartition = new
TopicPartition(listOffsetsTopic.name(), partitionId);
- long offsetTimestamp =
offsetTimestampsByPartition.get(topicPartition);
- listOffsetsTopic.partitions().add(
- new ListOffsetsPartition()
- .setPartitionIndex(partitionId)
- .setTimestamp(offsetTimestamp));
- });
+ Map<String, ListOffsetsTopic> topicsByName = new HashMap<>();
+ for (TopicPartition topicPartition : keys) {
+ ListOffsetsTopic topic = topicsByName.computeIfAbsent(
+ topicPartition.topic(), t -> new
ListOffsetsTopic().setName(t));
+ long offsetTimestamp =
offsetTimestampsByPartition.get(topicPartition);
+ topic.partitions().add(new ListOffsetsPartition()
+ .setPartitionIndex(topicPartition.partition())
+ .setTimestamp(offsetTimestamp));
+ }
boolean supportsMaxTimestamp = keys
.stream()
.anyMatch(key -> offsetTimestampsByPartition.get(key) ==
ListOffsetsRequest.MAX_TIMESTAMP);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index c7560ca3898..4e3cc855fed 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -23,11 +23,11 @@ import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
-import org.apache.kafka.common.utils.CollectionUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -230,7 +230,11 @@ public class StickyAssignor extends AbstractStickyAssignor
{
static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData)
{
Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
List<Struct> topicAssignments = new ArrayList<>();
- for (Map.Entry<String, List<Integer>> topicEntry :
CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
+ var partitionsByTopic = new HashMap<String, List<Integer>>();
+ for (TopicPartition tp : memberData.partitions) {
+ partitionsByTopic.computeIfAbsent(tp.topic(), t -> new
ArrayList<>()).add(tp.partition());
+ }
+ for (Map.Entry<String, List<Integer>> topicEntry :
partitionsByTopic.entrySet()) {
Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
topicAssignment.set(PARTITIONS_KEY_NAME,
topicEntry.getValue().toArray());
@@ -272,4 +276,5 @@ public class StickyAssignor extends AbstractStickyAssignor {
Optional<Integer> generation = struct.hasField(GENERATION_KEY_NAME) ?
Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty();
return new MemberData(partitions, generation);
}
+
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java
index e17cfd1746d..7874833e27a 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java
@@ -25,8 +25,6 @@ import java.util.Objects;
import javax.security.auth.callback.Callback;
-import static org.apache.kafka.common.utils.CollectionUtils.subtractMap;
-
/**
* A {@code Callback} for use by the {@code SaslServer} implementation when it
* needs to validate the SASL extensions for the OAUTHBEARER mechanism
@@ -87,7 +85,14 @@ public class OAuthBearerExtensionsValidatorCallback
implements Callback {
* @return An immutable {@link Map} consisting of the extensions that have
neither been validated nor invalidated
*/
public Map<String, String> ignoredExtensions() {
- return
Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(),
invalidExtensions), validatedExtensions));
+ Map<String, String> ignored = new HashMap<>();
+ for (Map.Entry<String, String> entry :
inputExtensions.map().entrySet()) {
+ String key = entry.getKey();
+ if (!invalidExtensions.containsKey(key) &&
!validatedExtensions.containsKey(key)) {
+ ignored.put(key, entry.getValue());
+ }
+ }
+ return Collections.unmodifiableMap(ignored);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
deleted file mode 100644
index c4d427a7070..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public final class CollectionUtils {
-
- private CollectionUtils() {}
-
- /**
- * Given two maps (A, B), returns all the key-value pairs in A whose keys
are not contained in B
- */
- public static <K, V> Map<K, V> subtractMap(Map<? extends K, ? extends V>
minuend, Map<? extends K, ? extends V> subtrahend) {
- return minuend.entrySet().stream()
- .filter(entry -> !subtrahend.containsKey(entry.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
- }
-
- /**
- * group data by topic
- *
- * @param data Data to be partitioned
- * @param <T> Partition data type
- * @return partitioned data
- */
- public static <T> Map<String, Map<Integer, T>>
groupPartitionDataByTopic(Map<TopicPartition, ? extends T> data) {
- Map<String, Map<Integer, T>> dataByTopic = new HashMap<>();
- for (Map.Entry<TopicPartition, ? extends T> entry : data.entrySet()) {
- String topic = entry.getKey().topic();
- int partition = entry.getKey().partition();
- Map<Integer, T> topicData = dataByTopic.computeIfAbsent(topic, t
-> new HashMap<>());
- topicData.put(partition, entry.getValue());
- }
- return dataByTopic;
- }
-
- /**
- * Group a list of partitions by the topic name.
- *
- * @param partitions The partitions to collect
- * @return partitions per topic
- */
- public static Map<String, List<Integer>>
groupPartitionsByTopic(Collection<TopicPartition> partitions) {
- return groupPartitionsByTopic(
- partitions,
- topic -> new ArrayList<>(),
- List::add
- );
- }
-
- /**
- * Group a collection of partitions by topic
- *
- * @return The map used to group the partitions
- */
- public static <T> Map<String, T> groupPartitionsByTopic(
- Collection<TopicPartition> partitions,
- Function<String, T> buildGroup,
- BiConsumer<T, Integer> addToGroup
- ) {
- Map<String, T> dataByTopic = new HashMap<>();
- for (TopicPartition tp : partitions) {
- String topic = tp.topic();
- T topicData = dataByTopic.computeIfAbsent(topic, buildGroup);
- addToGroup.accept(topicData, tp.partition());
- }
- return dataByTopic;
- }
-}
diff --git
a/clients/src/main/resources/common/message/DescribeProducersRequest.json
b/clients/src/main/resources/common/message/DescribeProducersRequest.json
index b7889ef1f1e..20f531c7db9 100644
--- a/clients/src/main/resources/common/message/DescribeProducersRequest.json
+++ b/clients/src/main/resources/common/message/DescribeProducersRequest.json
@@ -23,7 +23,7 @@
"fields": [
{ "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
"about": "The topics to list producers for.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+", "entityType":
"topicName",
+ { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
"entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
"about": "The indexes of the partitions to list producers for." }
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
index 214d5fe47e9..3a2ab506249 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
@@ -33,7 +33,6 @@ import
org.apache.kafka.common.message.DescribeProducersResponseData.TopicRespon
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
-import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Test;
@@ -50,7 +49,6 @@ import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
-import static java.util.Collections.singletonMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -120,7 +118,7 @@ public class DescribeProducersHandlerTest {
int brokerId = 3;
DescribeProducersRequest.Builder request =
handler.buildBatchedRequest(brokerId, topicPartitions);
- List<DescribeProducersRequestData.TopicRequest> topics =
request.data.topics();
+ DescribeProducersRequestData.TopicRequestCollection topics =
request.data.topics();
assertEquals(Set.of("foo", "bar"), topics.stream()
.map(DescribeProducersRequestData.TopicRequest::name)
@@ -195,9 +193,7 @@ public class DescribeProducersHandlerTest {
DescribeProducersHandler handler = newHandler(options);
PartitionResponse partitionResponse =
sampleProducerState(topicPartition);
- DescribeProducersResponse response = describeProducersResponse(
- singletonMap(topicPartition, partitionResponse)
- );
+ DescribeProducersResponse response =
describeProducersResponse(topicPartition, partitionResponse);
Node node = new Node(3, "host", 1);
ApiResult<TopicPartition, PartitionProducerState> result =
@@ -252,7 +248,7 @@ public class DescribeProducersHandlerTest {
PartitionResponse partitionResponse = new PartitionResponse()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(error.code());
- return describeProducersResponse(singletonMap(topicPartition,
partitionResponse));
+ return describeProducersResponse(topicPartition, partitionResponse);
}
private PartitionResponse sampleProducerState(TopicPartition
topicPartition) {
@@ -304,27 +300,16 @@ public class DescribeProducersHandlerTest {
}
private DescribeProducersResponse describeProducersResponse(
- Map<TopicPartition, PartitionResponse> partitionResponses
+ TopicPartition partition, PartitionResponse partitionResponse
) {
DescribeProducersResponseData response = new
DescribeProducersResponseData();
- Map<String, Map<Integer, PartitionResponse>> partitionResponsesByTopic
=
- CollectionUtils.groupPartitionDataByTopic(partitionResponses);
-
- for (Map.Entry<String, Map<Integer, PartitionResponse>> topicEntry :
partitionResponsesByTopic.entrySet()) {
- String topic = topicEntry.getKey();
- Map<Integer, PartitionResponse> topicPartitionResponses =
topicEntry.getValue();
- TopicResponse topicResponse = new TopicResponse().setName(topic);
- response.topics().add(topicResponse);
+ TopicResponse topicResponse = new
TopicResponse().setName(partition.topic());
+ int partitionId = partition.partition();
+
topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId));
- for (Map.Entry<Integer, PartitionResponse> partitionEntry :
topicPartitionResponses.entrySet()) {
- Integer partitionId = partitionEntry.getKey();
- PartitionResponse partitionResponse =
partitionEntry.getValue();
-
topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId));
- }
- }
+ response.topics().add(topicResponse);
return new DescribeProducersResponse(response);
}
-
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index c225c291e8f..b2a04eaec81 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.test.api.Flaky;
-import org.apache.kafka.common.utils.CollectionUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -368,7 +367,11 @@ public class StickyAssignorTest extends
AbstractStickyAssignorTest {
private Subscription buildSubscriptionWithOldSchema(List<String> topics,
List<TopicPartition> partitions, int consumerIndex) {
Struct struct = new
Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
List<Struct> topicAssignments = new ArrayList<>();
- for (Map.Entry<String, List<Integer>> topicEntry :
CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) {
+ Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
+ for (TopicPartition tp : partitions) {
+ partitionsByTopic.computeIfAbsent(tp.topic(), t -> new
ArrayList<>()).add(tp.partition());
+ }
+ for (Map.Entry<String, List<Integer>> topicEntry :
partitionsByTopic.entrySet()) {
Struct topicAssignment = new
Struct(StickyAssignor.TOPIC_ASSIGNMENT);
topicAssignment.set(StickyAssignor.TOPIC_KEY_NAME,
topicEntry.getKey());
topicAssignment.set(StickyAssignor.PARTITIONS_KEY_NAME,
topicEntry.getValue().toArray());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 81132564593..c7b0c909477 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -23,7 +23,6 @@ import
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignorTest
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.api.Flaky;
-import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.BeforeEach;
@@ -1522,15 +1521,16 @@ public abstract class AbstractStickyAssignorTest {
if (Math.abs(len - otherLen) <= 1)
continue;
- Map<String, List<Integer>> map =
CollectionUtils.groupPartitionsByTopic(partitions);
- Map<String, List<Integer>> otherMap =
CollectionUtils.groupPartitionsByTopic(otherPartitions);
-
int moreLoaded = len > otherLen ? i : j;
int lessLoaded = len > otherLen ? j : i;
+ Set<String> otherTopics = otherPartitions.stream()
+ .map(TopicPartition::topic)
+ .collect(Collectors.toSet());
+
// If there's any overlap in the subscribed topics, we should
have been able to balance partitions
- for (String topic: map.keySet()) {
- assertFalse(otherMap.containsKey(topic),
+ for (TopicPartition tp : partitions) {
+ assertFalse(otherTopics.contains(tp.topic()),
"Error: Some partitions can be moved from c" +
moreLoaded + " to c" + lessLoaded + " to achieve a better balance" +
"\nc" + i + " has " + len + " partitions, and c" + j +
" has " + otherLen + " partitions." +
"\nSubscriptions: " + subscriptions +
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java
deleted file mode 100644
index 7c2a9d66e3a..00000000000
---
a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.kafka.common.utils.CollectionUtils.subtractMap;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class CollectionUtilsTest {
-
- @Test
- public void testSubtractMapRemovesSecondMapsKeys() {
- Map<String, String> mainMap = new HashMap<>();
- mainMap.put("one", "1");
- mainMap.put("two", "2");
- mainMap.put("three", "3");
- Map<String, String> secondaryMap = new HashMap<>();
- secondaryMap.put("one", "4");
- secondaryMap.put("two", "5");
-
- Map<String, String> newMap = subtractMap(mainMap, secondaryMap);
-
- assertEquals(3, mainMap.size()); // original map should not be
modified
- assertEquals(1, newMap.size());
- assertTrue(newMap.containsKey("three"));
- assertEquals("3", newMap.get("three"));
- }
-
- @Test
- public void testSubtractMapDoesntRemoveAnythingWhenEmptyMap() {
- Map<String, String> mainMap = new HashMap<>();
- mainMap.put("one", "1");
- mainMap.put("two", "2");
- mainMap.put("three", "3");
- Map<String, String> secondaryMap = new HashMap<>();
-
- Map<String, String> newMap = subtractMap(mainMap, secondaryMap);
-
- assertEquals(3, newMap.size());
- assertEquals("1", newMap.get("one"));
- assertEquals("2", newMap.get("two"));
- assertEquals("3", newMap.get("three"));
- assertNotSame(newMap, mainMap);
- }
-}
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e2ac0e1433c..3c4e424590d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -648,11 +648,11 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
private def describeProducersRequest: DescribeProducersRequest = new
DescribeProducersRequest.Builder(
new DescribeProducersRequestData()
- .setTopics(java.util.List.of(
+ .setTopics(new
DescribeProducersRequestData.TopicRequestCollection(java.util.List.of(
new DescribeProducersRequestData.TopicRequest()
.setName(tp.topic)
.setPartitionIndexes(java.util.List.of(Int.box(tp.partition)))
- ))
+ ).iterator()))
).build()
private def describeTransactionsRequest: DescribeTransactionsRequest = new
DescribeTransactionsRequest.Builder(
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c5974e7e317..a8277531bbc 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10496,7 +10496,7 @@ class KafkaApisTest extends Logging {
val tp4 = new TopicPartition("invalid;topic", 1)
val authorizer: Authorizer = mock(classOf[Authorizer])
- val data = new DescribeProducersRequestData().setTopics(util.List.of(
+ val data = new DescribeProducersRequestData().setTopics(new
DescribeProducersRequestData.TopicRequestCollection(util.List.of(
new DescribeProducersRequestData.TopicRequest()
.setName(tp1.topic)
.setPartitionIndexes(util.List.of(Int.box(tp1.partition))),
@@ -10509,7 +10509,7 @@ class KafkaApisTest extends Logging {
new DescribeProducersRequestData.TopicRequest()
.setName(tp4.topic)
.setPartitionIndexes(util.List.of(Int.box(tp4.partition)))
- ))
+ ).iterator()))
def buildExpectedActions(topic: String): util.List[Action] = {
val pattern = new ResourcePattern(ResourceType.TOPIC, topic,
PatternType.LITERAL)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index df8683fe65f..ac35fb8f6ad 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -652,9 +652,9 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.DESCRIBE_PRODUCERS =>
new DescribeProducersRequest.Builder(new
DescribeProducersRequestData()
- .setTopics(util.List.of(new
DescribeProducersRequestData.TopicRequest()
+ .setTopics(new
DescribeProducersRequestData.TopicRequestCollection(util.List.of(new
DescribeProducersRequestData.TopicRequest()
.setName("test-topic")
- .setPartitionIndexes(util.List.of[Integer](1, 2, 3)))))
+ .setPartitionIndexes(util.List.of[Integer](1, 2,
3))).iterator())))
case ApiKeys.BROKER_REGISTRATION =>
new BrokerRegistrationRequest.Builder(new
BrokerRegistrationRequestData())