This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0fc80603026 MINOR: perf optimization for header serialization and type conversion (#21762)
0fc80603026 is described below

commit 0fc806030265f0978644a8ae0d4c6d76c22ab704
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 16 12:48:22 2026 -0700

    MINOR: perf optimization for header serialization and type conversion (#21762)
    
    This PR replaces the usage of ByteArrayOutputStreams with ByteBuffers.
    Some manual benchmarks show a perf improvement for the value-ts-header
    and session-header converters of 3x.
    
    Reviewers: PoAn Yang <[email protected]>, Nilesh Kumar
     <[email protected]>, aliehsaeedii <[email protected]>,
     Chia-Ping Tsai <[email protected]>
---
 .../AggregationWithHeadersSerializer.java          |  29 +++---
 .../streams/state/internals/HeadersSerializer.java | 106 ++++++++++++++-------
 .../streams/state/internals/RecordConverters.java  |  67 ++++++-------
 .../internals/ValueTimestampHeadersSerializer.java |  33 ++-----
 .../state/internals/HeadersDeserializerTest.java   |  43 +++++++--
 .../state/internals/HeadersSerializerTest.java     |  41 ++++++--
 .../state/internals/RecordConvertersTest.java      |  23 ++++-
 7 files changed, 216 insertions(+), 126 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
index 1781d1fa970..4792052aa9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.ByteUtils;
@@ -24,9 +23,7 @@ import 
org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
@@ -75,23 +72,27 @@ class AggregationWithHeadersSerializer<AGG> implements 
WrappingNullableSerialize
 
         final byte[] rawAggregation = aggregationSerializer.serialize(topic, 
headers, plainAggregation);
 
+        // Since we can't control the result of the internal serializer, we 
make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with 
Optional-like values
+        // where the Optional.empty case is serialized to null.
+        // See the discussion here: https://github.com/apache/kafka/pull/7679
         if (rawAggregation == null) {
             return null;
         }
 
-        final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
 
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-             final DataOutputStream out = new DataOutputStream(baos)) {
+        final int payloadSize = 
preSerializedHeaders.requiredBufferSizeForHeaders + rawAggregation.length;
 
-            ByteUtils.writeVarint(rawHeaders.length, out);
-            out.write(rawHeaders);
-            out.write(rawAggregation);
+        // Format: [headersSize(varint)][headersBytes][value]
+        final ByteBuffer buffer = 
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
 + payloadSize);
+        
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, 
buffer);
 
-            return baos.toByteArray();
-        } catch (final IOException e) {
-            throw new SerializationException("Failed to serialize 
AggregationWithHeaders on topic: " + topic, e);
-        }
+        // empty (byte[0]) for null/empty headers, or 
[count][header1][header2]... for non-empty
+        return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+            .put(rawAggregation)
+            .array();
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index f8c664c7820..6608a55f67b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -16,14 +16,11 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.ByteUtils;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 
 /**
@@ -48,8 +45,59 @@ import java.nio.charset.StandardCharsets;
  */
 class HeadersSerializer {
 
+    static final class PreSerializedHeaders {
+        final int requiredBufferSizeForHeaders;
+        final byte[][] rawHeaderKeys;
+        final byte[][] rawHeaderValues;
+
+        PreSerializedHeaders(
+            final int requiredBufferSizeForHeaders,
+            final byte[][] rawHeaderKeys,
+            final byte[][] rawHeaderValues
+        ) {
+            this.requiredBufferSizeForHeaders = requiredBufferSizeForHeaders;
+            this.rawHeaderKeys = rawHeaderKeys;
+            this.rawHeaderValues = rawHeaderValues;
+        }
+    }
+
+    public static PreSerializedHeaders prepareSerialization(final Headers 
headers) {
+        final Header[] headersArray = (headers == null) ? new Header[0] : 
headers.toArray();
+
+        if (headersArray.length == 0) {
+            return new PreSerializedHeaders(0, null, null);
+        }
+
+        // we first compute the size for the buffer we need,
+        // so we can allocate the whole buffer at once later
+
+        // cache to avoid translating String header-keys to byte[] twice
+        final byte[][] serializerHeaderKeys = new byte[headersArray.length][];
+        final byte[][] serializedHeaderValues = new 
byte[headersArray.length][];
+
+        // start with varint encoding of header count
+        int requiredBufferSizeForHeaders = 
ByteUtils.sizeOfVarint(headersArray.length);
+
+        int i = 0;
+        for (final Header header : headersArray) {
+            serializerHeaderKeys[i] = 
header.key().getBytes(StandardCharsets.UTF_8);
+            requiredBufferSizeForHeaders += 
ByteUtils.sizeOfVarint(serializerHeaderKeys[i].length) + 
serializerHeaderKeys[i].length;
+
+            serializedHeaderValues[i] = header.value();
+            if (serializedHeaderValues[i] == null) {
+                ++requiredBufferSizeForHeaders;
+            } else {
+                requiredBufferSizeForHeaders += 
ByteUtils.sizeOfVarint(serializedHeaderValues[i].length) + 
serializedHeaderValues[i].length;
+            }
+
+            ++i;
+        }
+
+        return new PreSerializedHeaders(requiredBufferSizeForHeaders, 
serializerHeaderKeys, serializedHeaderValues);
+    }
+
     /**
-     * Serializes headers into a byte array using varint encoding per KIP-1271.
+     * Serializes headers into a ByteBuffer using varint encoding per KIP-1271.
      * <p>
      * The output format is [count][header1][header2]... without a size prefix.
      * The size prefix is added by the outer serializer that uses this.
@@ -57,41 +105,31 @@ class HeadersSerializer {
      * For null or empty headers, returns an empty byte array (0 bytes)
      * instead of encoding headerCount=0 (1 byte).
      *
-     * @param headers the headers to serialize (can be null)
-     * @return the serialized byte array (empty array if headers are null or 
empty)
+     * @param preSerializedHeaders the pre-serialized headers, as returned by {@code prepareSerialization}
+     * @param buffer the buffer to write the serialized headers into (it's expected that the buffer position is set correctly)
+     * @return the modified {@code buffer} containing the serialized headers (nothing is written if headers are null or empty),
+     * with correspondingly advanced position
      */
-    public static byte[] serialize(final Headers headers) {
-        final Header[] headersArray = (headers == null) ? new Header[0] : 
headers.toArray();
-
-        if (headersArray.length == 0) {
-            return new byte[0];
+    public static ByteBuffer serialize(final PreSerializedHeaders 
preSerializedHeaders, final ByteBuffer buffer) {
+        if (preSerializedHeaders.requiredBufferSizeForHeaders == 0) {
+            return buffer;
         }
 
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-             final DataOutputStream out = new DataOutputStream(baos)) {
-
-            ByteUtils.writeVarint(headersArray.length, out);
+        final int numberOfHeaders = preSerializedHeaders.rawHeaderKeys.length;
 
-            for (final Header header : headersArray) {
-                final byte[] keyBytes = 
header.key().getBytes(StandardCharsets.UTF_8);
-                final byte[] valueBytes = header.value();
+        ByteUtils.writeVarint(numberOfHeaders, buffer);
+        for (int i = 0; i < numberOfHeaders; ++i) {
+            
ByteUtils.writeVarint(preSerializedHeaders.rawHeaderKeys[i].length, buffer);
+            buffer.put(preSerializedHeaders.rawHeaderKeys[i]);
 
-                ByteUtils.writeVarint(keyBytes.length, out);
-                out.write(keyBytes);
-
-                // Write value length and value bytes (varint + raw bytes)
-                // null is represented as -1, encoded as varint
-                if (valueBytes == null) {
-                    ByteUtils.writeVarint(-1, out);
-                } else {
-                    ByteUtils.writeVarint(valueBytes.length, out);
-                    out.write(valueBytes);
-                }
+            if (preSerializedHeaders.rawHeaderValues[i] != null) {
+                
ByteUtils.writeVarint(preSerializedHeaders.rawHeaderValues[i].length, buffer);
+                buffer.put(preSerializedHeaders.rawHeaderValues[i]);
+            } else {
+                buffer.put((byte) 0x01); // hardcoded varint encoding for `-1`
             }
-
-            return baos.toByteArray();
-        } catch (final IOException e) {
-            throw new SerializationException("Failed to serialize headers", e);
         }
+
+        return buffer;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index 118bc019b5f..058edad52c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -17,14 +17,9 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.utils.ByteUtils;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public final class RecordConverters {
@@ -33,7 +28,7 @@ public final class RecordConverters {
     private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record 
-> {
         final byte[] rawValue = record.value();
         final long timestamp = record.timestamp();
-        final byte[] recordValue = rawValue == null ? null :
+        final byte[] recordValueWithTimestamp = rawValue == null ? null :
             ByteBuffer.allocate(8 + rawValue.length)
                 .putLong(timestamp)
                 .put(rawValue)
@@ -45,9 +40,9 @@ public final class RecordConverters {
             timestamp,
             record.timestampType(),
             record.serializedKeySize(),
-            record.serializedValueSize(),
+            recordValueWithTimestamp != null ? recordValueWithTimestamp.length 
: 0,
             record.key(),
-            recordValue,
+            recordValueWithTimestamp,
             record.headers(),
             record.leaderEpoch()
         );
@@ -57,7 +52,7 @@ public final class RecordConverters {
         final byte[] rawValue = record.value();
 
         // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
-        final byte[] recordValue = reconstructFromRaw(
+        final byte[] recordValueWithTimestampAndHeaders = reconstructFromRaw(
             rawValue,
             record.timestamp(),
             record.headers()
@@ -70,9 +65,9 @@ public final class RecordConverters {
             record.timestamp(),
             record.timestampType(),
             record.serializedKeySize(),
-            record.serializedValueSize(),
+            recordValueWithTimestampAndHeaders != null ? 
recordValueWithTimestampAndHeaders.length : 0,
             record.key(),
-            recordValue,
+            recordValueWithTimestampAndHeaders,
             record.headers(),
             record.leaderEpoch()
         );
@@ -86,7 +81,7 @@ public final class RecordConverters {
         final byte[] rawValue = record.value();
 
         // Format: [headersSize(varint)][headersBytes][aggregation] (no 
timestamp)
-        final byte[] recordValue = reconstructSessionFromRaw(
+        final byte[] recordValueWithHeaders = reconstructSessionFromRaw(
             rawValue,
             record.headers()
         );
@@ -98,9 +93,9 @@ public final class RecordConverters {
             record.timestamp(),
             record.timestampType(),
             record.serializedKeySize(),
-            record.serializedValueSize(),
+            recordValueWithHeaders != null ? recordValueWithHeaders.length : 0,
             record.key(),
-            recordValue,
+            recordValueWithHeaders,
             record.headers(),
             record.leaderEpoch()
         );
@@ -133,19 +128,18 @@ public final class RecordConverters {
         if (rawValue == null) {
             return null;
         }
-        final byte[] rawHeaders = HeadersSerializer.serialize(headers);
 
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-             final DataOutputStream out = new DataOutputStream(baos)) {
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
 
-            ByteUtils.writeVarint(rawHeaders.length, out);
-            out.write(rawHeaders);
-            out.write(rawValue);
+        final int payloadSize = 
preSerializedHeaders.requiredBufferSizeForHeaders + rawValue.length;
 
-            return baos.toByteArray();
-        } catch (final IOException e) {
-            throw new SerializationException("Failed to reconstruct 
AggregationWithHeaders", e);
-        }
+        // Format: [headersSize(varint)][headersBytes][value]
+        final ByteBuffer buffer = 
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
 + payloadSize);
+        
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, 
buffer);
+
+        return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+            .put(rawValue)
+            .array();
     }
 
     /**
@@ -161,23 +155,18 @@ public final class RecordConverters {
         if (rawValue == null) {
             return null;
         }
-        final byte[] rawTimestamp;
-        try (LongSerializer timestampSerializer = new LongSerializer()) {
-            rawTimestamp = timestampSerializer.serialize("", timestamp);
-        }
-        final byte[] rawHeaders = HeadersSerializer.serialize(headers);
 
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-             final DataOutputStream out = new DataOutputStream(baos)) {
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
 
-            ByteUtils.writeVarint(rawHeaders.length, out);
-            out.write(rawHeaders);
-            out.write(rawTimestamp);
-            out.write(rawValue);
+        final int payloadSize = 
preSerializedHeaders.requiredBufferSizeForHeaders + 8 + rawValue.length;
 
-            return baos.toByteArray();
-        } catch (final IOException e) {
-            throw new SerializationException("Failed to reconstruct 
ValueTimestampHeaders", e);
-        }
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        final ByteBuffer buffer = 
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
 + payloadSize);
+        
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, 
buffer);
+
+        return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+            .putLong(timestamp)
+            .put(rawValue)
+            .array();
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
index 1da1ad1f65f..f4e6fe07a22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
@@ -16,18 +16,14 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Objects;
 
@@ -51,18 +47,15 @@ import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.i
  */
 class ValueTimestampHeadersSerializer<V> implements 
WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
     public final Serializer<V> valueSerializer;
-    private final LongSerializer timestampSerializer;
 
     ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
         Objects.requireNonNull(valueSerializer);
         this.valueSerializer = valueSerializer;
-        this.timestampSerializer = new LongSerializer();
     }
 
     @Override
     public void configure(final Map<String, ?> configs, final boolean isKey) {
         valueSerializer.configure(configs, isKey);
-        timestampSerializer.configure(configs, isKey);
     }
 
     @Override
@@ -89,30 +82,24 @@ class ValueTimestampHeadersSerializer<V> implements 
WrappingNullableSerializer<V
             return null;
         }
 
-        final byte[] rawTimestamp = timestampSerializer.serialize(topic, 
timestamp);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
 
-        // empty (byte[0]) for null/empty headers, or 
[count][header1][header2]... for non-empty
-        final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+        final int payloadSize = 
preSerializedHeaders.requiredBufferSizeForHeaders + 8 + rawValue.length;
 
         // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
-        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-             final DataOutputStream out = new DataOutputStream(baos)) {
-
-            ByteUtils.writeVarint(rawHeaders.length, out);  // headersSize (it 
may be 0 due to null/empty headers)
-            out.write(rawHeaders);                          // empty (byte[0]) 
for null/empty headers, or [count][header1][header2]... for non-empty
-            out.write(rawTimestamp);                        // [timestamp(8)]
-            out.write(rawValue);                            // [value]
+        final ByteBuffer buffer = 
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
 + payloadSize);
+        
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders, 
buffer);
 
-            return baos.toByteArray();
-        } catch (final IOException e) {
-            throw new SerializationException("Failed to serialize 
ValueTimestampHeaders", e);
-        }
+        // empty (byte[0]) for null/empty headers, or 
[count][header1][header2]... for non-empty
+        return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+            .putLong(timestamp)
+            .put(rawValue)
+            .array();
     }
 
     @Override
     public void close() {
         valueSerializer.close();
-        timestampSerializer.close();
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
index 8f6e1ce3d7a..451f534c392 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import org.junit.jupiter.api.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -49,8 +50,9 @@ public class HeadersDeserializerTest {
 
     @Test
     public void shouldRoundTripEmptyHeaders() {
-        final Headers original = new RecordHeaders();
-        final byte[] serialized = HeadersSerializer.serialize(original);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(new RecordHeaders());
+        final byte[] serialized = 
HeadersSerializer.serialize(preSerializedHeaders, 
ByteBuffer.allocate(0)).array();
+
         final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
@@ -61,7 +63,12 @@ public class HeadersDeserializerTest {
     public void shouldRoundTripSingleHeader() {
         final Headers original = new RecordHeaders()
             .add("key1", "value1".getBytes());
-        final byte[] serialized = HeadersSerializer.serialize(original);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(original);
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
+
         final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
@@ -79,8 +86,14 @@ public class HeadersDeserializerTest {
             .add("key0", "value0".getBytes())
             .add("key1", "value1".getBytes())
             .add("key2", "value2".getBytes());
-        final byte[] serialized = HeadersSerializer.serialize(original);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(original);
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
+
         final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
+
         assertNotNull(deserialized);
 
         final Header[] headerArray = deserialized.toArray();
@@ -96,7 +109,12 @@ public class HeadersDeserializerTest {
     public void shouldRoundTripHeaderWithNullValue() {
         final Headers original = new RecordHeaders()
             .add("key1", null);
-        final byte[] serialized = HeadersSerializer.serialize(original);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(original);
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
+
         final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
@@ -112,7 +130,12 @@ public class HeadersDeserializerTest {
     public void shouldRoundTripHeaderWithEmptyValue() {
         final Headers original = new RecordHeaders()
             .add("key1", new byte[0]);
-        final byte[] serialized = HeadersSerializer.serialize(original);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(original);
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
+
         final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
 
         assertNotNull(deserialized);
@@ -132,8 +155,14 @@ public class HeadersDeserializerTest {
             .add("key1", "value1".getBytes())
             .add("key2", "value2".getBytes())
             .add("key2", "value3".getBytes());
-        final byte[] serialized = HeadersSerializer.serialize(original);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(original);
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
+
         final Headers deserialized = 
HeadersDeserializer.deserialize(serialized);
+
         assertNotNull(deserialized);
 
         final Header[] headerArray = deserialized.toArray();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index 7c791c93984..259449d2849 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import org.junit.jupiter.api.Test;
 
+import java.nio.ByteBuffer;
+
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -32,7 +34,10 @@ public class HeadersSerializerTest {
 
     @Test
     public void shouldSerializeNullHeaders() {
-        final byte[] serialized = HeadersSerializer.serialize(null);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(null);
+        assertEquals(0, preSerializedHeaders.requiredBufferSizeForHeaders);
+
+        final byte[] serialized = 
HeadersSerializer.serialize(preSerializedHeaders, 
ByteBuffer.allocate(0)).array();
 
         assertNotNull(serialized);
         assertEquals(0, serialized.length, "Null headers should serialize to 
empty byte array (0 bytes)");
@@ -40,8 +45,10 @@ public class HeadersSerializerTest {
 
     @Test
     public void shouldSerializeEmptyHeaders() {
-        final Headers headers = new RecordHeaders();
-        final byte[] serialized = HeadersSerializer.serialize(headers);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(new RecordHeaders());
+        assertEquals(0, preSerializedHeaders.requiredBufferSizeForHeaders);
+
+        final byte[] serialized = 
HeadersSerializer.serialize(preSerializedHeaders, 
ByteBuffer.allocate(0)).array();
 
         assertNotNull(serialized);
         assertEquals(0, serialized.length, "Empty headers should serialize to 
empty byte array (0 bytes)");
@@ -51,7 +58,12 @@ public class HeadersSerializerTest {
     public void shouldSerializeSingleHeader() {
         final Headers headers = new RecordHeaders()
             .add("key1", "value1".getBytes());
-        final byte[] serialized = HeadersSerializer.serialize(headers);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
+
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
@@ -72,7 +84,12 @@ public class HeadersSerializerTest {
             .add("key0", "value0".getBytes())
             .add("key1", "value1".getBytes())
             .add("key2", "value2".getBytes());
-        final byte[] serialized = HeadersSerializer.serialize(headers);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
+
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
@@ -93,7 +110,12 @@ public class HeadersSerializerTest {
     public void shouldSerializeHeaderWithNullValue() {
         final Headers headers = new RecordHeaders()
             .add("key1", null);
-        final byte[] serialized = HeadersSerializer.serialize(headers);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
+
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
@@ -112,7 +134,12 @@ public class HeadersSerializerTest {
     public void shouldSerializeHeadersWithEmptyValue() {
         final Headers headers = new RecordHeaders()
             .add("key1", new byte[0]);
-        final byte[] serialized = HeadersSerializer.serialize(headers);
+        final HeadersSerializer.PreSerializedHeaders preSerializedHeaders = 
HeadersSerializer.prepareSerialization(headers);
+
+        final byte[] serialized = HeadersSerializer.serialize(
+            preSerializedHeaders,
+            
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+        ).array();
 
         assertNotNull(serialized);
         assertTrue(serialized.length > 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
index 93d08c00293..d28334c1cc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.Optional;
 
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToSessionHeadersValue;
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -35,6 +36,7 @@ public class RecordConvertersTest {
 
     private final RecordConverter timestampedValueConverter = 
rawValueToTimestampedValue();
     private final RecordConverter headersValueConverter = 
rawValueToHeadersValue();
+    private final RecordConverter sessionValueConverter = 
rawValueToSessionHeadersValue();
 
 
     @Test
@@ -42,6 +44,7 @@ public class RecordConvertersTest {
         final ConsumerRecord<byte[], byte[]> nullValueRecord = new 
ConsumerRecord<>("", 0, 0L, new byte[0], null);
         assertNull(timestampedValueConverter.convert(nullValueRecord).value());
         assertNull(headersValueConverter.convert(nullValueRecord).value());
+        assertNull(sessionValueConverter.convert(nullValueRecord).value());
     }
 
     @Test
@@ -66,9 +69,25 @@ public class RecordConvertersTest {
             headers, Optional.empty());
         // Expected format: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
         final byte[] expectedValue =
-            {50, 2, 20, 104, 101, 97, 100, 101, 114, 45, 107, 101, 121, 24, 
104, 101, 97, 100, 101,
-                114, 45, 118, 97, 108, 117, 101, 0, 0, 0, 0, 0, 0, 0, 10, 0};
+            {50, 2, 20, 'h', 'e', 'a', 'd', 'e', 'r', '-', 'k', 'e', 'y', 24, 
'h', 'e', 'a', 'd', 'e',
+                'r', '-', 'v', 'a', 'l', 'u', 'e', 0, 0, 0, 0, 0, 0, 0, 10, 
value[0]};
         final byte[] actualValue = 
headersValueConverter.convert(inputRecord).value();
         assertArrayEquals(expectedValue, actualValue);
     }
+
+    @Test
+    public void shouldAddHeadersToValueOnConversionWhenValueIsNotNull() {
+        final byte[] value = new byte[1];
+        final Headers headers = new RecordHeaders().add("header-key", 
"header-value".getBytes());
+        final ConsumerRecord<byte[], byte[]> inputRecord = new 
ConsumerRecord<>(
+            "topic", 1, 0, 0, TimestampType.CREATE_TIME, 0, 0, new byte[0], 
value,
+            headers, Optional.empty());
+        // Expected format: [headersSize(varint)][headersBytes][value]
+        final byte[] expectedValue =
+            {50, 2, 20, 'h', 'e', 'a', 'd', 'e', 'r', '-', 'k', 'e', 'y', 24, 
'h', 'e', 'a', 'd', 'e',
+                'r', '-', 'v', 'a', 'l', 'u', 'e', value[0]};
+        final byte[] actualValue = 
sessionValueConverter.convert(inputRecord).value();
+        assertArrayEquals(expectedValue, actualValue);
+    }
+
 }


Reply via email to