This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 525ae0ab662 MINOR: improve ListDeserializer (#22463)
525ae0ab662 is described below
commit 525ae0ab6627d1b4a00cb53e29eff568e1e06c2b
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Jun 5 12:59:14 2026 -0700
MINOR: improve ListDeserializer (#22463)
Reviewers: Lucas Brutschy <[email protected]>
---
.../common/serialization/ListDeserializer.java | 41 ++++++-
.../common/serialization/ListDeserializerTest.java | 123 ++++++++++++++++++++-
2 files changed, 156 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
index 4fe1313bda7..7b203465878 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java
@@ -148,8 +148,14 @@ public class ListDeserializer<Inner> implements
Deserializer<List<Inner>> {
return SerializationStrategy.VALUES[serializationStrategyFlag];
}
- private List<Integer> deserializeNullIndexList(final DataInputStream dis)
throws IOException {
+ private List<Integer> deserializeNullIndexList(final DataInputStream dis,
final int length) throws IOException {
int nullIndexListSize = dis.readInt();
+ if (nullIndexListSize < 0) {
+ throw new SerializationException("Corrupted byte[]. The number of
null list entries cannot be negative.");
+ }
+ if (nullIndexListSize > length) {
+ throw new SerializationException("Corrupted byte[]. The number of
null list entries cannot be larger than overall number of bytes.");
+ }
List<Integer> nullIndexList = new ArrayList<>(nullIndexListSize);
while (nullIndexListSize != 0) {
nullIndexList.add(dis.readInt());
@@ -168,12 +174,13 @@ public class ListDeserializer<Inner> implements
Deserializer<List<Inner>> {
List<Integer> nullIndexList = null;
if (serStrategy == SerializationStrategy.CONSTANT_SIZE) {
// In CONSTANT_SIZE strategy, indexes of null entries are
decoded from a null index list
- nullIndexList = deserializeNullIndexList(dis);
+ nullIndexList = deserializeNullIndexList(dis, data.length);
}
- final int size = dis.readInt();
+ final int size = readListSize(dis, data.length);
List<Inner> deserializedList = createListInstance(size);
for (int i = 0; i < size; i++) {
- int entrySize = serStrategy ==
SerializationStrategy.CONSTANT_SIZE ? primitiveSize : dis.readInt();
+ int entrySize = readEntrySize(dis, serStrategy, data.length);
+
if (entrySize == ListSerde.NULL_ENTRY_VALUE || (nullIndexList
!= null && nullIndexList.contains(i))) {
deserializedList.add(null);
continue;
@@ -192,6 +199,32 @@ public class ListDeserializer<Inner> implements
Deserializer<List<Inner>> {
}
}
+ private int readListSize(final DataInputStream dis, final int length)
throws IOException {
+ final int size = dis.readInt();
+ if (size < 0) {
+ throw new SerializationException("Corrupted byte[]. The number of
list entries cannot be negative.");
+ }
+ if (size > length) {
+ throw new SerializationException("Corrupted byte[]. The number of
list entries cannot be larger than overall number of bytes.");
+ }
+ return size;
+ }
+
+ private int readEntrySize(
+ final DataInputStream dis,
+ final SerializationStrategy serStrategy,
+ final int length
+ ) throws IOException {
+ final int entrySize = serStrategy ==
SerializationStrategy.CONSTANT_SIZE ? primitiveSize : dis.readInt();
+ if (entrySize < -1) { // value `-1` is valid, encoding a null entry
(-> ListSerde.NULL_ENTRY_VALUE)
+ throw new SerializationException("Corrupted byte[]. A list entry
cannot have negative size.");
+ }
+ if (entrySize > length) {
+ throw new SerializationException("Corrupted byte[]. A list entry
cannot be larger than the overall number of bytes.");
+ }
+ return entrySize;
+ }
+
@Override
public void close() {
if (inner != null) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
index 2bfb7a86334..eb5f5eb6780 100644
---
a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.serialization;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.SerializationException;
import org.junit.jupiter.api.Test;
@@ -227,8 +228,7 @@ public class ListDeserializerTest {
public void
testListKeyDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() {
props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS,
ArrayList.class);
props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
- final ListDeserializer<Integer> initializedListDeserializer = new
ListDeserializer<>(ArrayList.class,
- Serdes.Integer().deserializer());
+ final ListDeserializer<Integer> initializedListDeserializer = new
ListDeserializer<>(ArrayList.class, new IntegerDeserializer());
final ConfigException exception = assertThrows(
ConfigException.class,
() -> initializedListDeserializer.configure(props, true)
@@ -240,8 +240,7 @@ public class ListDeserializerTest {
public void
testListValueDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() {
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS,
ArrayList.class);
props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS,
Serdes.StringSerde.class);
- final ListDeserializer<Integer> initializedListDeserializer = new
ListDeserializer<>(ArrayList.class,
- Serdes.Integer().deserializer());
+ final ListDeserializer<Integer> initializedListDeserializer = new
ListDeserializer<>(ArrayList.class, new IntegerDeserializer());
final ConfigException exception = assertThrows(
ConfigException.class,
() -> initializedListDeserializer.configure(props, true)
@@ -249,4 +248,120 @@ public class ListDeserializerTest {
assertEquals("List deserializer was already initialized using a
non-default constructor", exception.getMessage());
}
+ @Test
+ public void shouldThrowOnNegativeLength() {
+ final byte[] corruptedData = new byte[] {
+ (byte)
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // encodes
length == -1
+ };
+
+ final ListDeserializer<String> testDeserializer = new
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+ final SerializationException exception = assertThrows(
+ SerializationException.class,
+ () -> testDeserializer.deserialize(null, corruptedData)
+ );
+ assertEquals(
+ "Corrupted byte[]. The number of list entries cannot be negative.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldThrowOnTooLargeLength() {
+ final byte[] corruptedData = new byte[] {
+ (byte)
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+ (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0xFF // encodes
length 255
+ };
+
+ final ListDeserializer<String> testDeserializer = new
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+ final SerializationException exception = assertThrows(
+ SerializationException.class,
+ () -> testDeserializer.deserialize(null, corruptedData)
+ );
+ assertEquals(
+ "Corrupted byte[]. The number of list entries cannot be larger
than overall number of bytes.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldThrowOnNegativeEntrySize() {
+ final byte[] corruptedData = new byte[] {
+ (byte)
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+ (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01, // encodes
length == 1
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFE // encodes
entrySize == -2 (-1 would be a valid `null` entry)
+ };
+
+ final ListDeserializer<String> testDeserializer = new
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+ final SerializationException exception = assertThrows(
+ SerializationException.class,
+ () -> testDeserializer.deserialize(null, corruptedData)
+ );
+ assertEquals(
+ "Corrupted byte[]. A list entry cannot have negative size.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldThrowOnTooLargeEntrySize() {
+ final byte[] corruptedData = new byte[] {
+ (byte)
Serdes.ListSerde.SerializationStrategy.VARIABLE_SIZE.ordinal(),
+ (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x01, // encodes
length == 1
+ (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0xFF, // encodes
entrySize == 255
+ };
+
+ final ListDeserializer<String> testDeserializer = new
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+ final SerializationException exception = assertThrows(
+ SerializationException.class,
+ () -> testDeserializer.deserialize(null, corruptedData)
+ );
+ assertEquals(
+ "Corrupted byte[]. A list entry cannot be larger than the overall
number of bytes.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldThrowOnNegativeNullEntryLength() {
+ final byte[] corruptedData = new byte[] {
+ (byte)
Serdes.ListSerde.SerializationStrategy.CONSTANT_SIZE.ordinal(),
+ (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF // encodes
number of null entries == -1
+ };
+
+ final ListDeserializer<String> testDeserializer = new
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+ final SerializationException exception = assertThrows(
+ SerializationException.class,
+ () -> testDeserializer.deserialize(null, corruptedData)
+ );
+ assertEquals(
+ "Corrupted byte[]. The number of null list entries cannot be
negative.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldThrowOnTooLargeNullEntryLength() {
+ final byte[] corruptedData = new byte[] {
+ (byte)
Serdes.ListSerde.SerializationStrategy.CONSTANT_SIZE.ordinal(),
+ (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0xFF // encodes
number of null entries == 255
+ };
+
+ final ListDeserializer<String> testDeserializer = new
ListDeserializer<>(ArrayList.class, new StringDeserializer());
+
+ final SerializationException exception = assertThrows(
+ SerializationException.class,
+ () -> testDeserializer.deserialize(null, corruptedData)
+ );
+ assertEquals(
+ "Corrupted byte[]. The number of null list entries cannot be
larger than overall number of bytes.",
+ exception.getMessage()
+ );
+ }
+
}