This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new bc714bf0deb MINOR: improve ListDeserializer (#22463)
bc714bf0deb is described below

commit bc714bf0deb34e7ed5697255dce1cbd95d4a236d
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Jun 5 12:59:14 2026 -0700

    MINOR: improve ListDeserializer (#22463)
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../common/serialization/ListDeserializer.java     |  41 ++++++-
 .../common/serialization/ListDeserializerTest.java | 123 ++++++++++++++++++++-
 2 files changed, 156 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
 
b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
index 4fe1313bda7..7b203465878 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
@@ -148,8 +148,14 @@ public class ListDeserializer<Inner> implements 
Deserializer<List<Inner>> {
         return SerializationStrategy.VALUES[serializationStrategyFlag];
     }
 
-    private List<Integer> deserializeNullIndexList(final DataInputStream dis) 
throws IOException {
+    private List<Integer> deserializeNullIndexList(final DataInputStream dis, 
final int length) throws IOException {
         int nullIndexListSize = dis.readInt();
+        if (nullIndexListSize < 0) {
+            throw new SerializationException("Corrupted byte[]. The number of 
null list entries cannot be negative.");
+        }
+        if (nullIndexListSize > length) {
+            throw new SerializationException("Corrupted byte[]. The number of 
null list entries cannot be larger than overall number of bytes.");
+        }
         List<Integer> nullIndexList = new ArrayList<>(nullIndexListSize);
         while (nullIndexListSize != 0) {
             nullIndexList.add(dis.readInt());
@@ -168,12 +174,13 @@ public class ListDeserializer<Inner> implements 
Deserializer<List<Inner>> {
             List<Integer> nullIndexList = null;
             if (serStrategy == SerializationStrategy.CONSTANT_SIZE) {
                 // In CONSTANT_SIZE strategy, indexes of null entries are 
decoded from a null index list
-                nullIndexList = deserializeNullIndexList(dis);
+                nullIndexList = deserializeNullIndexList(dis, data.length);
             }
-            final int size = dis.readInt();
+            final int size = readListSize(dis, data.length);
             List<Inner> deserializedList = createListInstance(size);
             for (int i = 0; i < size; i++) {
-                int entrySize = serStrategy == 
SerializationStrategy.CONSTANT_SIZE ? primitiveSize : dis.readInt();
+                int entrySize = readEntrySize(dis, serStrategy, data.length);
+
                 if (entrySize == ListSerde.NULL_ENTRY_VALUE || (nullIndexList 
!= null && nullIndexList.contains(i))) {
                     deserializedList.add(null);
                     continue;
@@ -192,6 +199,32 @@ public class ListDeserializer<Inner> implements 
Deserializer<List<Inner>> {
         }
     }
 
+    private int readListSize(final DataInputStream dis, final int length) 
throws IOException {
+        final int size = dis.readInt();
+        if (size < 0) {
+            throw new SerializationException("Corrupted byte[]. The number of 
list entries cannot be negative.");
+        }
+        if (size > length) {
+            throw new SerializationException("Corrupted byte[]. The number of 
list entries cannot be larger than overall number of bytes.");
+        }
+        return size;
+    }
+
+    private int readEntrySize(
+        final DataInputStream dis,
+        final SerializationStrategy serStrategy,
+        final int length
+    ) throws IOException {
+        final int entrySize = serStrategy == 
SerializationStrategy.CONSTANT_SIZE ? primitiveSize : dis.readInt();
+        if (entrySize < -1) { // value `-1` is valid, encoding a null entry 
(-> ListSerde.NULL_ENTRY_VALUE)
+            throw new SerializationException("Corrupted byte[]. A list entry 
cannot have negative size.");
+        }
+        if (entrySize > length) {
+            throw new SerializationException("Corrupted byte[]. A list entry 
cannot be larger than the overall number of bytes.");
+        }
+        return entrySize;
+    }
+
     @Override
     public void close() {
         if (inner != null) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
index 2bfb7a86334..eb5f5eb6780 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.serialization;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
 
 import org.junit.jupiter.api.Test;
 
@@ -227,8 +228,7 @@ public class ListDeserializerTest {
     public void 
testListKeyDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() {
         props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, 
ArrayList.class);
         props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, 
Serdes.StringSerde.class);
-        final ListDeserializer<Integer> initializedListDeserializer = new 
ListDeserializer<>(ArrayList.class,
-            Serdes.Integer().deserializer());
+        final ListDeserializer<Integer> initializedListDeserializer = new 
ListDeserializer<>(ArrayList.class, new IntegerDeserializer());
         final ConfigException exception = assertThrows(
             ConfigException.class,
             () -> initializedListDeserializer.configure(props, true)
@@ -240,8 +240,7 @@ public class ListDeserializerTest {
     public void 
testListValueDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() {
         props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, 
ArrayList.class);
         props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, 
Serdes.StringSerde.class);
-        final ListDeserializer<Integer> initializedListDeserializer = new 
ListDeserializer<>(ArrayList.class,
-            Serdes.Integer().deserializer());
+        final ListDeserializer<Integer> initializedListDeserializer = new 
ListDeserializer<>(ArrayList.class, new IntegerDeserializer());
         final ConfigException exception = assertThrows(
             ConfigException.class,
             () -> initializedListDeserializer.configure(props, true)
@@ -249,4 +248,120 @@ public class ListDeserializerTest {
         assertEquals("List deserializer was already initialized using a 
non-default constructor", exception.getMessage());
     }
 
+    @Test
+    public void shouldThrowOnNegativeLength() {
+        final byte[] corruptedData = new byte[] {
+            (byte) 
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // encodes 
length == -1
+        };
+
+        final ListDeserializer<String> testDeserializer = new 
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+        final SerializationException exception = assertThrows(
+            SerializationException.class,
+            () -> testDeserializer.deserialize(null, corruptedData)
+        );
+        assertEquals(
+            "Corrupted byte[]. The number of list entries cannot be negative.",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowOnTooLargeLength() {
+        final byte[] corruptedData = new byte[] {
+            (byte) 
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0xFF // encodes 
length 255
+        };
+
+        final ListDeserializer<String> testDeserializer = new 
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+        final SerializationException exception = assertThrows(
+            SerializationException.class,
+            () -> testDeserializer.deserialize(null, corruptedData)
+        );
+        assertEquals(
+            "Corrupted byte[]. The number of list entries cannot be larger 
than overall number of bytes.",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowOnNegativeEntrySize() {
+        final byte[] corruptedData = new byte[] {
+            (byte) 
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01, // encodes 
length == 0
+            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFE // encodes 
entrySize == -2 (-1 would be a valid `null` entry)
+        };
+
+        final ListDeserializer<String> testDeserializer = new 
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+        final SerializationException exception = assertThrows(
+            SerializationException.class,
+            () -> testDeserializer.deserialize(null, corruptedData)
+        );
+        assertEquals(
+            "Corrupted byte[]. A list entry cannot have negative size.",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowOnTooLargeEntrySize() {
+        final byte[] corruptedData = new byte[] {
+            (byte) 
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01, // encodes 
length == 1
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0xFF, // encodes 
entrySize == 255
+        };
+
+        final ListDeserializer<String> testDeserializer = new 
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+        final SerializationException exception = assertThrows(
+            SerializationException.class,
+            () -> testDeserializer.deserialize(null, corruptedData)
+        );
+        assertEquals(
+            "Corrupted byte[]. A list entry cannot be larger than the overall 
number of bytes.",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowOnNegativeNullEntryLength() {
+        final byte[] corruptedData = new byte[] {
+            (byte) 
Serdes.ListSerde.SerializationStrategy.CONSTANT_SIZE.ordinal(),
+            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // encodes 
number of null entries == -1
+        };
+
+        final ListDeserializer<String> testDeserializer = new 
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+        final SerializationException exception = assertThrows(
+            SerializationException.class,
+            () -> testDeserializer.deserialize(null, corruptedData)
+        );
+        assertEquals(
+            "Corrupted byte[]. The number of null list entries cannot be 
negative.",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowOnTooLargeNullEntryLength() {
+        final byte[] corruptedData = new byte[] {
+            (byte) 
Serdes.ListSerde.SerializationStrategy.CONSTANT_SIZE.ordinal(),
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0xFF // encodes 
number of null entries == 255
+        };
+
+        final ListDeserializer<String> testDeserializer = new 
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+        final SerializationException exception = assertThrows(
+            SerializationException.class,
+            () -> testDeserializer.deserialize(null, corruptedData)
+        );
+        assertEquals(
+            "Corrupted byte[]. The number of null list entries cannot be 
larger than overall number of bytes.",
+            exception.getMessage()
+        );
+    }
+
 }

Reply via email to