This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1c155ee283 KAFKA-20297 Cleanup `org.apache.kafka.common.utils.CollectionUtils` (#21818)
a1c155ee283 is described below

commit a1c155ee2835b5e6a6fdd2db5da14f83af5a4da3
Author: Eric Chang <[email protected]>
AuthorDate: Mon Mar 30 17:40:21 2026 +0800

    KAFKA-20297 Cleanup `org.apache.kafka.common.utils.CollectionUtils` (#21818)
    
    This PR removes `CollectionUtils` entirely (KAFKA-20297) by replacing
    all usages with alternatives.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../admin/internals/DescribeProducersHandler.java  | 17 ++--
 .../admin/internals/ListOffsetsHandler.java        | 21 +++--
 .../kafka/clients/consumer/StickyAssignor.java     |  9 ++-
 .../OAuthBearerExtensionsValidatorCallback.java    | 11 ++-
 .../apache/kafka/common/utils/CollectionUtils.java | 93 ----------------------
 .../common/message/DescribeProducersRequest.json   |  2 +-
 .../internals/DescribeProducersHandlerTest.java    | 31 ++------
 .../kafka/clients/consumer/StickyAssignorTest.java |  7 +-
 .../internals/AbstractStickyAssignorTest.java      | 12 +--
 .../kafka/common/utils/CollectionUtilsTest.java    | 65 ---------------
 .../kafka/api/AuthorizerIntegrationTest.scala      |  4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  4 +-
 13 files changed, 59 insertions(+), 221 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
index 3ae5638423c..526d0d227e2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandler.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.DescribeProducersRequest;
 import org.apache.kafka.common.requests.DescribeProducersResponse;
-import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
@@ -89,15 +88,17 @@ public class DescribeProducersHandler extends 
AdminApiHandler.Batched<TopicParti
         Set<TopicPartition> topicPartitions
     ) {
         DescribeProducersRequestData request = new 
DescribeProducersRequestData();
-        DescribeProducersRequest.Builder builder = new 
DescribeProducersRequest.Builder(request);
 
-        CollectionUtils.groupPartitionsByTopic(
-            topicPartitions,
-            builder::addTopic,
-            (topicRequest, partitionId) -> 
topicRequest.partitionIndexes().add(partitionId)
-        );
+        for (TopicPartition tp : topicPartitions) {
+            DescribeProducersRequestData.TopicRequest topicRequest = 
request.topics().find(tp.topic());
+            if (topicRequest == null) {
+                topicRequest = new 
DescribeProducersRequestData.TopicRequest().setName(tp.topic());
+                request.topics().add(topicRequest);
+            }
+            topicRequest.partitionIndexes().add(tp.partition());
+        }
 
-        return builder;
+        return new DescribeProducersRequest.Builder(request);
     }
 
     private void handlePartitionError(
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
index c03a6c5bee0..742c8a276f0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
-import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
@@ -80,17 +79,15 @@ public final class ListOffsetsHandler extends 
Batched<TopicPartition, ListOffset
 
     @Override
     ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, 
Set<TopicPartition> keys) {
-        Map<String, ListOffsetsTopic> topicsByName = 
CollectionUtils.groupPartitionsByTopic(
-            keys,
-            topicName -> new ListOffsetsTopic().setName(topicName),
-            (listOffsetsTopic, partitionId) -> {
-                TopicPartition topicPartition = new 
TopicPartition(listOffsetsTopic.name(), partitionId);
-                long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
-                listOffsetsTopic.partitions().add(
-                    new ListOffsetsPartition()
-                        .setPartitionIndex(partitionId)
-                        .setTimestamp(offsetTimestamp));
-            });
+        Map<String, ListOffsetsTopic> topicsByName = new HashMap<>();
+        for (TopicPartition topicPartition : keys) {
+            ListOffsetsTopic topic = topicsByName.computeIfAbsent(
+                topicPartition.topic(), t -> new 
ListOffsetsTopic().setName(t));
+            long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
+            topic.partitions().add(new ListOffsetsPartition()
+                .setPartitionIndex(topicPartition.partition())
+                .setTimestamp(offsetTimestamp));
+        }
         boolean supportsMaxTimestamp = keys
             .stream()
             .anyMatch(key -> offsetTimestampsByPartition.get(key) == 
ListOffsetsRequest.MAX_TIMESTAMP);
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index c7560ca3898..4e3cc855fed 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -23,11 +23,11 @@ import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.protocol.types.Type;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -230,7 +230,11 @@ public class StickyAssignor extends AbstractStickyAssignor 
{
     static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) 
{
         Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
         List<Struct> topicAssignments = new ArrayList<>();
-        for (Map.Entry<String, List<Integer>> topicEntry : 
CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
+        var partitionsByTopic = new HashMap<String, List<Integer>>();
+        for (TopicPartition tp : memberData.partitions) {
+            partitionsByTopic.computeIfAbsent(tp.topic(), t -> new 
ArrayList<>()).add(tp.partition());
+        }
+        for (Map.Entry<String, List<Integer>> topicEntry : 
partitionsByTopic.entrySet()) {
             Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
             topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
             topicAssignment.set(PARTITIONS_KEY_NAME, 
topicEntry.getValue().toArray());
@@ -272,4 +276,5 @@ public class StickyAssignor extends AbstractStickyAssignor {
         Optional<Integer> generation = struct.hasField(GENERATION_KEY_NAME) ? 
Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty();
         return new MemberData(partitions, generation);
     }
+
 }
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java
index e17cfd1746d..7874833e27a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerExtensionsValidatorCallback.java
@@ -25,8 +25,6 @@ import java.util.Objects;
 
 import javax.security.auth.callback.Callback;
 
-import static org.apache.kafka.common.utils.CollectionUtils.subtractMap;
-
 /**
  * A {@code Callback} for use by the {@code SaslServer} implementation when it
  * needs to validate the SASL extensions for the OAUTHBEARER mechanism
@@ -87,7 +85,14 @@ public class OAuthBearerExtensionsValidatorCallback 
implements Callback {
      * @return An immutable {@link Map} consisting of the extensions that have 
neither been validated nor invalidated
      */
     public Map<String, String> ignoredExtensions() {
-        return 
Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(), 
invalidExtensions), validatedExtensions));
+        Map<String, String> ignored = new HashMap<>();
+        for (Map.Entry<String, String> entry : 
inputExtensions.map().entrySet()) {
+            String key = entry.getKey();
+            if (!invalidExtensions.containsKey(key) && 
!validatedExtensions.containsKey(key)) {
+                ignored.put(key, entry.getValue());
+            }
+        }
+        return Collections.unmodifiableMap(ignored);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
deleted file mode 100644
index c4d427a7070..00000000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/CollectionUtils.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-public final class CollectionUtils {
-
-    private CollectionUtils() {}
-
-    /**
-     * Given two maps (A, B), returns all the key-value pairs in A whose keys 
are not contained in B
-     */
-    public static <K, V> Map<K, V> subtractMap(Map<? extends K, ? extends V> 
minuend, Map<? extends K, ? extends V> subtrahend) {
-        return minuend.entrySet().stream()
-                .filter(entry -> !subtrahend.containsKey(entry.getKey()))
-                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
-    }
-
-    /**
-     * group data by topic
-     *
-     * @param data Data to be partitioned
-     * @param <T> Partition data type
-     * @return partitioned data
-     */
-    public static <T> Map<String, Map<Integer, T>> 
groupPartitionDataByTopic(Map<TopicPartition, ? extends T> data) {
-        Map<String, Map<Integer, T>> dataByTopic = new HashMap<>();
-        for (Map.Entry<TopicPartition, ? extends T> entry : data.entrySet()) {
-            String topic = entry.getKey().topic();
-            int partition = entry.getKey().partition();
-            Map<Integer, T> topicData = dataByTopic.computeIfAbsent(topic, t 
-> new HashMap<>());
-            topicData.put(partition, entry.getValue());
-        }
-        return dataByTopic;
-    }
-
-    /**
-     * Group a list of partitions by the topic name.
-     *
-     * @param partitions The partitions to collect
-     * @return partitions per topic
-     */
-    public static Map<String, List<Integer>> 
groupPartitionsByTopic(Collection<TopicPartition> partitions) {
-        return groupPartitionsByTopic(
-            partitions,
-            topic -> new ArrayList<>(),
-            List::add
-        );
-    }
-
-    /**
-     * Group a collection of partitions by topic
-     *
-     * @return The map used to group the partitions
-     */
-    public static <T> Map<String, T> groupPartitionsByTopic(
-        Collection<TopicPartition> partitions,
-        Function<String, T> buildGroup,
-        BiConsumer<T, Integer> addToGroup
-    ) {
-        Map<String, T> dataByTopic = new HashMap<>();
-        for (TopicPartition tp : partitions) {
-            String topic = tp.topic();
-            T topicData = dataByTopic.computeIfAbsent(topic, buildGroup);
-            addToGroup.accept(topicData, tp.partition());
-        }
-        return dataByTopic;
-    }
-}
diff --git a/clients/src/main/resources/common/message/DescribeProducersRequest.json b/clients/src/main/resources/common/message/DescribeProducersRequest.json
index b7889ef1f1e..20f531c7db9 100644
--- a/clients/src/main/resources/common/message/DescribeProducersRequest.json
+++ b/clients/src/main/resources/common/message/DescribeProducersRequest.json
@@ -23,7 +23,7 @@
   "fields": [
     { "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
       "about": "The topics to list producers for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, 
"entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
         "about": "The indexes of the partitions to list producers for." }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
index 214d5fe47e9..3a2ab506249 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeProducersHandlerTest.java
@@ -33,7 +33,6 @@ import 
org.apache.kafka.common.message.DescribeProducersResponseData.TopicRespon
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.DescribeProducersRequest;
 import org.apache.kafka.common.requests.DescribeProducersResponse;
-import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.LogContext;
 
 import org.junit.jupiter.api.Test;
@@ -50,7 +49,6 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
-import static java.util.Collections.singletonMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -120,7 +118,7 @@ public class DescribeProducersHandlerTest {
         int brokerId = 3;
         DescribeProducersRequest.Builder request = 
handler.buildBatchedRequest(brokerId, topicPartitions);
 
-        List<DescribeProducersRequestData.TopicRequest> topics = 
request.data.topics();
+        DescribeProducersRequestData.TopicRequestCollection topics = 
request.data.topics();
 
         assertEquals(Set.of("foo", "bar"), topics.stream()
             .map(DescribeProducersRequestData.TopicRequest::name)
@@ -195,9 +193,7 @@ public class DescribeProducersHandlerTest {
         DescribeProducersHandler handler = newHandler(options);
 
         PartitionResponse partitionResponse = 
sampleProducerState(topicPartition);
-        DescribeProducersResponse response = describeProducersResponse(
-            singletonMap(topicPartition, partitionResponse)
-        );
+        DescribeProducersResponse response = 
describeProducersResponse(topicPartition, partitionResponse);
         Node node = new Node(3, "host", 1);
 
         ApiResult<TopicPartition, PartitionProducerState> result =
@@ -252,7 +248,7 @@ public class DescribeProducersHandlerTest {
         PartitionResponse partitionResponse = new PartitionResponse()
             .setPartitionIndex(topicPartition.partition())
             .setErrorCode(error.code());
-        return describeProducersResponse(singletonMap(topicPartition, 
partitionResponse));
+        return describeProducersResponse(topicPartition, partitionResponse);
     }
 
     private PartitionResponse sampleProducerState(TopicPartition 
topicPartition) {
@@ -304,27 +300,16 @@ public class DescribeProducersHandlerTest {
     }
 
     private DescribeProducersResponse describeProducersResponse(
-        Map<TopicPartition, PartitionResponse> partitionResponses
+        TopicPartition partition, PartitionResponse partitionResponse
     ) {
         DescribeProducersResponseData response = new 
DescribeProducersResponseData();
-        Map<String, Map<Integer, PartitionResponse>> partitionResponsesByTopic 
=
-            CollectionUtils.groupPartitionDataByTopic(partitionResponses);
-
-        for (Map.Entry<String, Map<Integer, PartitionResponse>> topicEntry : 
partitionResponsesByTopic.entrySet()) {
-            String topic = topicEntry.getKey();
-            Map<Integer, PartitionResponse> topicPartitionResponses = 
topicEntry.getValue();
 
-            TopicResponse topicResponse = new TopicResponse().setName(topic);
-            response.topics().add(topicResponse);
+        TopicResponse topicResponse = new 
TopicResponse().setName(partition.topic());
+        int partitionId = partition.partition();
+        
topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId));
 
-            for (Map.Entry<Integer, PartitionResponse> partitionEntry : 
topicPartitionResponses.entrySet()) {
-                Integer partitionId = partitionEntry.getKey();
-                PartitionResponse partitionResponse = 
partitionEntry.getValue();
-                
topicResponse.partitions().add(partitionResponse.setPartitionIndex(partitionId));
-            }
-        }
+        response.topics().add(topicResponse);
 
         return new DescribeProducersResponse(response);
     }
-
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
index c225c291e8f..b2a04eaec81 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/StickyAssignorTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.test.api.Flaky;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -368,7 +367,11 @@ public class StickyAssignorTest extends 
AbstractStickyAssignorTest {
     private Subscription buildSubscriptionWithOldSchema(List<String> topics, 
List<TopicPartition> partitions, int consumerIndex) {
         Struct struct = new 
Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
         List<Struct> topicAssignments = new ArrayList<>();
-        for (Map.Entry<String, List<Integer>> topicEntry : 
CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) {
+        Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
+        for (TopicPartition tp : partitions) {
+            partitionsByTopic.computeIfAbsent(tp.topic(), t -> new 
ArrayList<>()).add(tp.partition());
+        }
+        for (Map.Entry<String, List<Integer>> topicEntry : 
partitionsByTopic.entrySet()) {
             Struct topicAssignment = new 
Struct(StickyAssignor.TOPIC_ASSIGNMENT);
             topicAssignment.set(StickyAssignor.TOPIC_KEY_NAME, 
topicEntry.getKey());
             topicAssignment.set(StickyAssignor.PARTITIONS_KEY_NAME, 
topicEntry.getValue().toArray());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
index 81132564593..c7b0c909477 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignorTest.java
@@ -23,7 +23,6 @@ import 
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignorTest
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.test.api.Flaky;
-import org.apache.kafka.common.utils.CollectionUtils;
 import org.apache.kafka.common.utils.Utils;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -1522,15 +1521,16 @@ public abstract class AbstractStickyAssignorTest {
                 if (Math.abs(len - otherLen) <= 1)
                     continue;
 
-                Map<String, List<Integer>> map = 
CollectionUtils.groupPartitionsByTopic(partitions);
-                Map<String, List<Integer>> otherMap = 
CollectionUtils.groupPartitionsByTopic(otherPartitions);
-
                 int moreLoaded = len > otherLen ? i : j;
                 int lessLoaded = len > otherLen ? j : i;
 
+                Set<String> otherTopics = otherPartitions.stream()
+                    .map(TopicPartition::topic)
+                    .collect(Collectors.toSet());
+
                 // If there's any overlap in the subscribed topics, we should 
have been able to balance partitions
-                for (String topic: map.keySet()) {
-                    assertFalse(otherMap.containsKey(topic),
+                for (TopicPartition tp : partitions) {
+                    assertFalse(otherTopics.contains(tp.topic()),
                         "Error: Some partitions can be moved from c" + 
moreLoaded + " to c" + lessLoaded + " to achieve a better balance" +
                         "\nc" + i + " has " + len + " partitions, and c" + j + 
" has " + otherLen + " partitions." +
                         "\nSubscriptions: " + subscriptions +
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java
deleted file mode 100644
index 7c2a9d66e3a..00000000000
--- a/clients/src/test/java/org/apache/kafka/common/utils/CollectionUtilsTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.kafka.common.utils.CollectionUtils.subtractMap;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class CollectionUtilsTest {
-
-    @Test
-    public void testSubtractMapRemovesSecondMapsKeys() {
-        Map<String, String> mainMap = new HashMap<>();
-        mainMap.put("one", "1");
-        mainMap.put("two", "2");
-        mainMap.put("three", "3");
-        Map<String, String> secondaryMap = new HashMap<>();
-        secondaryMap.put("one", "4");
-        secondaryMap.put("two", "5");
-
-        Map<String, String> newMap = subtractMap(mainMap, secondaryMap);
-
-        assertEquals(3, mainMap.size());  // original map should not be 
modified
-        assertEquals(1, newMap.size());
-        assertTrue(newMap.containsKey("three"));
-        assertEquals("3", newMap.get("three"));
-    }
-
-    @Test
-    public void testSubtractMapDoesntRemoveAnythingWhenEmptyMap() {
-        Map<String, String> mainMap = new HashMap<>();
-        mainMap.put("one", "1");
-        mainMap.put("two", "2");
-        mainMap.put("three", "3");
-        Map<String, String> secondaryMap = new HashMap<>();
-
-        Map<String, String> newMap = subtractMap(mainMap, secondaryMap);
-
-        assertEquals(3, newMap.size());
-        assertEquals("1", newMap.get("one"));
-        assertEquals("2", newMap.get("two"));
-        assertEquals("3", newMap.get("three"));
-        assertNotSame(newMap, mainMap);
-    }
-}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index e2ac0e1433c..3c4e424590d 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -648,11 +648,11 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
 
   private def describeProducersRequest: DescribeProducersRequest = new 
DescribeProducersRequest.Builder(
     new DescribeProducersRequestData()
-      .setTopics(java.util.List.of(
+      .setTopics(new 
DescribeProducersRequestData.TopicRequestCollection(java.util.List.of(
         new DescribeProducersRequestData.TopicRequest()
           .setName(tp.topic)
           .setPartitionIndexes(java.util.List.of(Int.box(tp.partition)))
-      ))
+      ).iterator()))
   ).build()
 
   private def describeTransactionsRequest: DescribeTransactionsRequest = new 
DescribeTransactionsRequest.Builder(
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index c5974e7e317..a8277531bbc 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -10496,7 +10496,7 @@ class KafkaApisTest extends Logging {
     val tp4 = new TopicPartition("invalid;topic", 1)
 
     val authorizer: Authorizer = mock(classOf[Authorizer])
-    val data = new DescribeProducersRequestData().setTopics(util.List.of(
+    val data = new DescribeProducersRequestData().setTopics(new 
DescribeProducersRequestData.TopicRequestCollection(util.List.of(
       new DescribeProducersRequestData.TopicRequest()
         .setName(tp1.topic)
         .setPartitionIndexes(util.List.of(Int.box(tp1.partition))),
@@ -10509,7 +10509,7 @@ class KafkaApisTest extends Logging {
       new DescribeProducersRequestData.TopicRequest()
         .setName(tp4.topic)
         .setPartitionIndexes(util.List.of(Int.box(tp4.partition)))
-    ))
+    ).iterator()))
 
     def buildExpectedActions(topic: String): util.List[Action] = {
       val pattern = new ResourcePattern(ResourceType.TOPIC, topic, 
PatternType.LITERAL)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index df8683fe65f..ac35fb8f6ad 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -652,9 +652,9 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.DESCRIBE_PRODUCERS =>
           new DescribeProducersRequest.Builder(new 
DescribeProducersRequestData()
-            .setTopics(util.List.of(new 
DescribeProducersRequestData.TopicRequest()
+            .setTopics(new 
DescribeProducersRequestData.TopicRequestCollection(util.List.of(new 
DescribeProducersRequestData.TopicRequest()
               .setName("test-topic")
-              .setPartitionIndexes(util.List.of[Integer](1, 2, 3)))))
+              .setPartitionIndexes(util.List.of[Integer](1, 2, 
3))).iterator())))
 
         case ApiKeys.BROKER_REGISTRATION =>
           new BrokerRegistrationRequest.Builder(new 
BrokerRegistrationRequestData())

Reply via email to