This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 000d82c0c14 KAFKA-20249: Optimize raw value extraction in
headers-aware deserializers (#21706)
000d82c0c14 is described below
commit 000d82c0c14a4acbfaec78b8c2f03dd19d2e47ee
Author: Zheguang Zhao <[email protected]>
AuthorDate: Sat Mar 21 10:08:43 2026 +1100
KAFKA-20249: Optimize raw value extraction in headers-aware deserializers
(#21706)
This patch implements two optimizations, adds JMH benchmarks for them,
and includes some opportunistic refactoring.
1. Skipping headers
Previously, raw value extraction in headers-aware deserializers
deserialized and/or copied the headers, even though they only need to be
skipped. In one case this happened for both empty and non-empty
headers.
2. Extracting value / timestamp after empty headers
Empty headers have a constant metadata footprint: the headers size is a
single varint-encoded byte of 0, and the headers themselves consume no bytes.
Based on this invariant, the ByteBuffer-based extraction can be replaced
with a direct `System.arraycopy`, which is a Java native method
optimized for specific platforms.
The optimized headers-aware extraction methods:
- rawAggregation
- rawTimestampedValue
- rawValue / rawPlainValue
- ValueTimestampHeadersDeserializer.timestamp
- ValueTimestampHeadersDeserializer.headers
- AggregationWithHeadersDeserializer.headers
**Benchmark:**
This patch also includes JMH benchmarks to measure the speedup. On my
local machine, Optimization 1 yields a 2-6x speedup and Optimization 2
a 1.2-1.3x speedup.
Below is the throughput comparison of a recorded JMH run (higher score
is better):
```
Benchmark                                                              Mode  Cnt      Score      Error  Units
RawBytesExtractionBenchmark.testHeadersWithoutHeaders                 thrpt   15  10158.764 ±   85.564  ops/s
RawBytesExtractionBenchmark.testHeadersWithoutHeadersOpt              thrpt   15  14824.176 ± 1244.455  ops/s
RawBytesExtractionBenchmark.testRawAggregationWithHeaders             thrpt   15   1473.459 ±    7.170  ops/s
RawBytesExtractionBenchmark.testRawAggregationWithHeadersOpt          thrpt   15  11618.187 ±  235.385  ops/s
RawBytesExtractionBenchmark.testRawAggregationWithoutHeaders          thrpt   15   8337.728 ±  199.919  ops/s
RawBytesExtractionBenchmark.testRawAggregationWithoutHeadersOpt       thrpt   15  14564.899 ±  186.405  ops/s
RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeaders     thrpt   15  10217.292 ±  108.552  ops/s
RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeadersOpt  thrpt   15  12121.074 ±  201.235  ops/s
RawBytesExtractionBenchmark.testRawValueWithoutHeaders                thrpt   15  11632.484 ±  138.505  ops/s
RawBytesExtractionBenchmark.testRawValueWithoutHeadersOpt             thrpt   15  14669.563 ±   43.458  ops/s
RawBytesExtractionBenchmark.testTimestampWithoutHeaders               thrpt   15  14858.778 ±   39.301  ops/s
RawBytesExtractionBenchmark.testTimestampWithoutHeadersOpt            thrpt   15  19832.718 ±  916.980  ops/s
JMH benchmarks done
```
**Test:**
- `AggregationWithHeadersDeserializer.rawAggregation`
  - empty headers:
    `SessionToHeadersStoreAdapterTest.shouldStripHeadersFromRawAggregationValue`
- `Utils.rawPlainValue`
  - empty headers: `UtilsTest.shouldExtractRawValueWithEmptyHeaders`
  - empty headers, no timestamp:
    `UtilsTest.testRawPlainValueWithEmptyHeadersAndInvalidTimestamp`
- `Utils.rawTimestampedValue`
  - empty headers: `UtilsTest.testRawTimestampedValueWithEmptyHeaders`
  - empty headers, no timestamp:
    `UtilsTest.testRawTimestampedValueWithEmptyHeadersAndInvalidTimestamp`
- `Utils.hasEmptyHeadersAndTimestamp`
  - min size violation
  - empty headers
  - non-empty headers
**Refactor**
- Point all callers of raw value extraction (with timestamp and headers)
to the common implementation in Utils.
Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../jmh/streams/RawBytesExtractionBenchmark.java | 221 ++++++++++++++++++++-
.../kafka/streams/state/HeadersBytesStore.java | 8 +-
.../AggregationWithHeadersDeserializer.java | 35 +---
.../ChangeLoggingSessionBytesStoreWithHeaders.java | 4 +-
...ngTimestampedKeyValueBytesStoreWithHeaders.java | 14 +-
...gingTimestampedWindowBytesStoreWithHeaders.java | 8 +-
...MeteredTimestampedKeyValueStoreWithHeaders.java | 1 -
.../MeteredTimestampedWindowStoreWithHeaders.java | 1 -
.../internals/SessionToHeadersStoreAdapter.java | 2 +-
.../kafka/streams/state/internals/Utils.java | 149 +++++++++++---
.../ValueTimestampHeadersDeserializer.java | 50 +----
.../AggregationWithHeadersDeserializerTest.java | 33 ++-
...ocksDBMigratingSessionStoreWithHeadersTest.java | 6 +-
.../SessionToHeadersStoreAdapterTest.java | 4 +-
.../TimeOrderedSessionStoreUpgradeTest.java | 28 +--
.../kafka/streams/state/internals/UtilsTest.java | 65 +++++-
.../ValueTimestampHeadersDeserializerTest.java | 28 ++-
17 files changed, 485 insertions(+), 172 deletions(-)
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
index f0f50f81955..6fca76f18eb 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/streams/RawBytesExtractionBenchmark.java
@@ -17,8 +17,13 @@
package org.apache.kafka.jmh.streams;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.state.internals.Utils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.CompilerControl;
@@ -41,6 +46,7 @@ import java.util.concurrent.TimeUnit;
@Measurement(iterations = 5, time = 1)
public class RawBytesExtractionBenchmark {
private static final int DATA_SET_SAMPLE_SIZE = 16384;
+ private static final LongDeserializer LONG_DESERIALIZER = new
LongDeserializer();
@State(Scope.Benchmark)
public static class IterationStateForValues {
@@ -52,11 +58,25 @@ public class RawBytesExtractionBenchmark {
}
@State(Scope.Benchmark)
- public static class IterationStateForEmptyHeadersTimestamp extends
IterationStateForValues {
+ public static class IterationStateForEmptyHeaders extends
IterationStateForValues {
@Setup(Level.Iteration)
public void setup() {
this.values = new byte[DATA_SET_SAMPLE_SIZE][];
for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+ values[i] = new byte[1 + 8];
+ final ByteBuffer buf = ByteBuffer.wrap(values[i]);
+ buf.put((byte) 0x00); // header size
+ buf.putLong((long) i); // non-header payload
+ }
+ }
+ }
+
+ @State(Scope.Benchmark)
+ public static class IterationStateForEmptyHeadersTimestamp extends
IterationStateForValues {
+ @Setup(Level.Iteration)
+ public void setup() {
+ this.values = new byte[DATA_SET_SAMPLE_SIZE][];
+ for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
values[i] = new byte[1 + StateSerdes.TIMESTAMP_SIZE + 8];
final ByteBuffer buf = ByteBuffer.wrap(values[i]);
buf.put((byte) 0x00); // header size
@@ -66,6 +86,122 @@ public class RawBytesExtractionBenchmark {
}
}
+ @State(Scope.Benchmark)
+ public static class IterationStateForHeaders extends
IterationStateForValues {
+ @Setup(Level.Iteration)
+ public void setup() {
+ this.values = new byte[DATA_SET_SAMPLE_SIZE][];
+ for (int i = 0; i < DATA_SET_SAMPLE_SIZE; i++) {
+ values[i] = new byte[1 + 1 + (1 + 4) + (1 + 4) + 8];
+ final ByteBuffer buf = ByteBuffer.wrap(values[i]);
+ ByteUtils.writeVarint(11, buf); // 1-byte header size of 11
+ ByteUtils.writeVarint(1, buf); // 1-byte header count of 1
+ ByteUtils.writeVarint(4, buf); // 1-byte header key size
+ buf.putInt(i + 1); // 4-byte header key
+ ByteUtils.writeVarint(4, buf); // 1-byte header value size
+ buf.putInt(i + 1); // 4-byte header value
+ buf.putLong((long) i + 1); // plain value
+ }
+ }
+ }
+
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testRawAggregationWithoutHeaders(IterationStateForEmptyHeaders
state, Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(rawAggregationPre20249(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testRawAggregationWithoutHeadersOpt(IterationStateForEmptyHeaders state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(Utils.rawAggregation(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testRawAggregationWithHeaders(IterationStateForHeaders state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(rawAggregationPre20249(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testRawAggregationWithHeadersOpt(IterationStateForHeaders
state, Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(Utils.rawAggregation(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testRawValueWithoutHeaders(IterationStateForEmptyHeadersTimestamp state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(rawValuePre20249(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testRawValueWithoutHeadersOpt(IterationStateForEmptyHeadersTimestamp state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(Utils.rawPlainValue(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testRawTimestampedValueWithoutHeaders(IterationStateForEmptyHeadersTimestamp
state, Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(rawTimestampedValuePre20249(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testRawTimestampedValueWithoutHeadersOpt(IterationStateForEmptyHeadersTimestamp
state, Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(Utils.rawTimestampedValue(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testHeadersWithoutHeaders(IterationStateForEmptyHeaders state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(headersPre20249(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void testHeadersWithoutHeadersOpt(IterationStateForEmptyHeaders
state, Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(Utils.headers(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testTimestampWithoutHeaders(IterationStateForEmptyHeadersTimestamp state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(timestampPre20249(randomValue));
+ }
+ }
+
+ @Benchmark
+ @CompilerControl(CompilerControl.Mode.DONT_INLINE)
+ public void
testTimestampWithoutHeadersOpt(IterationStateForEmptyHeadersTimestamp state,
Blackhole bh) {
+ for (byte[] randomValue : state.getRandomValues()) {
+ bh.consume(Utils.timestamp(randomValue));
+ }
+ }
+
@Benchmark
@CompilerControl(CompilerControl.Mode.DONT_INLINE)
public void
testConvertToHeaderFormat(IterationStateForEmptyHeadersTimestamp state,
Blackhole bh) {
@@ -101,4 +237,87 @@ public class RawBytesExtractionBenchmark {
.put(valueAndTimestamp)
.array();
}
+
+ /**
+ * Prior to KAFKA-20249: AggregationWithHeadersDeserializer - Extract the
raw aggregation bytes from
+ * serialized AggregationWithHeaders, stripping the headers prefix.
+ */
+ private static byte[] rawAggregationPre20249(final byte[]
aggregationWithHeaders) {
+ if (aggregationWithHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
+ Utils.readHeaders(buffer);
+ return Utils.readBytes(buffer, buffer.remaining());
+ }
+
+ /**
+ * Prior to KAFKA-20249: ValueAndTimestampDeserializer - Extract raw value
from serialized
+ * ValueTimestampHeaders.
+ */
+ private static byte[] rawValuePre20249(final byte[]
rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize + Long.BYTES);
+ return Utils.readBytes(buffer, buffer.remaining());
+ }
+
+ /**
+ * Prior to KAFKA-20249: Extract raw timestamped value (timestamp + value)
from serialized ValueTimestampHeaders.
+ * This strips the headers portion but keeps timestamp and value intact.
+ *
+ * Format conversion:
+ * Input: [headersSize(varint)][headers][timestamp(8)][value]
+ * Output: [timestamp(8)][value]
+ */
+ private static byte[] rawTimestampedValuePre20249(final byte[]
rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ // Skip headers, keep timestamp + value
+ if (headersSize < 0 || headersSize > buffer.remaining() ||
buffer.remaining() - headersSize < StateSerdes.TIMESTAMP_SIZE) {
+ throw new SerializationException(
+ "Invalid format: headers size " + headersSize +
+ ", timestamp expected size " + StateSerdes.TIMESTAMP_SIZE +
+ ", but buffer size " + buffer.remaining()
+ );
+ }
+ buffer.position(buffer.position() + headersSize);
+
+ final byte[] result = new byte[buffer.remaining()];
+ buffer.get(result);
+ return result;
+ }
+
+ /**
+ * Prior to KAFKA-20249 - AggregationWithHeadersDeserializer - Extract
headers from serialized AggregationWithHeaders
+ */
+ private static Headers headersPre20249(final byte[]
rawAggregationWithHeaders) {
+ if (rawAggregationWithHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
+ return Utils.readHeaders(buffer);
+ }
+
+ /**
+ * Prior to KAFKA-20249 - ValueTimestampHeadersDeserializer - Extract
timestamp from serialized ValueTimestampHeaders.
+ */
+ private static long timestampPre20249(final byte[]
rawValueTimestampHeaders) {
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize);
+
+ final byte[] rawTimestamp = Utils.readBytes(buffer, Long.BYTES);
+ return LONG_DESERIALIZER.deserialize("", rawTimestamp);
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
index 8dd74fd98ee..8d20b1a9705 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java
@@ -54,10 +54,10 @@ public interface HeadersBytesStore {
// headersSize = varint(0) = [0x00]
// headersBytes = [] (empty, 0 bytes)
// Result: [0x00][payload]
- final byte[] res = new byte[1 + value.length];
- // res[0] is initialized to 0x00 per Java Specification
- System.arraycopy(value, 0, res, 1, value.length);
- return res;
+ final byte[] valueHeaders = new byte[1 + value.length];
+ // valueHeaders[0] is initialized to 0x00 per Java Specification
+ System.arraycopy(value, 0, valueHeaders, 1, value.length);
+ return valueHeaders;
}
static byte[] convertFromPlainToHeaderFormat(final byte[] value) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
index 1c4e5e0827b..c53606c1cc8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializer.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.AggregationWithHeaders;
@@ -65,7 +64,7 @@ class AggregationWithHeadersDeserializer<AGG> implements
WrappingNullableDeseria
}
final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
- final Headers headers = readHeaders(buffer);
+ final Headers headers = Utils.readHeaders(buffer);
final byte[] rawAggregation = readBytes(buffer, buffer.remaining());
final AGG aggregation = aggregationDeserializer.deserialize(topic,
headers, rawAggregation);
@@ -81,36 +80,4 @@ class AggregationWithHeadersDeserializer<AGG> implements
WrappingNullableDeseria
public void setIfUnset(final SerdeGetter getter) {
initNullableDeserializer(aggregationDeserializer, getter);
}
-
- /**
- * Extract headers from serialized AggregationWithHeaders.
- */
- static Headers headers(final byte[] rawAggregationWithHeaders) {
- if (rawAggregationWithHeaders == null) {
- return null;
- }
-
- final ByteBuffer buffer = ByteBuffer.wrap(rawAggregationWithHeaders);
- return readHeaders(buffer);
- }
-
- /**
- * Extract the raw aggregation bytes from serialized
AggregationWithHeaders,
- * stripping the headers prefix.
- */
- static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
- if (aggregationWithHeaders == null) {
- return null;
- }
-
- final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
- readHeaders(buffer);
- return readBytes(buffer, buffer.remaining());
- }
-
- private static Headers readHeaders(final ByteBuffer buffer) {
- final int headersSize = ByteUtils.readVarint(buffer);
- final byte[] rawHeaders = readBytes(buffer, headersSize);
- return HeadersDeserializer.deserialize(rawHeaders);
- }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
index 1f8bf204dbf..4f076ada623 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreWithHeaders.java
@@ -20,8 +20,8 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;
-import static
org.apache.kafka.streams.state.internals.AggregationWithHeadersDeserializer.headers;
-import static
org.apache.kafka.streams.state.internals.AggregationWithHeadersDeserializer.rawAggregation;
+import static org.apache.kafka.streams.state.internals.Utils.headers;
+import static org.apache.kafka.streams.state.internals.Utils.rawAggregation;
/**
* Change-logging wrapper for a session bytes store whose values also carry
headers.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
index bce5dbe0130..68ab181d725 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.state.KeyValueStore;
import java.util.List;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+import static org.apache.kafka.streams.state.internals.Utils.headers;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+import static org.apache.kafka.streams.state.internals.Utils.timestamp;
/**
* Change-logging wrapper for a timestamped key-value bytes store whose values
also carry headers.
@@ -50,7 +50,7 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
wrapped().put(key, valueTimestampHeaders);
log(
key,
- rawValue(valueTimestampHeaders),
+ rawPlainValue(valueTimestampHeaders),
valueTimestampHeaders == null
? internalContext.recordContext().timestamp()
: timestamp(valueTimestampHeaders),
@@ -68,7 +68,7 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
// then it was absent
log(
key,
- rawValue(valueTimestampHeaders),
+ rawPlainValue(valueTimestampHeaders),
valueTimestampHeaders == null
? internalContext.recordContext().timestamp()
: timestamp(valueTimestampHeaders),
@@ -87,7 +87,7 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
final byte[] valueTimestampHeaders = entry.value;
log(
entry.key,
- rawValue(valueTimestampHeaders),
+ rawPlainValue(valueTimestampHeaders),
valueTimestampHeaders == null
? internalContext.recordContext().timestamp()
: timestamp(valueTimestampHeaders),
@@ -104,4 +104,4 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
log(key, null, internalContext.recordContext().timestamp(),
internalContext.recordContext().headers());
return deletedValue;
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
index baf99ecc344..483c5775344 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreWithHeaders.java
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowStore;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+import static org.apache.kafka.streams.state.internals.Utils.headers;
+import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
+import static org.apache.kafka.streams.state.internals.Utils.timestamp;
/**
* Change-logging wrapper for window stores that support headers.
@@ -47,7 +47,7 @@ public class
ChangeLoggingTimestampedWindowBytesStoreWithHeaders extends ChangeL
internalContext.logChange(
name(),
key,
- rawValue(valueTimestampHeaders),
+ rawPlainValue(valueTimestampHeaders),
valueTimestampHeaders != null
? timestamp(valueTimestampHeaders)
: internalContext.recordContext().timestamp(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index f77471ad905..e93aad8c8f2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -52,7 +52,6 @@ import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index e88ec3398d5..74aafc87bd4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -46,7 +46,6 @@ import java.util.Objects;
import java.util.function.Function;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
/**
* A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used
for recording operation metrics,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
index 4771a86a5f9..db753e52bbe 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapter.java
@@ -138,7 +138,7 @@ public class SessionToHeadersStoreAdapter implements
SessionStore<Bytes, byte[]>
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[]
aggregateWithHeader) {
- store.put(sessionKey,
AggregationWithHeadersDeserializer.rawAggregation(aggregateWithHeader));
+ store.put(sessionKey, Utils.rawAggregation(aggregateWithHeader));
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
index abaa2b16376..c67bfc8dfbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Utils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
@@ -27,6 +28,88 @@ import org.apache.kafka.streams.state.StateSerdes;
import java.nio.ByteBuffer;
public class Utils {
+ private static final LongDeserializer LONG_DESERIALIZER = new
LongDeserializer();
+
+ /**
+ * Check if the input value (with timestamp or not) contains headers size
equal to zero
+ */
+ public static boolean hasEmptyHeaders(final byte[] rawValueHeaders) {
+ return rawValueHeaders.length > 0 && rawValueHeaders[0] == 0x00;
+ }
+
+ /**
+ * Extract headers from serialized value (with timestamp or not) with
headers
+ */
+ public static Headers headers(final byte[] valueWithHeaders) {
+ if (valueWithHeaders == null) {
+ return null;
+ }
+
+ // If the header is empty, simply return it
+ if (hasEmptyHeaders(valueWithHeaders)) {
+ return new RecordHeaders();
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(valueWithHeaders);
+ return readHeaders(buffer);
+ }
+
+ /**
+ * Serialize the key with headers into bytes
+ * @param key the key to serialize
+ * @param headers the Headers as context
+ * @param serdes the StateSerdes as serializer
+ * @return the Bytes of the key
+ */
+ public static <K> Bytes keyBytes(final K key, final Headers headers, final
StateSerdes<K, ?> serdes) {
+ return Bytes.wrap(serdes.rawKey(key, headers));
+ }
+
+ /**
+ * Serialize the key into bytes
+ * @param key the key to serialize
+ * @param serdes the StateSerdes as serializer
+ * @return the Bytes of the key
+ */
+ static <K> Bytes keyBytes(final K key, final StateSerdes<K, ?> serdes) {
+ return keyBytes(key, new RecordHeaders(), serdes);
+ }
+
+ /**
+ * Serialize the session key with headers into bytes
+ * @param sessionKey the Windowed session key to serialize
+ * @param headers the Headers as context
+ * @param serdes the StateSerdes as serializer
+ * @return the Bytes of the key
+ */
+ static <K> Bytes keyBytes(final Windowed<K> sessionKey, final Headers
headers, final StateSerdes<K, ?> serdes) {
+ return keyBytes(sessionKey.key(), headers, serdes);
+ }
+
+ /**
+ * Extract the raw aggregation bytes from serialized
AggregationWithHeaders,
+ * stripping the headers prefix.
+ */
+ public static byte[] rawAggregation(final byte[] aggregationWithHeaders) {
+ if (aggregationWithHeaders == null) {
+ return null;
+ }
+
+ // If the header is empty, then copy the value bytes directly
+ if (hasEmptyHeaders(aggregationWithHeaders)) {
+ // Strip header size's varint byte, and empty headers consume no
bytes
+ final byte[] aggregation = new byte[aggregationWithHeaders.length
- 1];
+ System.arraycopy(aggregationWithHeaders, 1, aggregation, 0,
aggregation.length);
+ return aggregation;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
+ // Skip the headers bytes without deserializing or copying
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize);
+ return readBytes(buffer, buffer.remaining());
+ }
+
/**
* Extract raw plain value from serialized ValueTimestampHeaders.
* This strips both the headers and timestamp portions.
@@ -35,11 +118,19 @@ public class Utils {
* Input: [headersSize(varint)][headers][timestamp(8)][value]
* Output: [value]
*/
- static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
+ public static byte[] rawPlainValue(final byte[] rawValueTimestampHeaders) {
if (rawValueTimestampHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value bytes directly
+ if (hasEmptyHeaders(rawValueTimestampHeaders)) {
+ // Strip header size (varint 1 byte), empty headers (no bytes),
and timestamp
+ final byte[] rawValue = new byte[rawValueTimestampHeaders.length -
1 - StateSerdes.TIMESTAMP_SIZE];
+ System.arraycopy(rawValueTimestampHeaders, 1 +
StateSerdes.TIMESTAMP_SIZE, rawValue, 0, rawValue.length);
+ return rawValue;
+ }
+
final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
final int headersSize = ByteUtils.readVarint(buffer);
// Skip headers and timestamp (8 bytes)
@@ -58,11 +149,19 @@ public class Utils {
* Input: [headersSize(varint)][headers][timestamp(8)][value]
* Output: [timestamp(8)][value]
*/
- static byte[] rawTimestampedValue(final byte[] rawValueTimestampHeaders) {
+ public static byte[] rawTimestampedValue(final byte[]
rawValueTimestampHeaders) {
if (rawValueTimestampHeaders == null) {
return null;
}
+ // If the header is empty, then copy the value and timestamp bytes
directly
+ if (hasEmptyHeaders(rawValueTimestampHeaders)) {
+ // Strip header size (varint 1 byte), empty headers (no bytes)
+ final byte[] rawValueAndTimestamp = new
byte[rawValueTimestampHeaders.length - 1];
+ System.arraycopy(rawValueTimestampHeaders, 1,
rawValueAndTimestamp, 0, rawValueAndTimestamp.length);
+ return rawValueAndTimestamp;
+ }
+
final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
final int headersSize = ByteUtils.readVarint(buffer);
// Skip headers, keep timestamp + value
@@ -88,7 +187,7 @@ public class Utils {
* @return the byte array containing the read bytes
* @throws SerializationException if buffer doesn't have enough bytes or
length is negative
*/
- static byte[] readBytes(final ByteBuffer buffer, final int length) {
+ public static byte[] readBytes(final ByteBuffer buffer, final int length) {
if (length < 0) {
throw new SerializationException(
"Invalid format: negative length " + length
@@ -105,35 +204,29 @@ public class Utils {
return bytes;
}
- /**
- * Serialize the key with headers into bytes
- * @param key the key to serialize
- * @param headers the Headers as context
- * @param serdes the StateSerdes as serializer
- * @return the Bytes of the key
- */
- public static <K> Bytes keyBytes(final K key, final Headers headers, final
StateSerdes<K, ?> serdes) {
- return Bytes.wrap(serdes.rawKey(key, headers));
+ public static Headers readHeaders(final ByteBuffer buffer) {
+ final int headersSize = ByteUtils.readVarint(buffer);
+ final byte[] rawHeaders = readBytes(buffer, headersSize);
+ return HeadersDeserializer.deserialize(rawHeaders);
}
/**
- * Serialize the key into bytes
- * @param key the key to serialize
- * @param serdes the StateSerdes as serializer
- * @return the Bytes of the key
+ * Extract timestamp from serialized ValueTimestampHeaders.
*/
- static <K> Bytes keyBytes(final K key, final StateSerdes<K, ?> serdes) {
- return keyBytes(key, new RecordHeaders(), serdes);
- }
+ public static long timestamp(final byte[] rawValueTimestampHeaders) {
+ // If the headers are empty, there is no need to skip them
+ if (hasEmptyHeaders(rawValueTimestampHeaders)) {
+ final byte[] rawTimestamp = new byte[StateSerdes.TIMESTAMP_SIZE];
+ System.arraycopy(rawValueTimestampHeaders, 1, rawTimestamp, 0,
StateSerdes.TIMESTAMP_SIZE);
+ return LONG_DESERIALIZER.deserialize("", rawTimestamp);
+ }
- /**
- * Serialize the session key with headers into bytes
- * @param sessionKey the Windowed session key to serialize
- * @param headers the Headers as context
- * @param serdes the StateSerdes as serializer
- * @return the Bytes of the key
- */
- static <K> Bytes keyBytes(final Windowed<K> sessionKey, final Headers
headers, final StateSerdes<K, ?> serdes) {
- return keyBytes(sessionKey.key(), headers, serdes);
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize);
+
+ final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
+ return LONG_DESERIALIZER.deserialize("", rawTimestamp);
}
+
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
index 44e9a62d89e..a0f9acc4c9b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -48,8 +48,6 @@ import static
org.apache.kafka.streams.state.internals.Utils.readBytes;
* This is used by KIP-1271 to deserialize values with timestamps and headers
from state stores.
*/
class ValueTimestampHeadersDeserializer<V> implements
WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
- private static final LongDeserializer LONG_DESERIALIZER = new
LongDeserializer();
-
public final Deserializer<V> valueDeserializer;
private final LongDeserializer timestampDeserializer;
@@ -104,52 +102,6 @@ class ValueTimestampHeadersDeserializer<V> implements
WrappingNullableDeserializ
if (rawValueTimestampHeaders == null) {
return null;
}
-
- final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- // skip headers plus timestamp
- buffer.position(buffer.position() + headersSize + Long.BYTES);
- final byte[] bytes = readBytes(buffer, buffer.remaining());
-
- return deserializer.deserialize("", bytes);
- }
-
- /**
- * Extract timestamp from serialized ValueTimestampHeaders.
- */
- static long timestamp(final byte[] rawValueTimestampHeaders) {
- final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- buffer.position(buffer.position() + headersSize);
-
- final byte[] rawTimestamp = readBytes(buffer, Long.BYTES);
- return LONG_DESERIALIZER.deserialize("", rawTimestamp);
- }
-
- /**
- * Extract headers from serialized ValueTimestampHeaders.
- */
- static Headers headers(final byte[] rawValueTimestampHeaders) {
- if (rawValueTimestampHeaders == null) {
- return null;
- }
-
- final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- final byte[] rawHeaders = readBytes(buffer, headersSize);
- return HeadersDeserializer.deserialize(rawHeaders);
- }
- /**
- * Extract raw value from serialized ValueTimestampHeaders.
- */
- static byte[] rawValue(final byte[] rawValueTimestampHeaders) {
- if (rawValueTimestampHeaders == null) {
- return null;
- }
-
- final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
- final int headersSize = ByteUtils.readVarint(buffer);
- buffer.position(buffer.position() + headersSize + Long.BYTES);
- return readBytes(buffer, buffer.remaining());
+ return deserializer.deserialize("",
Utils.rawPlainValue(rawValueTimestampHeaders));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
index d517f7387a2..e301f6d5f19 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersDeserializerTest.java
@@ -25,8 +25,12 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Iterator;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -94,26 +98,35 @@ public class AggregationWithHeadersDeserializerTest {
assertThrows(SerializationException.class, () ->
deserializer.deserialize("topic", invalidData));
}
- @Test
+ @ParameterizedTest
+ @MethodSource("headers")
public void shouldExtractHeaders() {
final Long aggregation = 100L;
final Headers headers = new RecordHeaders();
headers.add("key1", "value1".getBytes());
final AggregationWithHeaders<Long> aggregationWithHeaders =
AggregationWithHeaders.make(aggregation, headers);
- final AggregationWithHeadersSerializer<Long> serializer = new
AggregationWithHeadersSerializer<>(Serdes.Long().serializer());
- final byte[] serialized = serializer.serialize("topic",
aggregationWithHeaders);
+ try (final AggregationWithHeadersSerializer<Long> serializer = new
AggregationWithHeadersSerializer<>(Serdes.Long().serializer())) {
+ final byte[] serialized = serializer.serialize("topic",
aggregationWithHeaders);
- final Headers extractedHeaders =
AggregationWithHeadersDeserializer.headers(serialized);
- assertNotNull(extractedHeaders);
+ final Headers extractedHeaders = Utils.headers(serialized);
+ assertNotNull(extractedHeaders);
+
+ final Header header = extractedHeaders.iterator().next();
+ assertEquals("key1", header.key());
+ assertArrayEquals("value1".getBytes(), header.value());
+ }
+ }
- final Header header = extractedHeaders.iterator().next();
- assertEquals("key1", header.key());
- assertArrayEquals("value1".getBytes(), header.value());
+ private static Stream<Arguments> headers() {
+ return Stream.of(
+ new RecordHeaders().add("key1", "value1".getBytes()),
+ new RecordHeaders()
+ ).map(Arguments::of);
}
@Test
public void shouldReturnNullForNullInput() {
- assertNull(AggregationWithHeadersDeserializer.headers(null));
+ assertNull(Utils.headers(null));
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
index 0f5bd979ee3..48ef00a8190 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -54,7 +53,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class RocksDBMigratingSessionStoreWithHeadersTest extends
RocksDBStoreTest {
private final Serializer<String> stringSerializer = new StringSerializer();
- private final Serializer<Long> longSerializer = new LongSerializer();
private final Deserializer<Long> longDeserializer = new LongDeserializer();
private final AggregationWithHeadersSerializer<String> aggSerializer =
new AggregationWithHeadersSerializer<>(new StringSerializer());
@@ -520,11 +518,11 @@ public class RocksDBMigratingSessionStoreWithHeadersTest
extends RocksDBStoreTes
}
private void assertMigratedValue(final byte[] value, final String
expectedAggregation) {
- final Headers headers =
AggregationWithHeadersDeserializer.headers(value);
+ final Headers headers = Utils.headers(value);
assertFalse(headers.iterator().hasNext(), "Migrated value should have
empty headers");
assertArrayEquals(
expectedAggregation.getBytes(StandardCharsets.UTF_8),
- AggregationWithHeadersDeserializer.rawAggregation(value),
+ Utils.rawAggregation(value),
"Migrated value should preserve original aggregation: " +
expectedAggregation);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
index 242f404e761..2f3b3ebe65f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionToHeadersStoreAdapterTest.java
@@ -245,12 +245,12 @@ public class SessionToHeadersStoreAdapterTest {
@Test
public void shouldStripHeadersFromRawAggregationValue() {
- final byte[] result =
AggregationWithHeadersDeserializer.rawAggregation(VALUE_WITH_EMPTY_HEADERS);
+ final byte[] result = Utils.rawAggregation(VALUE_WITH_EMPTY_HEADERS);
assertArrayEquals(RAW_VALUE, result);
}
@Test
public void shouldReturnNullFromRawAggregationValueForNull() {
- assertNull(AggregationWithHeadersDeserializer.rawAggregation(null));
+ assertNull(Utils.rawAggregation(null));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
index d9edd2cacdc..260567f137c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
@@ -161,18 +161,18 @@ public class TimeOrderedSessionStoreUpgradeTest {
// Verify old data readable with empty headers via lazy migration
byte[] fetch = newStore.fetchSession(key1, 100, 200);
assertNotNull(fetch);
- assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
- assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
+ assertEquals("value1", new String(Utils.rawAggregation(fetch)));
+ assertEquals(0, Utils.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
fetch = newStore.fetchSession(key2, 150, 250);
assertNotNull(fetch);
- assertEquals("value2", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
- assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
+ assertEquals("value2", new String(Utils.rawAggregation(fetch)));
+ assertEquals(0, Utils.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
fetch = newStore.fetchSession(key3, 200, 300);
assertNotNull(fetch);
- assertEquals("value3", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
- assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
+ assertEquals("value3", new String(Utils.rawAggregation(fetch)));
+ assertEquals(0, Utils.headers(fetch).toArray().length, "Old data
should have empty headers after migration");
newStore.close();
}
@@ -201,7 +201,7 @@ public class TimeOrderedSessionStoreUpgradeTest {
// Verify old data still accessible
final byte[] fetch = newStore.fetchSession(key1, 100, 200);
assertNotNull(fetch);
- assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+ assertEquals("value1", new String(Utils.rawAggregation(fetch)));
newStore.close();
}
@@ -230,7 +230,7 @@ public class TimeOrderedSessionStoreUpgradeTest {
// Verify old data still accessible
final byte[] fetch = newStore.fetchSession(key1, 100, 200);
assertNotNull(fetch);
- assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
+ assertEquals("value1", new String(Utils.rawAggregation(fetch)));
newStore.close();
}
@@ -260,15 +260,15 @@ public class TimeOrderedSessionStoreUpgradeTest {
serializeValueWithHeaders("value2".getBytes(), headersWithData));
// Verify values
- assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key1,
100, 200))));
- assertEquals("value2", new
String(AggregationWithHeadersDeserializer.rawAggregation(store.fetchSession(key2,
150, 250))));
+ assertEquals("value1", new
String(Utils.rawAggregation(store.fetchSession(key1, 100, 200))));
+ assertEquals("value2", new
String(Utils.rawAggregation(store.fetchSession(key2, 150, 250))));
// Verify headers for key1 (empty)
- final Headers key1Headers =
AggregationWithHeadersDeserializer.headers(store.fetchSession(key1, 100, 200));
+ final Headers key1Headers = Utils.headers(store.fetchSession(key1,
100, 200));
assertEquals(0, key1Headers.toArray().length);
// Verify headers for key2 (with data)
- final Headers key2Headers =
AggregationWithHeadersDeserializer.headers(store.fetchSession(key2, 150, 250));
+ final Headers key2Headers = Utils.headers(store.fetchSession(key2,
150, 250));
assertEquals(2, key2Headers.toArray().length);
assertEquals("header-value-1", new
String(key2Headers.lastHeader("header-key-1").value()));
assertEquals("header-value-2", new
String(key2Headers.lastHeader("header-key-2").value()));
@@ -323,8 +323,8 @@ public class TimeOrderedSessionStoreUpgradeTest {
// Verify old data readable with empty headers via lazy migration
final byte[] fetch = newStore.fetchSession(key1, 100, 200);
assertNotNull(fetch, "Old data should be readable after upgrade via
DSL supplier path");
- assertEquals("value1", new
String(AggregationWithHeadersDeserializer.rawAggregation(fetch)));
- assertEquals(0,
AggregationWithHeadersDeserializer.headers(fetch).toArray().length,
+ assertEquals("value1", new String(Utils.rawAggregation(fetch)));
+ assertEquals(0, Utils.headers(fetch).toArray().length,
"Old data should have empty headers after migration");
newStore.close();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
index 91f39e83a80..631f906ee83 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
@@ -35,13 +35,16 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
+import static org.apache.kafka.streams.state.internals.Utils.hasEmptyHeaders;
import static org.apache.kafka.streams.state.internals.Utils.rawPlainValue;
import static
org.apache.kafka.streams.state.internals.Utils.rawTimestampedValue;
import static org.apache.kafka.streams.state.internals.Utils.readBytes;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class UtilsTest {
private static final String TOPIC = "test-topic";
@@ -52,6 +55,8 @@ public class UtilsTest {
// Header size's varint encoding cannot exceed 5 bytes (see @{link
ByteUtils#readVarint(ByteBuffer)})
private static final int MAX_VARINT_SIZE = 5;
private static final int OVERFLOW_HEADERS_SIZE = (1 + MAX_VARINT_SIZE) +
HEADERS.length + StateSerdes.TIMESTAMP_SIZE + VALUE.length;
+ // 1 byte header size, 0 byte empty headers, and timestamp
+ private static final int MIN_SIZE = 1 + 0 + StateSerdes.TIMESTAMP_SIZE;
@Test
public void shouldExtractRawPlainValue() {
@@ -75,7 +80,13 @@ public class UtilsTest {
@Test
public void shouldReturnNullForNullRawTimestampedValue() {
- assertNull(rawPlainValue(null));
+ assertNull(rawTimestampedValue(null));
+ }
+
+ @Test
+ public void shouldExtractRawValueWithEmptyHeaders() {
+ final byte[] res =
rawPlainValue(timestampedValueWithEmptyHeaders(VALUE));
+ assertArrayEquals(VALUE, res);
}
@ParameterizedTest
@@ -90,6 +101,19 @@ public class UtilsTest {
assertArrayEquals(timestampedValueOf(value), outputBytes);
}
+ @Test
+ public void testRawTimestampedValueWithEmptyHeaders() {
+ final byte[] input = timestampedValueWithEmptyHeaders(VALUE);
+ final byte[] res = rawTimestampedValue(input);
+ final ByteBuffer buf = ByteBuffer.wrap(res);
+ assertEquals(TIMESTAMP, buf.getLong());
+
+ assertEquals(VALUE.length, buf.remaining());
+ final byte[] resValue = new byte[buf.remaining()];
+ buf.get(resValue);
+ assertArrayEquals(VALUE, resValue);
+ }
+
@ParameterizedTest
@MethodSource("invalidHeaderSizes")
public void testRawTimestampedValueWithInvalidHeadersSize(final int
invalidHeaderSize) {
@@ -120,6 +144,25 @@ public class UtilsTest {
}
}
+ @Test
+ public void testEmptyHeadersAndTimestamp() {
+ final byte[] empty = new byte[MIN_SIZE];
+ empty[0] = (byte) 0x00; // header size
+ assertTrue(hasEmptyHeaders(empty));
+
+ final byte[] nonEmpty = new byte[MIN_SIZE];
+ nonEmpty[0] = (byte) 0x01; // header size
+ assertFalse(hasEmptyHeaders(nonEmpty));
+ }
+
+ @ParameterizedTest
+ @ValueSource(bytes = { 0x10, 0x11 })
+ public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final int
invalidSize) {
+ final byte[] invalid = new byte[MIN_SIZE];
+ invalid[0] = (byte) invalidSize; // header size
+ assertFalse(hasEmptyHeaders(invalid));
+ }
+
@Test
public void testReadBytes() {
final ByteBuffer buf = ByteBuffer.wrap(VALUE);
@@ -142,6 +185,26 @@ public class UtilsTest {
assertThrows(SerializationException.class, () -> readBytes(buf, 1));
}
+ private static byte[] timestampedValueWithEmptyHeaders(final byte[] value)
{
+ // header size: 1 byte, empty headers: 0 bytes, timestamp: 8 bytes,
plain value length
+ final byte[] res = new byte[1 + 0 + StateSerdes.TIMESTAMP_SIZE +
value.length];
+ final ByteBuffer buf = ByteBuffer.wrap(res);
+ buf.put((byte) 0x00); // header size
+ buf.putLong(TIMESTAMP);
+ buf.put(VALUE);
+ return res;
+ }
+
+ private static byte[] timestampedValueWithEmptyHeadersInvalidTimestamp() {
+ // header size: 1 byte, empty headers: 0 bytes, invalid timestamp: 1
byte, value: 1 byte
+ final byte[] invalid = new byte[1 + 0 + 1 + 1];
+ final ByteBuffer buf = ByteBuffer.wrap(invalid);
+ buf.put((byte) 0x00); // header size
+ buf.put((byte) 0x01); // invalid timestamp, should be
StateSerdes.TIMESTAMP_SIZE
+ buf.put((byte) 0x02); // plain value, small enough for the whole data
to go under the min data size
+ return invalid;
+ }
+
private static Stream<Arguments> invalidHeaderSizes() {
return Stream.of(-1, Integer.MAX_VALUE, Integer.MIN_VALUE,
OVERFLOW_HEADERS_SIZE).map(Arguments::of);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
index 8a4471580c6..4755eb3257c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializerTest.java
@@ -28,8 +28,12 @@ import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Iterator;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -202,24 +206,30 @@ public class ValueTimestampHeadersDeserializerTest {
assertNull(value);
}
- @Test
- public void shouldExtractTimestamp() {
- final Headers headers = new RecordHeaders()
- .add("key1", "value1".getBytes());
+ @ParameterizedTest
+ @MethodSource("headers")
+ public void shouldExtractTimestamp(final Headers headers) {
final ValueTimestampHeaders<String> original =
ValueTimestampHeaders.make("test-value", 123456789L, headers);
final byte[] serialized = serializer.serialize(TOPIC, original);
- final long timestamp =
ValueTimestampHeadersDeserializer.timestamp(serialized);
+ final long timestamp = Utils.timestamp(serialized);
assertEquals(123456789L, timestamp);
}
+ private static Stream<Arguments> headers() {
+ return Stream.of(
+ new RecordHeaders().add("key1", "value1".getBytes()),
+ new RecordHeaders()
+ ).map(Arguments::of);
+ }
+
@Test
public void shouldThrowExceptionWhenExtractingTimestampFromNull() {
// ByteBuffer.wrap() throws NullPointerException for null input
assertThrows(NullPointerException.class, () ->
- ValueTimestampHeadersDeserializer.timestamp(null)
+ Utils.timestamp(null)
);
}
@@ -232,7 +242,7 @@ public class ValueTimestampHeadersDeserializerTest {
ValueTimestampHeaders.make("test-value", 123456789L, headers);
final byte[] serialized = serializer.serialize(TOPIC, original);
- final Headers extractedHeaders =
ValueTimestampHeadersDeserializer.headers(serialized);
+ final Headers extractedHeaders = Utils.headers(serialized);
assertNotNull(extractedHeaders);
assertEquals(2, extractedHeaders.toArray().length);
@@ -247,7 +257,7 @@ public class ValueTimestampHeadersDeserializerTest {
ValueTimestampHeaders.make("test-value", 123456789L, headers);
final byte[] serialized = serializer.serialize(TOPIC, original);
- final Headers extractedHeaders =
ValueTimestampHeadersDeserializer.headers(serialized);
+ final Headers extractedHeaders = Utils.headers(serialized);
assertNotNull(extractedHeaders);
assertEquals(0, extractedHeaders.toArray().length);
@@ -255,7 +265,7 @@ public class ValueTimestampHeadersDeserializerTest {
@Test
public void shouldReturnNullWhenExtractingHeadersFromNull() {
- final Headers headers =
ValueTimestampHeadersDeserializer.headers(null);
+ final Headers headers = Utils.headers(null);
assertNull(headers);
}