This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 9945592afc4 MINOR: Use mutable maps in Fetch.forPartition to avoid 
potential UnsupportedOperationException  (#21816)
9945592afc4 is described below

commit 9945592afc4652624733b373c5a2371eccaa413e
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Sat Mar 21 11:04:54 2026 +0800

    MINOR: Use mutable maps in Fetch.forPartition to avoid potential 
UnsupportedOperationException  (#21816)
    
    Fetch.forPartition used Map.of(), which creates immutable maps. If add()
    is called with a forPartition-created Fetch as the target, putAll on the
    immutable nextOffsetAndMetadata map throws
    UnsupportedOperationException. Replace Map.of() with HashMap so that
    both internal maps are mutable.
    
    See https://github.com/apache/kafka/pull/21726#discussion_r2958164222
    
    Added FetchTest to cover forPartition mutability, add() merging,
    positionAdvanced propagation, and unmodifiable return maps.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/consumer/internals/Fetch.java    |  31 +++--
 .../clients/consumer/internals/FetchTest.java      | 141 +++++++++++++++++++++
 2 files changed, 161 insertions(+), 11 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
index 1c680a7ee7a..9c9f34ddf01 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
@@ -34,7 +34,7 @@ public class Fetch<K, V> {
     private int numRecords;
 
     public static <K, V> Fetch<K, V> empty() {
-        return new Fetch<>(new HashMap<>(), false, 0, new HashMap<>());
+        return new Fetch<>(false, 0);
     }
 
     public static <K, V> Fetch<K, V> forPartition(
@@ -43,25 +43,34 @@ public class Fetch<K, V> {
             boolean positionAdvanced,
             OffsetAndMetadata nextOffsetAndMetadata
     ) {
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = 
records.isEmpty()
-                ? Map.of()
-                : Map.of(partition, records);
-        Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadataMap = 
Map.of(partition, nextOffsetAndMetadata);
-        return new Fetch<>(recordsMap, positionAdvanced, records.size(), 
nextOffsetAndMetadataMap);
+        return new Fetch<>(positionAdvanced, partition, records, 
nextOffsetAndMetadata);
     }
 
     private Fetch(
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
             boolean positionAdvanced,
-            int numRecords,
-            Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata
+            int numRecords
     ) {
-        this.records = records;
+        this.records = new HashMap<>();
         this.positionAdvanced = positionAdvanced;
         this.numRecords = numRecords;
-        this.nextOffsetAndMetadata = nextOffsetAndMetadata;
+        this.nextOffsetAndMetadata = new HashMap<>();
     }
 
+    private Fetch(
+            boolean positionAdvanced,
+            TopicPartition partition,
+            List<ConsumerRecord<K, V>> records,
+            OffsetAndMetadata offsetAndMetadata
+    ) {
+        this.records = new HashMap<>();
+        if (!records.isEmpty()) {
+            this.records.put(partition, records);
+        }
+        this.positionAdvanced = positionAdvanced;
+        this.numRecords = records.size();
+        this.nextOffsetAndMetadata = new HashMap<>();
+        this.nextOffsetAndMetadata.put(partition, offsetAndMetadata);
+    }
     /**
      * Add another {@link Fetch} to this one; all of its records will be added 
to this fetch's
      * {@link #records() records}, and if the other fetch
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchTest.java
new file mode 100644
index 00000000000..cec662b6333
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class FetchTest {
+
+    private static final TopicPartition TP0 = new TopicPartition("topic", 0);
+    private static final TopicPartition TP1 = new TopicPartition("topic", 1);
+
+    @Test
+    void testAddToForPartitionFetch() {
+        var records0 = List.of(new ConsumerRecord<>("topic", 0, 0, "key0", 
"value0"));
+        var target = Fetch.forPartition(TP0, records0, true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        var records1 = List.of(new ConsumerRecord<>("topic", 1, 0, "key1", 
"value1"));
+        var source = Fetch.forPartition(TP1, records1, true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        target.add(source);
+
+        assertEquals(2, target.numRecords());
+        assertTrue(target.records().containsKey(TP0));
+        assertTrue(target.records().containsKey(TP1));
+        assertTrue(target.nextOffsets().containsKey(TP0));
+        assertTrue(target.nextOffsets().containsKey(TP1));
+    }
+
+    @Test
+    void testAddForPartitionFetchToEmptyFetch() {
+        Fetch<String, String> target = Fetch.empty();
+
+        var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", 
"value"));
+        var source = Fetch.forPartition(TP0, records, true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        target.add(source);
+
+        assertEquals(1, target.numRecords());
+        assertTrue(target.records().containsKey(TP0));
+        assertTrue(target.nextOffsets().containsKey(TP0));
+    }
+
+    @Test
+    void testForPartitionWithEmptyRecords() {
+        var fetch = Fetch.forPartition(TP0, List.of(), true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        assertTrue(fetch.records().isEmpty());
+        assertEquals(0, fetch.numRecords());
+        assertTrue(fetch.positionAdvanced());
+        assertFalse(fetch.isEmpty());
+        assertTrue(fetch.nextOffsets().containsKey(TP0));
+    }
+
+    @Test
+    void testAddWithSamePartitionMergesRecords() {
+        var records0 = List.of(new ConsumerRecord<>("topic", 0, 0, "key0", 
"value0"));
+        var records1 = List.of(new ConsumerRecord<>("topic", 0, 1, "key1", 
"value1"));
+        var target = Fetch.forPartition(TP0, records0, true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+        var source = Fetch.forPartition(TP0, records1, true, new 
OffsetAndMetadata(2, Optional.empty(), ""));
+
+        target.add(source);
+
+        assertEquals(2, target.numRecords());
+        assertEquals(2, target.records().get(TP0).size());
+    }
+
+    @Test
+    void testAddPropagatesPositionAdvanced() {
+        var target = Fetch.forPartition(TP0, List.of(), false, new 
OffsetAndMetadata(0, Optional.empty(), ""));
+        var source = Fetch.forPartition(TP1, List.of(), true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        assertFalse(target.positionAdvanced());
+
+        target.add(source);
+
+        assertTrue(target.positionAdvanced());
+    }
+
+    @Test
+    void testForPartitionWithoutPositionAdvanced() {
+        var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", 
"value"));
+        var fetch = Fetch.forPartition(TP0, records, false, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        assertFalse(fetch.positionAdvanced());
+        assertFalse(fetch.isEmpty());
+        assertEquals(1, fetch.numRecords());
+    }
+
+    @Test
+    void testRecordsReturnsUnmodifiableMap() {
+        var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", 
"value"));
+        var fetch = Fetch.forPartition(TP0, records, true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        assertThrows(UnsupportedOperationException.class, () -> 
fetch.records().put(TP1, List.of()));
+    }
+
+    @Test
+    void testNextOffsetsReturnsUnmodifiableMap() {
+        var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key", 
"value"));
+        var fetch = Fetch.forPartition(TP0, records, true, new 
OffsetAndMetadata(1, Optional.empty(), ""));
+
+        assertThrows(UnsupportedOperationException.class,
+                () -> fetch.nextOffsets().put(TP1, new OffsetAndMetadata(99, 
Optional.empty(), "")));
+    }
+
+    @Test
+    void testEmpty() {
+        Fetch<String, String> fetch = Fetch.empty();
+        assertTrue(fetch.isEmpty());
+        assertEquals(0, fetch.numRecords());
+        assertFalse(fetch.positionAdvanced());
+        assertTrue(fetch.records().isEmpty());
+        assertTrue(fetch.nextOffsets().isEmpty());
+    }
+}
\ No newline at end of file

Reply via email to