This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 000d82c0c14 KAFKA-20249: Optimize raw value extraction in headers-aware deserializers (#21706)
000d82c0c14 is described below

commit 000d82c0c14a4acbfaec78b8c2f03dd19d2e47ee
Author: Zheguang Zhao <[email protected]>
AuthorDate: Sat Mar 21 10:08:43 2026 +1100

    KAFKA-20249: Optimize raw value extraction in headers-aware deserializers (#21706)
    
    This patch implements two optimizations, adds JMH benchmarks for them,
    and includes some opportunistic refactoring.
    
    1. Skipping headers
    Previously, raw value extraction in headers-aware deserializers
    deserialized and/or copied the headers, although only skipping them is
    required. This happened in one case, for both empty and non-empty
    headers.
    
    2. Extracting value / timestamp after empty headers
    Empty headers have a constant metadata footprint: the headers size is
    varint-encoded as a single 0x00 byte, and the headers themselves consume
    no bytes. Based on this invariant, the ByteBuffer-based extraction can be
    replaced with a direct `System.arraycopy`, which is a Java native method
    optimized for specific platforms.
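
The two optimizations above can be sketched as follows. This is a minimal, self-contained illustration and not the code in the patch: the class `HeaderSkipSketch` and its methods are hypothetical names, and the varint decoder assumes a zigzag encoding, which is believed to match Kafka's `ByteUtils.readVarint` but should be treated as an assumption here.

```java
import java.nio.ByteBuffer;

class HeaderSkipSketch {

    // Hypothetical stand-in for a varint reader; assumes zigzag encoding.
    static int readVarint(final ByteBuffer buf) {
        int raw = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            final byte b = buf.get();
            raw |= (b & 0x7f) << shift;
            if ((b & 0x80) == 0) {
                // Zigzag-decode the unsigned value into a signed int
                return (raw >>> 1) ^ -(raw & 1);
            }
        }
        throw new IllegalArgumentException("varint is too long");
    }

    // Input:  [headersSize(varint)][headers][value]
    // Output: [value]
    static byte[] stripHeaders(final byte[] valueWithHeaders) {
        if (valueWithHeaders == null) {
            return null;
        }

        // Optimization 2: empty headers are exactly one 0x00 size byte,
        // so the value starts at offset 1 and can be copied directly.
        if (valueWithHeaders.length > 0 && valueWithHeaders[0] == 0x00) {
            final byte[] value = new byte[valueWithHeaders.length - 1];
            System.arraycopy(valueWithHeaders, 1, value, 0, value.length);
            return value;
        }

        // Optimization 1: read only the size, then jump over the headers
        // without deserializing or copying them. An out-of-range size makes
        // position(...) throw IllegalArgumentException.
        final ByteBuffer buf = ByteBuffer.wrap(valueWithHeaders);
        final int headersSize = readVarint(buf);
        buf.position(buf.position() + headersSize);
        final byte[] value = new byte[buf.remaining()];
        buf.get(value);
        return value;
    }
}
```

For example, under these assumptions `HeaderSkipSketch.stripHeaders(new byte[] {0x00, 1, 2, 3})` takes the fast path, while an input whose first byte is `0x04` (zigzag for a headers size of 2) takes the skip path.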
    
    The optimized headers-aware extraction methods:
    - rawAggregation
    - rawTimestampedValue
    - rawValue / rawPlainValue
    - ValueTimestampHeadersDeserializer.timestamp
    - ValueTimestampHeadersDeserializer.headers
    - AggregationWithHeadersDeserializer.headers
    
    **Benchmark:**
    This patch also includes JMH benchmarks to measure the speedup.  On my
    local machine, Optimization 1 yields a 2-6x speedup and Optimization 2
    a 1.2-1.3x speedup.
    
    Below is the throughput comparison of a recorded JMH run (higher score
    is better):
    ```
    Benchmark                                                              Mode  Cnt      Score      Error  Units
    RawBytesExtractionBenchmark.testHeadersWithoutHeaders                 thrpt   15  10158.764 ±   85.564  ops/s
    RawBytesExtractionBenchmark.testHeadersWithoutHeadersOpt              thrpt   15  14824.176 ± 1244.455  ops/s

    RawBytesExtractionBenchmark.testRawAggregationWithHeaders             thrpt   15   1473.459 ±    7.170  ops/s
    RawBytesExtractionBenchmark.testRawAggregationWithHeadersOpt          thrpt   15  11618.187 ±  235.385  ops/s

    RawBytesExtractionBenchmark.testRawAggregationWithoutHeaders          thrpt   15   8337.728 ±  199.919  ops/s
    RawBytesExtractionBenchmark.testRawAggregationWithoutHeadersOpt       thrpt   15  14564.899 ±  186.405  ops/s

    RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeaders     thrpt   15  10217.292 ±  108.552  ops/s
    RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeadersOpt  thrpt   15  12121.074 ±  201.235  ops/s

    RawBytesExtractionBenchmark.testRawValueWithoutHeaders                thrpt   15  11632.484 ±  138.505  ops/s
    RawBytesExtractionBenchmark.testRawValueWithoutHeadersOpt             thrpt   15  14669.563 ±   43.458  ops/s

    RawBytesExtractionBenchmark.testTimestampWithoutHeaders               thrpt   15  14858.778 ±   39.301  ops/s
    RawBytesExtractionBenchmark.testTimestampWithoutHeadersOpt            thrpt   15  19832.718 ±  916.980  ops/s
    JMH benchmarks done
    ```
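
The speedup factor for any pair of rows in the recorded run is simply the optimized score divided by the baseline score. The sketch below does only that arithmetic on numbers copied from the table; the class name `SpeedupSketch` is illustrative and not part of the patch, and the ratios describe this one recorded run only.

```java
class SpeedupSketch {

    // Ratio of optimized to baseline throughput (ops/s); above 1.0 means faster.
    static double speedup(final double baselineOpsPerSec, final double optimizedOpsPerSec) {
        return optimizedOpsPerSec / baselineOpsPerSec;
    }

    public static void main(final String[] args) {
        // Scores taken from the recorded JMH run above
        System.out.printf("rawAggregation, non-empty headers: %.2fx%n",
            speedup(1473.459, 11618.187));
        System.out.printf("rawAggregation, empty headers:     %.2fx%n",
            speedup(8337.728, 14564.899));
        System.out.printf("rawTimestampedValue, empty headers: %.2fx%n",
            speedup(10217.292, 12121.074));
        System.out.printf("rawValue, empty headers:           %.2fx%n",
            speedup(11632.484, 14669.563));
    }
}
```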
    
    **Test:**
    - `AggregationWithHeadersDeserializer.rawAggregate`
      - empty headers: `SessionToHeadersStoreAdapterTest.shouldStripHeadersFromRawAggregationValue`
    - `Utils.rawPlainValue`
      - empty headers: `UtilsTest.shouldExtractRawValueWithEmptyHeaders`
      - empty headers, no timestamp: `UtilsTest.testRawPlainValueWithEmptyHeadersAndInvalidTimestamp`
    - `Utils.rawTimestampedValue`
      - empty headers: `UtilsTest.testRawTimestampedValueWithEmptyHeaders`
      - empty headers, no timestamp: `UtilsTest.testRawTimestampedValueWithEmptyHeadersAndInvalidTimestamp`
    - `Utils.hasEmptyHeadersAndTimestamp`
      - min size violation
      - empty headers
      - non-empty headers
    
    **Refactor**
    - Point all calls that extract raw values (with timestamp and headers)
    to the common implementation in Utils.
    
    Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
    <[email protected]>, Matthias J. Sax <[email protected]>
---
 .../jmh/streams/RawBytesExtractionBenchmark.java   | 221 ++++++++++++++++++++-
 .../kafka/streams/state/HeadersBytesStore.java     |   8 +-
 .../AggregationWithHeadersDeserializer.java        |  35 +---
 .../ChangeLoggingSessionBytesStoreWithHeaders.java |   4 +-
 ...ngTimestampedKeyValueBytesStoreWithHeaders.java |  14 +-
 ...gingTimestampedWindowBytesStoreWithHeaders.java |   8 +-
 ...MeteredTimestampedKeyValueStoreWithHeaders.java |   1 -
 .../MeteredTimestampedWindowStoreWithHeaders.java  |   1 -
 .../internals/SessionToHeadersStoreAdapter.java    |   2 +-
 .../kafka/streams/state/internals/Utils.java       | 149 +++++++++++---
 .../ValueTimestampHeadersDeserializer.java         |  50 +----
 .../AggregationWithHeadersDeserializerTest.java    |  33 ++-
 ...ocksDBMigratingSessionStoreWithHeadersTest.java |   6 +-
 .../SessionToHeadersStoreAdapterTest.java          |   4 +-
 .../TimeOrderedSessionStoreUpgradeTest.java        |  28 +--
 .../kafka/streams/state/internals/UtilsTest.java   |  65 +++++-
 .../ValueTimestampHeadersDeserializerTest.java     |  28 ++-
 17 files changed, 485 insertions(+), 172 deletions(-)

diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
index f0f50f81955..6fca76f18eb 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
@@ -17,8 +17,13 @@
 
 package org.apache.kafka.jmh.streams;
 
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.Utils;
 
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.CompilerControl;
@@ -41,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 @Measurement(iterations = 5, time = 1)
 public class RawBytesExtractionBenchmark {
     private static final int DATA_SET_SAMPLE_SIZE = 16384;
+    private static final LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
 
     @State(Scope.Benchmark)
     public static class IterationStateForValues {
@@ -52,11 +58,25 @@ public class RawBytesExtractionBenchmark {
     }
 
     @State(Scope.Benchmark)
-    public static class IterationStateForEmptyHeadersTimestamp extends 
IterationStateForValues {
+    public static class IterationStateForEmptyHeaders extends 
IterationStateForValues {
         @Setup(Level.Iteration)
         public void setup() {
             this.values = new byte[DATA_SET_SAMPLE_SIZE][];
             for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+                values[i] = new byte[1 + 8];
+                final ByteBuffer buf = ByteBuffer.wrap(values[i]);
+                buf.put((byte) 0x00); // header size
+                buf.putLong((long) i); // non-header payload
+            }
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class IterationStateForEmptyHeadersTimestamp extends 
IterationStateForValues {
+        @Setup(Level.Iteration)
+        public void setup() {
+            this.values = new byte[DATA_SET_SAMPLE_SIZE][];
+            for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) { 
                 values[i] = new byte[1 + StateSerdes.TIMESTAMP_SIZE + 8];
                 final ByteBuffer buf = ByteBuffer.wrap(values[i]);
                 buf.put((byte) 0x00); // header size
@@ -66,6 +86,122 @@ public class RawBytesExtractionBenchmark {
         }
     }
 
+    @State(Scope.Benchmark)
+    public static class IterationStateForHeaders extends 
IterationStateForValues {
+        @Setup(Level.Iteration)
+        public void setup() {
+            this.values = new byte[DATA_SET_SAMPLE_SIZE][];
+            for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+                values[i] = new byte[1 + 1 + (1 + 4) + (1 + 4) + 8];
+                final ByteBuffer buf = ByteBuffer.wrap(values[i]);
+                ByteUtils.writeVarint(11, buf);  // 1-byte header size of 11
+                ByteUtils.writeVarint(1, buf);  // 1-byte header count of 1
+                ByteUtils.writeVarint(4, buf);  // 1-byte header key size
+                buf.putInt(i + 1); // 4-byte header key
+                ByteUtils.writeVarint(4, buf);  // 1-byte header value size
+                buf.putInt(i + 1); // 4-byte header value
+                buf.putLong((long) i + 1); // plain value
+            }
+        }
+    }
+
+    
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void testRawAggregationWithoutHeaders(IterationStateForEmptyHeaders 
state, Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(rawAggregationPre20249(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testRawAggregationWithoutHeadersOpt(IterationStateForEmptyHeaders state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(Utils.rawAggregation(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void testRawAggregationWithHeaders(IterationStateForHeaders state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(rawAggregationPre20249(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void testRawAggregationWithHeadersOpt(IterationStateForHeaders 
state, Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(Utils.rawAggregation(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testRawValueWithoutHeaders(IterationStateForEmptyHeadersTimestamp state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(rawValuePre20249(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testRawValueWithoutHeadersOpt(IterationStateForEmptyHeadersTimestamp state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(Utils.rawPlainValue(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testRawTimestampedValueWithoutHeaders(IterationStateForEmptyHeadersTimestamp 
state, Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(rawTimestampedValuePre20249(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testRawTimestampedValueWithoutHeadersOpt(IterationStateForEmptyHeadersTimestamp 
state, Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(Utils.rawTimestampedValue(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void testHeadersWithoutHeaders(IterationStateForEmptyHeaders state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(headersPre20249(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void testHeadersWithoutHeadersOpt(IterationStateForEmptyHeaders 
state, Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(Utils.headers(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testTimestampWithoutHeaders(IterationStateForEmptyHeadersTimestamp state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(timestampPre20249(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testTimestampWithoutHeadersOpt(IterationStateForEmptyHeadersTimestamp state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(Utils.timestamp(randomValue));
+        }
+    }
+
     @Benchmark
     @CompilerControl(CompilerControl.Mode.DONT_INLINE)
     public void 
testConvertToHeaderFormat(IterationStateForEmptyHeadersTimestamp state, 
Blackhole bh) {
@@ -101,4 +237,87 @@ public class RawBytesExtractionBenchmark {
             .put(valueAndTimestamp)
             .array();
     }
+
+    /**
+     * Prior to KAFKA-20249: AggregationWithHeadersDeserializer - Extract the 
raw aggregation bytes from 
+     * serialized AggregationWithHeaders, stripping the headers prefix. 
+     */
+    private static byte[] rawAggregationPre20249(final byte[] 
aggregationWithHeaders) {
+        if (aggregationWithHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
+        Utils.readHeaders(buffer); 
+        return Utils.readBytes(buffer, buffer.remaining());
+    }
+
+    /**
+     * Prior to KAFKA-20249: ValueAndTimestampDeserializer - Extract raw value 
from serialized 
+     * ValueTimestampHeaders.
+     */
+    private static byte[] rawValuePre20249(final byte[] 
rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize + Long.BYTES);
+        return Utils.readBytes(buffer, buffer.remaining());
+    }
+
+    /**
+     * Prior to KAFKA-20249: Extract raw timestamped value (timestamp + value) 
from serialized ValueTimestampHeaders.
+     * This strips the headers portion but keeps timestamp and value intact.
+     *
+     * Format conversion:
+     * Input:  [headersSize(varint)][headers][timestamp(8)][value]
+     * Output: [timestamp(8)][value]
+     */
+    private static byte[] rawTimestampedValuePre20249(final byte[] 
rawValueTimestampHeaders) {
+        if (rawValueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        // Skip headers, keep timestamp + value
+        if (headersSize < 0 || headersSize > buffer.remaining() || 
buffer.remaining() - headersSize < StateSerdes.TIMESTAMP_SIZE) {
+            throw new SerializationException(
+                "Invalid format: headers size " + headersSize + 
+                ", timestamp expected size " + StateSerdes.TIMESTAMP_SIZE + 
+                ", but buffer size " + buffer.remaining()
+            );
+        }
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] result = new byte[buffer.remaining()];
+        buffer.get(result);
+        return result;
+    }
+
+    /**
+     * Prior to KAFKA-20249 - AggregationWithHeadersDeserializer - Extract 
headers from serialized AggregationWithHeaders
+     */
+    private static Headers headersPre20249(final byte[] 
rawAggregationWithHeaders) {
+        if (rawAggregationWithHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
+        return Utils.readHeaders(buffer);
+    }
+
+    /**
+     * Prior to KAFKA-20249 - ValueTimestampHeadersDeserializer - Extract 
timestamp from serialized ValueTimestampHeaders.
+     */
+    private static long timestampPre20249(final byte[] 
rawValueTimestampHeaders) {
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] rawTimestamp = Utils.readBytes(buffer, Long.BYTES);
+        return LONG_DESERIALIZER.deserialize("", rawTimestamp);
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 8dd74fd98ee..8d20b1a9705 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -54,10 +54,10 @@ public interface HeadersBytesStore {
         //   headersSize = varint(0) = [0x00]
         //   headersBytes = [] (empty, 0 bytes)
         // Result: [0x00][payload]
-        final byte[] res = new byte[1 + value.length];
-        // res[0] is initialized to 0x00 per Java Specification
-        System.arraycopy(value, 0, res, 1, value.length);
-        return res;
+        final byte[] valueHeaders = new byte[1 + value.length];
+        // valueHeaders[0] is initialized to 0x00 per Java Specification
+        System.arraycopy(value, 0, valueHeaders, 1, value.length);
+        return valueHeaders;
     }
 
     static byte[] convertFromPlainToHeaderFormat(final byte[] value) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
index 1c4e5e0827b..c53606c1cc8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
@@ -65,7 +64,7 @@ class AggregationWithHeadersDeserializer<AGG> implements 
WrappingNullableDeseria
         }
 
         final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
-        final Headers headers = readHeaders(buffer);
+        final Headers headers = Utils.readHeaders(buffer);
         final byte[] rawAggregation = readBytes(buffer, buffer.remaining());
         final AGG aggregation = aggregationDeserializer.deserialize(topic, 
headers, rawAggregation);
 
@@ -81,36 +80,4 @@ class AggregationWithHeadersDeserializer<AGG> implements 
WrappingNullableDeseria
     public void setIfUnset(final SerdeGetter getter) {
         initNullableDeserializer(aggregationDeserializer, getter);
     }
-
-    /**
-     * Extract headers from serialized AggregationWithHeaders.
-     */
-    static Headers headers(final byte[] rawAggregationWithHeaders) {
-        if (rawAggregationWithHeaders == null) {
-            return null;
-        }
-
-        final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
-        return readHeaders(buffer);
-    }
-
-    /**
-     * Extract the raw aggregation bytes from serialized 
AggregationWithHeaders,
-     * stripping the headers prefix.
-     */
-    static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
-        if (aggregationWithHeaders == null) {
-            return null;
-        }
-
-        final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
-        readHeaders(buffer);
-        return readBytes(buffer, buffer.remaining());
-    }
-
-    private static Headers readHeaders(final ByteBuffer buffer) {
-        final int headersSize = ByteUtils.readVarint(buffer);
-        final byte[] rawHeaders = readBytes(buffer, headersSize);
-        return HeadersDeserializer.deserialize(rawHeaders);
-    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
index 1f8bf204dbf..4f076ada623 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
@@ -20,8 +20,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.state.SessionStore;
 
-import static 
org.apache.kafka.streams.state.internals.AggregationWithHeadersDeserializer.headers;
-import static 
org.apache.kafka.streams.state.internals.AggregationWithHeadersDeserializer.rawAggregation;
+import static org.apache.kafka.streams.state.internals.Utils.headers;
+import static org.apache.kafka.streams.state.internals.Utils.rawAggregation;
 
 /**
  * Change-logging wrapper for a session bytes store whose values also carry 
headers.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
index bce5dbe0130..68ab181d725 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.List;
 
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+import static org.apache.kafka.streams.state.internals.Utils.headers;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+import static org.apache.kafka.streams.state.internals.Utils.timestamp;
 
 /**
  * Change-logging wrapper for a timestamped key-value bytes store whose values 
also carry headers.
@@ -50,7 +50,7 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
         wrapped().put(key, valueTimestampHeaders);
         log(
             key,
-            rawValue(valueTimestampHeaders),
+            rawPlainValue(valueTimestampHeaders),
             valueTimestampHeaders == null
                 ? internalContext.recordContext().timestamp()
                 : timestamp(valueTimestampHeaders),
@@ -68,7 +68,7 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
             // then it was absent
             log(
                 key,
-                rawValue(valueTimestampHeaders),
+                rawPlainValue(valueTimestampHeaders),
                 valueTimestampHeaders == null
                     ? internalContext.recordContext().timestamp()
                     : timestamp(valueTimestampHeaders),
@@ -87,7 +87,7 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
             final byte[] valueTimestampHeaders = entry.value;
             log(
                 entry.key,
-                rawValue(valueTimestampHeaders),
+                rawPlainValue(valueTimestampHeaders),
                 valueTimestampHeaders == null
                     ? internalContext.recordContext().timestamp()
                     : timestamp(valueTimestampHeaders),
@@ -104,4 +104,4 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
         log(key, null, internalContext.recordContext().timestamp(), 
internalContext.recordContext().headers());
         return deletedValue;
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
index baf99ecc344..483c5775344 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.WindowStore;
 
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+import static org.apache.kafka.streams.state.internals.Utils.headers;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+import static org.apache.kafka.streams.state.internals.Utils.timestamp;
 
 /**
  * Change-logging wrapper for window stores that support headers.
@@ -47,7 +47,7 @@ public class 
ChangeLoggingTimestampedWindowBytesStoreWithHeaders extends ChangeL
         internalContext.logChange(
             name(),
             key,
-            rawValue(valueTimestampHeaders),
+            rawPlainValue(valueTimestampHeaders),
             valueTimestampHeaders != null
                 ? timestamp(valueTimestampHeaders)
                 : internalContext.recordContext().timestamp(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index f77471ad905..e93aad8c8f2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -52,7 +52,6 @@ import java.util.function.Function;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
 
 
 /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index e88ec3398d5..74aafc87bd4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -46,7 +46,6 @@ import java.util.Objects;
 import java.util.function.Function;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
 
 /**
  * A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used 
for recording operation metrics,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
index 4771a86a5f9..db753e52bbe 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
@@ -138,7 +138,7 @@ public class SessionToHeadersStoreAdapter implements 
SessionStore<Bytes, byte[]>
 
     @Override
     public void put(final Windowed<Bytes> sessionKey, final byte[] 
aggregateWithHeader) {
-        store.put(sessionKey, 
AggregationWithHeadersDeserializer.rawAggregation(aggregateWithHeader));
+        store.put(sessionKey, Utils.rawAggregation(aggregateWithHeader));
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
index abaa2b16376..c67bfc8dfbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
@@ -27,6 +28,88 @@ import org.apache.kafka.streams.state.StateSerdes;
 import java.nio.ByteBuffer;
 
 public class Utils {
+    private static final LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
+
+    /**
+     * Check if the input value (with timestamp or not) contains headers size 
equal to zero
+     */
+    public static boolean hasEmptyHeaders(final byte[] rawValueHeaders) {
+        return rawValueHeaders.length > 0 && rawValueHeaders[0] == 0x00;
+    }
+
+    /**
+     * Extract headers from serialized value (with timestamp or not) with 
headers
+     */
+    public static Headers headers(final byte[] valueWithHeaders) {
+        if (valueWithHeaders == null) {
+            return null;
+        }
+
+        // If the header is empty, simply return it
+        if (hasEmptyHeaders(valueWithHeaders)) {
+            return new RecordHeaders();
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
+        return readHeaders(buffer);
+    }
+
+    /**
+     * Serialize the key with headers into bytes
+     * @param key the key to serialize
+     * @param headers the Headers as context
+     * @param serdes the StateSerdes as serializer
+     * @return the Bytes of the key
+     */
+    public static <K> Bytes keyBytes(final K key, final Headers headers, final 
StateSerdes<K, ?> serdes) {
+        return Bytes.wrap(serdes.rawKey(key, headers));
+    }
+
+    /**
+     * Serialize the key into bytes
+     * @param key the key to serialize
+     * @param serdes the StateSerdes as serializer
+     * @return the Bytes of the key
+     */
+    static <K> Bytes keyBytes(final K key, final StateSerdes<K, ?> serdes) {
+        return keyBytes(key, new RecordHeaders(), serdes);
+    }
+
+    /**
+     * Serialize the session key with headers into bytes
+     * @param sessionKey the Windowed session key to serialize
+     * @param headers the Headers as context
+     * @param serdes the StateSerdes as serializer
+     * @return the Bytes of the key
+     */
+    static <K> Bytes keyBytes(final Windowed<K> sessionKey, final Headers 
headers, final StateSerdes<K, ?> serdes) {
+        return keyBytes(sessionKey.key(), headers, serdes);
+    }
+
+    /**
+     * Extract the raw aggregation bytes from serialized 
AggregationWithHeaders,
+     * stripping the headers prefix.
+     */
+    public static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+        if (aggregationWithHeaders == null) {
+            return null;
+        }
+
+        // If the header is empty, then copy the value bytes directly
+        if (hasEmptyHeaders(aggregationWithHeaders)) {
+            // Strip header size's varint byte, and empty headers consume no 
bytes
+            final byte[] aggregation = new byte[aggregationWithHeaders.length 
- 1]; 
+            System.arraycopy(aggregationWithHeaders, 1, aggregation, 0, 
aggregation.length);
+            return aggregation;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
+        // Skip the headers bytes without deserializing or copying
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize); 
+        return readBytes(buffer, buffer.remaining());
+    }
+
     /**
      * Extract raw plain value from serialized ValueTimestampHeaders.
      * This strips both the headers and timestamp portions.
@@ -35,11 +118,19 @@ public class Utils {
      * Input:  [headersSize(varint)][headers][timestamp(8)][value]
      * Output: [value]
      */
-    static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
+    public static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
         if (rawValueTimestampHeaders == null) {
             return null;
         }
 
+        // If the header is empty, then copy the value bytes directly
+        if (hasEmptyHeaders(rawValueTimestampHeaders)) {
+            // Strip header size (varint 1 byte), empty headers (no bytes), 
and timestamp
+            final byte[] rawValue = new byte[rawValueTimestampHeaders.length - 
1 - StateSerdes.TIMESTAMP_SIZE]; 
+            System.arraycopy(rawValueTimestampHeaders, 1 + 
StateSerdes.TIMESTAMP_SIZE, rawValue, 0, rawValue.length);
+            return rawValue;
+        }
+
         final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
         final int headersSize = ByteUtils.readVarint(buffer);
         // Skip headers and timestamp (8 bytes)
@@ -58,11 +149,19 @@ public class Utils {
      * Input:  [headersSize(varint)][headers][timestamp(8)][value]
      * Output: [timestamp(8)][value]
      */
-    static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+    public static byte[] rawTimestampedValue(final byte[] 
rawValueTimestampHeaders) {
         if (rawValueTimestampHeaders == null) {
             return null;
         }
 
+        // If the header is empty, then copy the value and timestamp bytes 
directly
+        if (hasEmptyHeaders(rawValueTimestampHeaders)) {
+            // Strip header size (varint 1 byte), empty headers (no bytes)
+            final byte[] rawValueAndTimestamp = new 
byte[rawValueTimestampHeaders.length - 1]; 
+            System.arraycopy(rawValueTimestampHeaders, 1, 
rawValueAndTimestamp, 0, rawValueAndTimestamp.length);
+            return rawValueAndTimestamp;
+        }
+
         final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
         final int headersSize = ByteUtils.readVarint(buffer);
         // Skip headers, keep timestamp + value
@@ -88,7 +187,7 @@ public class Utils {
      * @return the byte array containing the read bytes
      * @throws SerializationException if buffer doesn't have enough bytes or 
length is negative
      */
-    static byte[] readBytes(final ByteBuffer buffer, final int length) {
+    public static byte[] readBytes(final ByteBuffer buffer, final int length) {
         if (length < 0) {
             throw new SerializationException(
                 "Invalid format: negative length " + length
@@ -105,35 +204,29 @@ public class Utils {
         return bytes;
     }
 
-    /**
-     * Serialize the key with headers into bytes
-     * @param key the key to serialize
-     * @param headers the Headers as context
-     * @param serdes the StateSerdes as serializer
-     * @return the Bytes of the key
-     */
-    public static <K> Bytes keyBytes(final K key, final Headers headers, final 
StateSerdes<K, ?> serdes) {
-        return Bytes.wrap(serdes.rawKey(key, headers));
+    public static Headers readHeaders(final ByteBuffer buffer) {
+        final int headersSize = ByteUtils.readVarint(buffer);
+        final byte[] rawHeaders = readBytes(buffer, headersSize);
+        return HeadersDeserializer.deserialize(rawHeaders);
     }
 
     /**
-     * Serialize the key into bytes
-     * @param key the key to serialize
-     * @param serdes the StateSerdes as serializer
-     * @return the Bytes of the key
+     * Extract timestamp from serialized ValueTimestampHeaders.
      */
-    static <K> Bytes keyBytes(final K key, final StateSerdes<K, ?> serdes) {
-        return keyBytes(key, new RecordHeaders(), serdes);
-    }
+    public static long timestamp(final byte[] rawValueTimestampHeaders) {
+        // If the headers are empty, there are no header bytes to skip
+        if (hasEmptyHeaders(rawValueTimestampHeaders)) {
+            final byte[] rawTimestamp = new byte[StateSerdes.TIMESTAMP_SIZE];
+            System.arraycopy(rawValueTimestampHeaders, 1, rawTimestamp, 0, 
StateSerdes.TIMESTAMP_SIZE);
+            return LONG_DESERIALIZER.deserialize("", rawTimestamp);
+        }
 
-    /**
-     * Serialize the session key with headers into bytes
-     * @param sessionKey the Windowed session key to serialize
-     * @param headers the Headers as context
-     * @param serdes the StateSerdes as serializer
-     * @return the Bytes of the key
-     */
-    static <K> Bytes keyBytes(final Windowed<K> sessionKey, final Headers 
headers, final StateSerdes<K, ?> serdes) {
-        return keyBytes(sessionKey.key(), headers, serdes);
+        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize);
+
+        final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
+        return LONG_DESERIALIZER.deserialize("", rawTimestamp);
     }
+
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
index 44e9a62d89e..a0f9acc4c9b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -48,8 +48,6 @@ import static 
org.apache.kafka.streams.state.internals.Utils.readBytes;
  * This is used by KIP-1271 to deserialize values with timestamps and headers 
from state stores.
  */
 class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
-    private static final LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
-
     public final Deserializer<V> valueDeserializer;
     private final LongDeserializer timestampDeserializer;
 
@@ -104,52 +102,6 @@ class ValueTimestampHeadersDeserializer<V> implements 
WrappingNullableDeserializ
         if (rawValueTimestampHeaders == null) {
             return null;
         }
-
-        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
-        final int headersSize = ByteUtils.readVarint(buffer);
-        // skip headers plus timestamp
-        buffer.position(buffer.position() + headersSize + Long.BYTES);
-        final byte[] bytes = readBytes(buffer, buffer.remaining());
-
-        return deserializer.deserialize("", bytes);
-    }
-
-    /**
-     * Extract timestamp from serialized ValueTimestampHeaders.
-     */
-    static long timestamp(final byte[] rawValueTimestampHeaders) {
-        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
-        final int headersSize = ByteUtils.readVarint(buffer);
-        buffer.position(buffer.position() + headersSize);
-
-        final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
-        return LONG_DESERIALIZER.deserialize("", rawTimestamp);
-    }
-
-    /**
-     * Extract headers from serialized ValueTimestampHeaders.
-     */
-    static Headers headers(final byte[] rawValueTimestampHeaders) {
-        if (rawValueTimestampHeaders == null) {
-            return null;
-        }
-
-        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
-        final int headersSize = ByteUtils.readVarint(buffer);
-        final byte[] rawHeaders = readBytes(buffer, headersSize);
-        return HeadersDeserializer.deserialize(rawHeaders);
-    }
-    /**
-     * Extract raw value from serialized ValueTimestampHeaders.
-     */
-    static byte[] rawValue(final byte[] rawValueTimestampHeaders) {
-        if (rawValueTimestampHeaders == null) {
-            return null;
-        }
-
-        final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
-        final int headersSize = ByteUtils.readVarint(buffer);
-        buffer.position(buffer.position() + headersSize + Long.BYTES);
-        return readBytes(buffer, buffer.remaining());
+        return deserializer.deserialize("", 
Utils.rawPlainValue(rawValueTimestampHeaders));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
index d517f7387a2..e301f6d5f19 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
@@ -25,8 +25,12 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Iterator;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -94,26 +98,35 @@ public class AggregationWithHeadersDeserializerTest {
         assertThrows(SerializationException.class, () -> 
deserializer.deserialize("topic", invalidData));
     }
 
-    @Test
+    @ParameterizedTest
+    @MethodSource("headers")
     public void shouldExtractHeaders() {
         final Long aggregation = 100L;
         final Headers headers = new RecordHeaders();
         headers.add("key1", "value1".getBytes());
         final AggregationWithHeaders<Long> aggregationWithHeaders = 
AggregationWithHeaders.make(aggregation, headers);
 
-        final AggregationWithHeadersSerializer<Long> serializer = new 
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
-        final byte[] serialized = serializer.serialize("topic", 
aggregationWithHeaders);
+        try (final AggregationWithHeadersSerializer<Long> serializer = new 
AggregationWithHeadersSerializer<>(Serdes.Long().serializer())) {
+            final byte[] serialized = serializer.serialize("topic", 
aggregationWithHeaders);
 
-        final Headers extractedHeaders = 
AggregationWithHeadersDeserializer.headers(serialized);
-        assertNotNull(extractedHeaders);
+            final Headers extractedHeaders = Utils.headers(serialized);
+            assertNotNull(extractedHeaders);
+
+            final Header header = extractedHeaders.iterator().next();
+            assertEquals("key1", header.key());
+            assertArrayEquals("value1".getBytes(), header.value());
+        }
+    }
 
-        final Header header = extractedHeaders.iterator().next();
-        assertEquals("key1", header.key());
-        assertArrayEquals("value1".getBytes(), header.value());
+    private static Stream<Arguments> headers() {
+        return Stream.of(
+                new RecordHeaders().add("key1", "value1".getBytes()),
+                new RecordHeaders()
+            ).map(Arguments::of);
     }
 
     @Test
     public void shouldReturnNullForNullInput() {
-        assertNull(AggregationWithHeadersDeserializer.headers(null));
+        assertNull(Utils.headers(null));
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
index 0f5bd979ee3..48ef00a8190 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -54,7 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class RocksDBMigratingSessionStoreWithHeadersTest extends 
RocksDBStoreTest {
 
     private final Serializer<String> stringSerializer = new StringSerializer();
-    private final Serializer<Long> longSerializer = new LongSerializer();
     private final Deserializer<Long> longDeserializer = new LongDeserializer();
     private final AggregationWithHeadersSerializer<String> aggSerializer =
         new AggregationWithHeadersSerializer<>(new StringSerializer());
@@ -520,11 +518,11 @@ public class RocksDBMigratingSessionStoreWithHeadersTest 
extends RocksDBStoreTes
     }
 
     private void assertMigratedValue(final byte[] value, final String 
expectedAggregation) {
-        final Headers headers = 
AggregationWithHeadersDeserializer.headers(value);
+        final Headers headers = Utils.headers(value);
         assertFalse(headers.iterator().hasNext(), "Migrated value should have 
empty headers");
         assertArrayEquals(
             expectedAggregation.getBytes(StandardCharsets.UTF_8),
-            AggregationWithHeadersDeserializer.rawAggregation(value),
+            Utils.rawAggregation(value),
             "Migrated value should preserve original aggregation: " + 
expectedAggregation);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
index 242f404e761..2f3b3ebe65f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
@@ -245,12 +245,12 @@ public class SessionToHeadersStoreAdapterTest {
 
     @Test
     public void shouldStripHeadersFromRawAggregationValue() {
-        final byte[] result = 
AggregationWithHeadersDeserializer.rawAggregation(VALUE_WITH_EMPTY_HEADERS);
+        final byte[] result = Utils.rawAggregation(VALUE_WITH_EMPTY_HEADERS);
         assertArrayEquals(RAW_VALUE, result);
     }
 
     @Test
     public void shouldReturnNullFromRawAggregationValueForNull() {
-        assertNull(AggregationWithHeadersDeserializer.rawAggregation(null));
+        assertNull(Utils.rawAggregation(null));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
index d9edd2cacdc..260567f137c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
@@ -161,18 +161,18 @@ public class TimeOrderedSessionStoreUpgradeTest {
         // Verify old data readable with empty headers via lazy migration
         byte[] fetch = newStore.fetchSession(key1, 100, 200);
         assertNotNull(fetch);
-        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
-        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
+        assertEquals("value1", new String(Utils.rawAggregation(fetch)));
+        assertEquals(0, Utils.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
 
         fetch = newStore.fetchSession(key2, 150, 250);
         assertNotNull(fetch);
-        assertEquals("value2", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
-        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
+        assertEquals("value2", new String(Utils.rawAggregation(fetch)));
+        assertEquals(0, Utils.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
 
         fetch = newStore.fetchSession(key3, 200, 300);
         assertNotNull(fetch);
-        assertEquals("value3", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
-        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
+        assertEquals("value3", new String(Utils.rawAggregation(fetch)));
+        assertEquals(0, Utils.headers(fetch).toArray().length, "Old data 
should have empty headers after migration");
 
         newStore.close();
     }
@@ -201,7 +201,7 @@ public class TimeOrderedSessionStoreUpgradeTest {
         // Verify old data still accessible
         final byte[] fetch = newStore.fetchSession(key1, 100, 200);
         assertNotNull(fetch);
-        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+        assertEquals("value1", new String(Utils.rawAggregation(fetch)));
 
         newStore.close();
     }
@@ -230,7 +230,7 @@ public class TimeOrderedSessionStoreUpgradeTest {
         // Verify old data still accessible
         final byte[] fetch = newStore.fetchSession(key1, 100, 200);
         assertNotNull(fetch);
-        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+        assertEquals("value1", new String(Utils.rawAggregation(fetch)));
 
         newStore.close();
     }
@@ -260,15 +260,15 @@ public class TimeOrderedSessionStoreUpgradeTest {
             serializeValueWithHeaders("value2".getBytes(), headersWithData));
 
         // Verify values
-        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key1,
 100, 200))));
-        assertEquals("value2", new 
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key2,
 150, 250))));
+        assertEquals("value1", new 
String(Utils.rawAggregation(store.fetchSession(key1, 100, 200))));
+        assertEquals("value2", new 
String(Utils.rawAggregation(store.fetchSession(key2, 150, 250))));
 
         // Verify headers for key1 (empty)
-        final Headers key1Headers = 
AggregationWithHeadersDeserializer.headers(store.fetchSession(key1, 100, 200));
+        final Headers key1Headers = Utils.headers(store.fetchSession(key1, 
100, 200));
         assertEquals(0, key1Headers.toArray().length);
 
         // Verify headers for key2 (with data)
-        final Headers key2Headers = 
AggregationWithHeadersDeserializer.headers(store.fetchSession(key2, 150, 250));
+        final Headers key2Headers = Utils.headers(store.fetchSession(key2, 
150, 250));
         assertEquals(2, key2Headers.toArray().length);
         assertEquals("header-value-1", new 
String(key2Headers.lastHeader("header-key-1").value()));
         assertEquals("header-value-2", new 
String(key2Headers.lastHeader("header-key-2").value()));
@@ -323,8 +323,8 @@ public class TimeOrderedSessionStoreUpgradeTest {
         // Verify old data readable with empty headers via lazy migration
         final byte[] fetch = newStore.fetchSession(key1, 100, 200);
         assertNotNull(fetch, "Old data should be readable after upgrade via 
DSL supplier path");
-        assertEquals("value1", new 
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
-        assertEquals(0, 
AggregationWithHeadersDeserializer.headers(fetch).toArray().length,
+        assertEquals("value1", new String(Utils.rawAggregation(fetch)));
+        assertEquals(0, Utils.headers(fetch).toArray().length,
             "Old data should have empty headers after migration");
 
         newStore.close();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
index 91f39e83a80..631f906ee83 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
@@ -35,13 +35,16 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.stream.Stream;
 
+import static org.apache.kafka.streams.state.internals.Utils.hasEmptyHeaders;
 import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
 import static 
org.apache.kafka.streams.state.internals.Utils.rawTimestampedValue;
 import static org.apache.kafka.streams.state.internals.Utils.readBytes;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class UtilsTest {
     private static final String TOPIC = "test-topic";
@@ -52,6 +55,8 @@ public class UtilsTest {
     // Header size's varint encoding cannot exceed 5 bytes (see @{link 
ByteUtils#readVarint(ByteBuffer)})
     private static final int MAX_VARINT_SIZE = 5;
     private static final int OVERFLOW_HEADERS_SIZE = (1 + MAX_VARINT_SIZE) + 
HEADERS.length + StateSerdes.TIMESTAMP_SIZE + VALUE.length;
+    // 1 byte header size, 0 bytes of empty headers, and the timestamp
+    private static final int MIN_SIZE = 1 + 0 + StateSerdes.TIMESTAMP_SIZE;
 
     @Test
     public void shouldExtractRawPlainValue() {
@@ -75,7 +80,13 @@ public class UtilsTest {
 
     @Test
     public void shouldReturnNullForNullRawTimestampedValue() {
-        assertNull(rawPlainValue(null));
+        assertNull(rawTimestampedValue(null));
+    }
+
+    @Test
+    public void shouldExtractRawValueWithEmptyHeaders() {
+        final byte[] res = 
rawPlainValue(timestampedValueWithEmptyHeaders(VALUE));
+        assertArrayEquals(VALUE, res);
     }
 
     @ParameterizedTest
@@ -90,6 +101,19 @@ public class UtilsTest {
         assertArrayEquals(timestampedValueOf(value), outputBytes);
     }
 
+    @Test
+    public void testRawTimestampedValueWithEmptyHeaders() {
+        final byte[] input = timestampedValueWithEmptyHeaders(VALUE);
+        final byte[] res = rawTimestampedValue(input);
+        final ByteBuffer buf = ByteBuffer.wrap(res);
+        assertEquals(TIMESTAMP, buf.getLong());
+
+        assertEquals(VALUE.length, buf.remaining());
+        final byte[] resValue = new byte[buf.remaining()];
+        buf.get(resValue);
+        assertArrayEquals(VALUE, resValue);
+    }
+
     @ParameterizedTest
     @MethodSource("invalidHeaderSizes")
     public void testRawTimestampedValueWithInvalidHeadersSize(final int 
invalidHeaderSize) {
@@ -120,6 +144,25 @@ public class UtilsTest {
         }
     }
 
+    @Test
+    public void testEmptyHeadersAndTimestamp() {
+        final byte[] empty = new byte[MIN_SIZE];
+        empty[0] = (byte) 0x00; // header size
+        assertTrue(hasEmptyHeaders(empty));
+
+        final byte[] nonEmpty = new byte[MIN_SIZE];
+        nonEmpty[0] = (byte) 0x01; // header size
+        assertFalse(hasEmptyHeaders(nonEmpty));
+    }
+
+    @ParameterizedTest
+    @ValueSource(bytes = { 0x10, 0x11 })
+    public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final int 
invalidSize) {
+        final byte[] invalid = new byte[MIN_SIZE];
+        invalid[0] = (byte) invalidSize; // header size
+        assertFalse(hasEmptyHeaders(invalid));
+    }
+
     @Test
     public void testReadBytes() {
         final ByteBuffer buf = ByteBuffer.wrap(VALUE);
@@ -142,6 +185,26 @@ public class UtilsTest {
         assertThrows(SerializationException.class, () -> readBytes(buf, 1));
     }
 
+    private static byte[] timestampedValueWithEmptyHeaders(final byte[] value) 
{
+        // header size: 1 byte, empty headers: 0 bytes, timestamp: 8 bytes, 
plain value length
+        final byte[] res = new byte[1 + 0 + StateSerdes.TIMESTAMP_SIZE + 
value.length];
+        final ByteBuffer buf = ByteBuffer.wrap(res);
+        buf.put((byte) 0x00); // header size
+        buf.putLong(TIMESTAMP);
+        buf.put(value);
+        return res;
+    }
+
+    private static byte[] timestampedValueWithEmptyHeadersInvalidTimestamp() {
+        // header size: 1 byte, empty headers: 0 bytes, invalid timestamp: 1 
byte, value: 1 byte
+        final byte[] invalid = new byte[1 + 0 + 1 + 1];
+        final ByteBuffer buf = ByteBuffer.wrap(invalid);
+        buf.put((byte) 0x00); // header size
+        buf.put((byte) 0x01); // invalid timestamp: 1 byte instead of 
StateSerdes.TIMESTAMP_SIZE bytes
+        buf.put((byte) 0x02); // plain value, small enough for the whole data 
to go under the min data size
+        return invalid;
+    }
+
     private static Stream<Arguments> invalidHeaderSizes() {
         return Stream.of(-1, Integer.MAX_VALUE, Integer.MIN_VALUE, 
OVERFLOW_HEADERS_SIZE).map(Arguments::of);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
index 8a4471580c6..4755eb3257c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
@@ -28,8 +28,12 @@ import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Iterator;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -202,24 +206,30 @@ public class ValueTimestampHeadersDeserializerTest {
         assertNull(value);
     }
 
-    @Test
-    public void shouldExtractTimestamp() {
-        final Headers headers = new RecordHeaders()
-            .add("key1", "value1".getBytes());
+    @ParameterizedTest
+    @MethodSource("headers")
+    public void shouldExtractTimestamp(final Headers headers) {
         final ValueTimestampHeaders<String> original =
             ValueTimestampHeaders.make("test-value", 123456789L, headers);
 
         final byte[] serialized = serializer.serialize(TOPIC, original);
-        final long timestamp = 
ValueTimestampHeadersDeserializer.timestamp(serialized);
+        final long timestamp = Utils.timestamp(serialized);
 
         assertEquals(123456789L, timestamp);
     }
 
+    private static Stream<Arguments> headers() {
+        return Stream.of(
+                new RecordHeaders().add("key1", "value1".getBytes()),
+                new RecordHeaders()
+            ).map(Arguments::of);
+    }
+
     @Test
     public void shouldThrowExceptionWhenExtractingTimestampFromNull() {
         // ByteBuffer.wrap() throws NullPointerException for null input
         assertThrows(NullPointerException.class, () ->
-            ValueTimestampHeadersDeserializer.timestamp(null)
+            Utils.timestamp(null)
         );
     }
 
@@ -232,7 +242,7 @@ public class ValueTimestampHeadersDeserializerTest {
             ValueTimestampHeaders.make("test-value", 123456789L, headers);
 
         final byte[] serialized = serializer.serialize(TOPIC, original);
-        final Headers extractedHeaders = 
ValueTimestampHeadersDeserializer.headers(serialized);
+        final Headers extractedHeaders = Utils.headers(serialized);
 
         assertNotNull(extractedHeaders);
         assertEquals(2, extractedHeaders.toArray().length);
@@ -247,7 +257,7 @@ public class ValueTimestampHeadersDeserializerTest {
             ValueTimestampHeaders.make("test-value", 123456789L, headers);
 
         final byte[] serialized = serializer.serialize(TOPIC, original);
-        final Headers extractedHeaders = 
ValueTimestampHeadersDeserializer.headers(serialized);
+        final Headers extractedHeaders = Utils.headers(serialized);
 
         assertNotNull(extractedHeaders);
         assertEquals(0, extractedHeaders.toArray().length);
@@ -255,7 +265,7 @@ public class ValueTimestampHeadersDeserializerTest {
 
     @Test
     public void shouldReturnNullWhenExtractingHeadersFromNull() {
-        final Headers headers = 
ValueTimestampHeadersDeserializer.headers(null);
+        final Headers headers = Utils.headers(null);
         assertNull(headers);
     }
 

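For readers who want to try the two extraction paths outside the Kafka
code base, here is a minimal standalone sketch. It is not the committed
implementation: the class name `HeaderSkipSketch`, its `readVarint`
(a simplified zigzag varint reader standing in for
`ByteUtils.readVarint`), and the use of `IllegalArgumentException` in
place of `SerializationException` are all assumptions made for
illustration. It assumes the layout
[headersSize(varint)][headers][timestamp(8)][value] described in the
Javadoc above.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical standalone sketch; names are illustrative, not Kafka APIs.
final class HeaderSkipSketch {
    // Assumed to match StateSerdes.TIMESTAMP_SIZE (an 8-byte long).
    static final int TIMESTAMP_SIZE = Long.BYTES;

    // Simplified zigzag varint reader, standing in for ByteUtils.readVarint.
    static int readVarint(final ByteBuffer buffer) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint is longer than 5 bytes");
            }
            b = buffer.get();
            raw |= (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo zigzag encoding
    }

    // Input:  [headersSize(varint)][headers][timestamp(8)][value]
    // Output: [value]
    static byte[] rawPlainValue(final byte[] raw) {
        if (raw == null) {
            return null;
        }
        if (raw.length > 0 && raw[0] == 0x00) {
            // Fast path: a headers size of 0 is a single 0x00 byte and the
            // headers themselves occupy no bytes, so the value offset is fixed.
            if (raw.length < 1 + TIMESTAMP_SIZE) {
                throw new IllegalArgumentException("input too short for a timestamp");
            }
            final byte[] value = new byte[raw.length - 1 - TIMESTAMP_SIZE];
            System.arraycopy(raw, 1 + TIMESTAMP_SIZE, value, 0, value.length);
            return value;
        }
        // General path: read the size, then skip the header bytes without
        // deserializing or copying them.
        final ByteBuffer buffer = ByteBuffer.wrap(raw);
        final int headersSize = readVarint(buffer);
        final long valueStart = (long) buffer.position() + headersSize + TIMESTAMP_SIZE;
        if (headersSize < 0 || valueStart > raw.length) {
            throw new IllegalArgumentException("invalid headers size " + headersSize);
        }
        return Arrays.copyOfRange(raw, (int) valueStart, raw.length);
    }

    public static void main(final String[] args) {
        final byte[] value = {1, 2, 3};
        // Empty headers: one 0x00 size byte, then timestamp and value.
        final byte[] empty = ByteBuffer.allocate(1 + TIMESTAMP_SIZE + value.length)
            .put((byte) 0x00).putLong(42L).put(value).array();
        // Three opaque header bytes: size 3 is 0x06 in zigzag encoding.
        final byte[] nonEmpty = ByteBuffer.allocate(1 + 3 + TIMESTAMP_SIZE + value.length)
            .put((byte) 0x06).put(new byte[] {9, 9, 9}).putLong(42L).put(value).array();
        System.out.println(Arrays.toString(rawPlainValue(empty)));
        System.out.println(Arrays.toString(rawPlainValue(nonEmpty)));
    }
}
```

Both calls in `main` should yield the same value bytes; the only
difference is whether the value offset is known up front or has to be
computed from the decoded headers size.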