This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 9945592afc4 MINOR: Use mutable maps in Fetch.forPartition to avoid
potential UnsupportedOperationException (#21816)
9945592afc4 is described below
commit 9945592afc4652624733b373c5a2371eccaa413e
Author: Chih-Yuan Chien <[email protected]>
AuthorDate: Sat Mar 21 11:04:54 2026 +0800
MINOR: Use mutable maps in Fetch.forPartition to avoid potential
UnsupportedOperationException (#21816)
Fetch.forPartition used Map.of() which creates immutable maps. If add()
is called on a forPartition-created Fetch as the target, putAll on the
immutable nextOffsetAndMetadata would throw
UnsupportedOperationException. Replace with HashMap to ensure
mutability.
See https://github.com/apache/kafka/pull/21726#discussion_r2958164222
Added FetchTest to cover forPartition mutability, add() merging,
positionAdvanced propagation, and unmodifiable return maps.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/consumer/internals/Fetch.java | 31 +++--
.../clients/consumer/internals/FetchTest.java | 141 +++++++++++++++++++++
2 files changed, 161 insertions(+), 11 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
index 1c680a7ee7a..9c9f34ddf01 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java
@@ -34,7 +34,7 @@ public class Fetch<K, V> {
private int numRecords;
public static <K, V> Fetch<K, V> empty() {
- return new Fetch<>(new HashMap<>(), false, 0, new HashMap<>());
+ return new Fetch<>(false, 0);
}
public static <K, V> Fetch<K, V> forPartition(
@@ -43,25 +43,34 @@ public class Fetch<K, V> {
boolean positionAdvanced,
OffsetAndMetadata nextOffsetAndMetadata
) {
- Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap =
records.isEmpty()
- ? Map.of()
- : Map.of(partition, records);
- Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadataMap =
Map.of(partition, nextOffsetAndMetadata);
- return new Fetch<>(recordsMap, positionAdvanced, records.size(),
nextOffsetAndMetadataMap);
+ return new Fetch<>(positionAdvanced, partition, records,
nextOffsetAndMetadata);
}
private Fetch(
- Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
boolean positionAdvanced,
- int numRecords,
- Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata
+ int numRecords
) {
- this.records = records;
+ this.records = new HashMap<>();
this.positionAdvanced = positionAdvanced;
this.numRecords = numRecords;
- this.nextOffsetAndMetadata = nextOffsetAndMetadata;
+ this.nextOffsetAndMetadata = new HashMap<>();
}
+ private Fetch(
+ boolean positionAdvanced,
+ TopicPartition partition,
+ List<ConsumerRecord<K, V>> records,
+ OffsetAndMetadata offsetAndMetadata
+ ) {
+ this.records = new HashMap<>();
+ if (!records.isEmpty()) {
+ this.records.put(partition, records);
+ }
+ this.positionAdvanced = positionAdvanced;
+ this.numRecords = records.size();
+ this.nextOffsetAndMetadata = new HashMap<>();
+ this.nextOffsetAndMetadata.put(partition, offsetAndMetadata);
+ }
/**
* Add another {@link Fetch} to this one; all of its records will be added
to this fetch's
* {@link #records() records}, and if the other fetch
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchTest.java
new file mode 100644
index 00000000000..cec662b6333
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class FetchTest {
+
+ private static final TopicPartition TP0 = new TopicPartition("topic", 0);
+ private static final TopicPartition TP1 = new TopicPartition("topic", 1);
+
+ @Test
+ void testAddToForPartitionFetch() {
+ var records0 = List.of(new ConsumerRecord<>("topic", 0, 0, "key0",
"value0"));
+ var target = Fetch.forPartition(TP0, records0, true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ var records1 = List.of(new ConsumerRecord<>("topic", 1, 0, "key1",
"value1"));
+ var source = Fetch.forPartition(TP1, records1, true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ target.add(source);
+
+ assertEquals(2, target.numRecords());
+ assertTrue(target.records().containsKey(TP0));
+ assertTrue(target.records().containsKey(TP1));
+ assertTrue(target.nextOffsets().containsKey(TP0));
+ assertTrue(target.nextOffsets().containsKey(TP1));
+ }
+
+ @Test
+ void testAddForPartitionFetchToEmptyFetch() {
+ Fetch<String, String> target = Fetch.empty();
+
+ var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key",
"value"));
+ var source = Fetch.forPartition(TP0, records, true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ target.add(source);
+
+ assertEquals(1, target.numRecords());
+ assertTrue(target.records().containsKey(TP0));
+ assertTrue(target.nextOffsets().containsKey(TP0));
+ }
+
+ @Test
+ void testForPartitionWithEmptyRecords() {
+ var fetch = Fetch.forPartition(TP0, List.of(), true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ assertTrue(fetch.records().isEmpty());
+ assertEquals(0, fetch.numRecords());
+ assertTrue(fetch.positionAdvanced());
+ assertFalse(fetch.isEmpty());
+ assertTrue(fetch.nextOffsets().containsKey(TP0));
+ }
+
+ @Test
+ void testAddWithSamePartitionMergesRecords() {
+ var records0 = List.of(new ConsumerRecord<>("topic", 0, 0, "key0",
"value0"));
+ var records1 = List.of(new ConsumerRecord<>("topic", 0, 1, "key1",
"value1"));
+ var target = Fetch.forPartition(TP0, records0, true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+ var source = Fetch.forPartition(TP0, records1, true, new
OffsetAndMetadata(2, Optional.empty(), ""));
+
+ target.add(source);
+
+ assertEquals(2, target.numRecords());
+ assertEquals(2, target.records().get(TP0).size());
+ }
+
+ @Test
+ void testAddPropagatesPositionAdvanced() {
+ var target = Fetch.forPartition(TP0, List.of(), false, new
OffsetAndMetadata(0, Optional.empty(), ""));
+ var source = Fetch.forPartition(TP1, List.of(), true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ assertFalse(target.positionAdvanced());
+
+ target.add(source);
+
+ assertTrue(target.positionAdvanced());
+ }
+
+ @Test
+ void testForPartitionWithoutPositionAdvanced() {
+ var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key",
"value"));
+ var fetch = Fetch.forPartition(TP0, records, false, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ assertFalse(fetch.positionAdvanced());
+ assertFalse(fetch.isEmpty());
+ assertEquals(1, fetch.numRecords());
+ }
+
+ @Test
+ void testRecordsReturnsUnmodifiableMap() {
+ var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key",
"value"));
+ var fetch = Fetch.forPartition(TP0, records, true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ assertThrows(UnsupportedOperationException.class, () ->
fetch.records().put(TP1, List.of()));
+ }
+
+ @Test
+ void testNextOffsetsReturnsUnmodifiableMap() {
+ var records = List.of(new ConsumerRecord<>("topic", 0, 0, "key",
"value"));
+ var fetch = Fetch.forPartition(TP0, records, true, new
OffsetAndMetadata(1, Optional.empty(), ""));
+
+ assertThrows(UnsupportedOperationException.class,
+ () -> fetch.nextOffsets().put(TP1, new OffsetAndMetadata(99,
Optional.empty(), "")));
+ }
+
+ @Test
+ void testEmpty() {
+ Fetch<String, String> fetch = Fetch.empty();
+ assertTrue(fetch.isEmpty());
+ assertEquals(0, fetch.numRecords());
+ assertFalse(fetch.positionAdvanced());
+ assertTrue(fetch.records().isEmpty());
+ assertTrue(fetch.nextOffsets().isEmpty());
+ }
+}
\ No newline at end of file