This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ce8c5f84978 KAFKA-20303: Optimize deserializers and HeadersBytesStore
(#21769)
ce8c5f84978 is described below
commit ce8c5f84978d065cfde12343347da1384c0ab726
Author: Zheguang Zhao <[email protected]>
AuthorDate: Tue Mar 17 06:07:49 2026 +1100
KAFKA-20303: Optimize deserializers and HeadersBytesStore (#21769)
This patch
- optimizes the `HeadersBytesStore.convertToHeaderFormat` conversion method,
which converts a value with timestamp to the format with empty headers
- does not optimize `HeadersDeserializer.deserialize`, because the existing
logic for empty headers, which uses the ByteBuffer's first byte, isn't slower
than using `byte[0]` (the benchmark result is kept in the reverted commit
https://github.com/apache/kafka/pull/21769/commits/cd6bbe595ba2b0319952aded2622984fcc368544
for reproducibility)
The optimization is to use `System.arraycopy` in place of
`ByteBuffer.put`. The JMH benchmark in this patch shows about a 1.25x
speedup in throughput (9286.590 vs. 11636.125 ops/s).
**Benchmark**
```
Benchmark                                                  Mode  Cnt      Score     Error  Units
RawBytesExtractionBenchmark.testConvertToHeaderFormat     thrpt   15   9286.590 ±  83.966  ops/s
RawBytesExtractionBenchmark.testConvertToHeaderFormatOpt  thrpt   15  11636.125 ±  54.813  ops/s
RawBytesExtractionBenchmark.testDeserialize               thrpt   15  15556.237 ± 243.880  ops/s
RawBytesExtractionBenchmark.testDeserializeOpt            thrpt   15  15676.273 ±  65.334  ops/s
JMH benchmarks done
```
**Test**
- HeadersBytesStoreTest existing coverage
Reviewers: Bill Bejeck <[email protected]>
---
checkstyle/import-control-jmh-benchmarks.xml | 1 +
.../jmh/streams/RawBytesExtractionBenchmark.java | 104 +++++++++++++++++++++
.../kafka/streams/state/HeadersBytesStore.java | 10 +-
3 files changed, 109 insertions(+), 6 deletions(-)
diff --git a/checkstyle/import-control-jmh-benchmarks.xml
b/checkstyle/import-control-jmh-benchmarks.xml
index 7cafa743e15..8828e713e49 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -43,6 +43,7 @@
<allow pkg="org.apache.kafka.security.authorizer"/>
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage"/>
+ <allow pkg="org.apache.kafka.streams"/>
<allow pkg="org.apache.kafka.clients"/>
<allow pkg="org.apache.kafka.coordinator.common.runtime"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
new file mode 100644
index 00000000000..f0f50f81955
--- /dev/null
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.streams;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.CompilerControl;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+@OutputTimeUnit(TimeUnit.SECONDS)
+@Fork(3)
+@Warmup(iterations = 3, time = 1)
+@Measurement(iterations = 5, time = 1)
+public class RawBytesExtractionBenchmark {
+ private static final int DATA_SET_SAMPLE_SIZE = 16384;
+
+ @State(Scope.Benchmark)
+ public static class IterationStateForValues {
+ protected byte[][] values;
+
+ byte[][] getRandomValues() {
+ return values;
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class IterationStateForEmptyHeadersTimestamp extends
IterationStateForValues {
+ @Setup(Level.Iteration)
+ public void setup() {
+ this.values = new byte[DATA_SET_SAMPLE_SIZE][];
+ for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+ values[i] = new byte[1 + StateSerdes.TIMESTAMP_SIZE + 8];
+ final ByteBuffer buf = ByteBuffer.wrap(values[i]);
+ buf.put((byte) 0x00); // header size
+ buf.putLong(123456789L); // timestamp
+ buf.putLong((long) i); // non-header payload
+ }
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testConvertToHeaderFormat(IterationStateForEmptyHeadersTimestamp state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(convertToHeaderFormatPre20303(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testConvertToHeaderFormatOpt(IterationStateForEmptyHeadersTimestamp state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(HeadersBytesStore.convertToHeaderFormat(randomValue));
+ }
+ }
+
+ /**
+ * Prior to KAFKA-20303 - HeadersBytesStore.convertToHeaderFormat
+ */
+ private static byte[] convertToHeaderFormatPre20303(final byte[]
valueAndTimestamp) {
+ if (valueAndTimestamp == null) {
+ return null;
+ }
+
+ // Format: [headersSize(varint)][headersBytes][payload]
+ // For empty headers:
+ // headersSize = varint(0) = [0x00]
+ // headersBytes = [] (empty, 0 bytes)
+ // Result: [0x00][payload]
+ return ByteBuffer
+ .allocate(1 + valueAndTimestamp.length)
+ .put((byte) 0x00)
+ .put(valueAndTimestamp)
+ .array();
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 06f85595047..67c47fbfd80 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state;
-import java.nio.ByteBuffer;
/**
* Marker interface to indicate that a bytes store understands the
value-with-headers format
@@ -52,11 +51,10 @@ public interface HeadersBytesStore {
// headersSize = varint(0) = [0x00]
// headersBytes = [] (empty, 0 bytes)
// Result: [0x00][payload]
- return ByteBuffer
- .allocate(1 + valueAndTimestamp.length)
- .put((byte) 0x00)
- .put(valueAndTimestamp)
- .array();
+ final byte[] res = new byte[1 + valueAndTimestamp.length];
+ // res[0] is initialized to 0x00 per Java Specification
+ System.arraycopy(valueAndTimestamp, 0, res, 1,
valueAndTimestamp.length);
+ return res;
}
static byte[] convertFromPlainToHeaderFormat(final byte[] value) {