This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce8c5f84978 KAFKA-20303: Optimize deserializers and HeadersBytesStore 
(#21769)
ce8c5f84978 is described below

commit ce8c5f84978d065cfde12343347da1384c0ab726
Author: Zheguang Zhao <[email protected]>
AuthorDate: Tue Mar 17 06:07:49 2026 +1100

    KAFKA-20303: Optimize deserializers and HeadersBytesStore (#21769)
    
    This patch
    - optimizes `HeadersBytesStore.convertToHeaderFormat` convertion method
    from value with timetamp to with empty headers
    - does not optimize `HeadersDeserializer.deserialize` because existing
    logic on empty headers using ByteBuffer's first byte isn't slower than
    `byte[0]` (Benchmark result in reverted commit:
    
    
https://github.com/apache/kafka/pull/21769/commits/cd6bbe595ba2b0319952aded2622984fcc368544
    for reproducibility)
    
    The optimization is to use `System.arraycopy` in place of
    `ByteBuffer.put`.  The JMH benchmark in this patch shows 1.2x speedup in
    throughput.
    
    **Benchmark**
    ```
    Benchmark                                                  Mode  Cnt
    Score     Error  Units
    RawBytesExtractionBenchmark.testConvertToHeaderFormat     thrpt   15
    9286.590 ±  83.966  ops/s
    RawBytesExtractionBenchmark.testConvertToHeaderFormatOpt  thrpt   15
    11636.125 ±  54.813  ops/s
    
    RawBytesExtractionBenchmark.testDeserialize               thrpt   15
    15556.237 ± 243.880  ops/s
    RawBytesExtractionBenchmark.testDeserializeOpt            thrpt   15
    15676.273 ±  65.334  ops/s
    JMH benchmarks done
    ```
    
    **Test**
    - HeadersBytesStoreTest existing coverage
    
    
    Reviewers: Bill Bejeck <[email protected]>
---
 checkstyle/import-control-jmh-benchmarks.xml       |   1 +
 .../jmh/streams/RawBytesExtractionBenchmark.java   | 104 +++++++++++++++++++++
 .../kafka/streams/state/HeadersBytesStore.java     |  10 +-
 3 files changed, 109 insertions(+), 6 deletions(-)

diff --git a/checkstyle/import-control-jmh-benchmarks.xml 
b/checkstyle/import-control-jmh-benchmarks.xml
index 7cafa743e15..8828e713e49 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -43,6 +43,7 @@
     <allow pkg="org.apache.kafka.security.authorizer"/>
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage"/>
+    <allow pkg="org.apache.kafka.streams"/>
     <allow pkg="org.apache.kafka.clients"/>
     <allow pkg="org.apache.kafka.coordinator.common.runtime"/>
     <allow pkg="org.apache.kafka.coordinator.group"/>
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
new file mode 100644
index 00000000000..f0f50f81955
--- /dev/null
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.streams;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.CompilerControl;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(3)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class RawBytesExtractionBenchmark {
+    private static final int DATA_SET_SAMPLE_SIZE = 16384;
+
+    @State(Scope.Benchmark)
+    public static class IterationStateForValues {
+        protected byte[][] values;
+
+        byte[][] getRandomValues() {
+            return values;
+        }
+    }
+
+    @State(Scope.Benchmark)
+    public static class IterationStateForEmptyHeadersTimestamp extends 
IterationStateForValues {
+        @Setup(Level.Iteration)
+        public void setup() {
+            this.values = new byte[DATA_SET_SAMPLE_SIZE][];
+            for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+                values[i] = new byte[1 + StateSerdes.TIMESTAMP_SIZE + 8];
+                final ByteBuffer buf = ByteBuffer.wrap(values[i]);
+                buf.put((byte) 0x00); // header size
+                buf.putLong(123456789L); // timestamp
+                buf.putLong((long) i); // non-header payload
+            }
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testConvertToHeaderFormat(IterationStateForEmptyHeadersTimestamp state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(convertToHeaderFormatPre20303(randomValue));
+        }
+    }
+
+    @Benchmark
+    @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+    public void 
testConvertToHeaderFormatOpt(IterationStateForEmptyHeadersTimestamp state, 
Blackhole bh) {
+        for (byte[] randomValue : state.getRandomValues()) {
+            bh.consume(HeadersBytesStore.convertToHeaderFormat(randomValue));
+        }
+    }
+
+    /**
+     * Prior to KAFKA-20303 - HeadersBytesStore.convertToHeaderFormat
+     */
+    private static byte[] convertToHeaderFormatPre20303(final byte[] 
valueAndTimestamp) {
+        if (valueAndTimestamp == null) {
+            return null;
+        }
+
+        // Format: [headersSize(varint)][headersBytes][payload]
+        // For empty headers:
+        //   headersSize = varint(0) = [0x00]
+        //   headersBytes = [] (empty, 0 bytes)
+        // Result: [0x00][payload]
+        return ByteBuffer
+            .allocate(1 + valueAndTimestamp.length)
+            .put((byte) 0x00)
+            .put(valueAndTimestamp)
+            .array();
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 06f85595047..67c47fbfd80 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state;
 
-import java.nio.ByteBuffer;
 
 /**
  * Marker interface to indicate that a bytes store understands the 
value-with-headers format
@@ -52,11 +51,10 @@ public interface HeadersBytesStore {
         //   headersSize = varint(0) = [0x00]
         //   headersBytes = [] (empty, 0 bytes)
         // Result: [0x00][payload]
-        return ByteBuffer
-            .allocate(1 + valueAndTimestamp.length)
-            .put((byte) 0x00)
-            .put(valueAndTimestamp)
-            .array();
+        final byte[] res = new byte[1 + valueAndTimestamp.length];
+        // res[0] is initialized to 0x00 per Java Specification
+        System.arraycopy(valueAndTimestamp, 0, res, 1, 
valueAndTimestamp.length);
+        return res;
     }
 
     static byte[] convertFromPlainToHeaderFormat(final byte[] value) {

Reply via email to