This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0fc80603026 MINOR: perf optimization for header serialization and type
conversion (#21762)
0fc80603026 is described below
commit 0fc806030265f0978644a8ae0d4c6d76c22ab704
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 16 12:48:22 2026 -0700
MINOR: perf optimization for header serialization and type conversion
(#21762)
This PR replaces the usage of ByteArrayOutputStreams with ByteBuffers.
Some manual benchmarks show a 3x performance improvement for the
value-ts-header and session-header converters.
Reviewers: PoAn Yang <[email protected]>, Nilesh Kumar
<[email protected]>, aliehsaeedii <[email protected]>,
Chia-Ping Tsai <[email protected]>
---
.../AggregationWithHeadersSerializer.java | 29 +++---
.../streams/state/internals/HeadersSerializer.java | 106 ++++++++++++++-------
.../streams/state/internals/RecordConverters.java | 67 ++++++-------
.../internals/ValueTimestampHeadersSerializer.java | 33 ++-----
.../state/internals/HeadersDeserializerTest.java | 43 +++++++--
.../state/internals/HeadersSerializerTest.java | 41 ++++++--
.../state/internals/RecordConvertersTest.java | 23 ++++-
7 files changed, 216 insertions(+), 126 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
index 1781d1fa970..4792052aa9e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AggregationWithHeadersSerializer.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
@@ -24,9 +23,7 @@ import
org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.AggregationWithHeaders;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -75,23 +72,27 @@ class AggregationWithHeadersSerializer<AGG> implements
WrappingNullableSerialize
final byte[] rawAggregation = aggregationSerializer.serialize(topic,
headers, plainAggregation);
+ // Since we can't control the result of the internal serializer, we
make sure that the result
+ // is not null as well.
+ // Serializing non-null values to null can be useful when working with
Optional-like values
+ // where the Optional.empty case is serialized to null.
+ // See the discussion here: https://github.com/apache/kafka/pull/7679
if (rawAggregation == null) {
return null;
}
- final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
+ final int payloadSize =
preSerializedHeaders.requiredBufferSizeForHeaders + rawAggregation.length;
- ByteUtils.writeVarint(rawHeaders.length, out);
- out.write(rawHeaders);
- out.write(rawAggregation);
+ // Format: [headersSize(varint)][headersBytes][value]
+ final ByteBuffer buffer =
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
+ payloadSize);
+
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders,
buffer);
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to serialize
AggregationWithHeaders on topic: " + topic, e);
- }
+ // empty (byte[0]) for null/empty headers, or
[count][header1][header2]... for non-empty
+ return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+ .put(rawAggregation)
+ .array();
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index f8c664c7820..6608a55f67b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -16,14 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.ByteUtils;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
@@ -48,8 +45,59 @@ import java.nio.charset.StandardCharsets;
*/
class HeadersSerializer {
+ static final class PreSerializedHeaders {
+ final int requiredBufferSizeForHeaders;
+ final byte[][] rawHeaderKeys;
+ final byte[][] rawHeaderValues;
+
+ PreSerializedHeaders(
+ final int requiredBufferSizeForHeaders,
+ final byte[][] rawHeaderKeys,
+ final byte[][] rawHeaderValues
+ ) {
+ this.requiredBufferSizeForHeaders = requiredBufferSizeForHeaders;
+ this.rawHeaderKeys = rawHeaderKeys;
+ this.rawHeaderValues = rawHeaderValues;
+ }
+ }
+
+ public static PreSerializedHeaders prepareSerialization(final Headers
headers) {
+ final Header[] headersArray = (headers == null) ? new Header[0] :
headers.toArray();
+
+ if (headersArray.length == 0) {
+ return new PreSerializedHeaders(0, null, null);
+ }
+
+ // we first compute the size for the buffer we need,
+ // so we can allocate the whole buffer at once later
+
+ // cache to avoid translating String header-keys to byte[] twice
+ final byte[][] serializerHeaderKeys = new byte[headersArray.length][];
+ final byte[][] serializedHeaderValues = new
byte[headersArray.length][];
+
+ // start with varint encoding of header count
+ int requiredBufferSizeForHeaders =
ByteUtils.sizeOfVarint(headersArray.length);
+
+ int i = 0;
+ for (final Header header : headersArray) {
+ serializerHeaderKeys[i] =
header.key().getBytes(StandardCharsets.UTF_8);
+ requiredBufferSizeForHeaders +=
ByteUtils.sizeOfVarint(serializerHeaderKeys[i].length) +
serializerHeaderKeys[i].length;
+
+ serializedHeaderValues[i] = header.value();
+ if (serializedHeaderValues[i] == null) {
+ ++requiredBufferSizeForHeaders;
+ } else {
+ requiredBufferSizeForHeaders +=
ByteUtils.sizeOfVarint(serializedHeaderValues[i].length) +
serializedHeaderValues[i].length;
+ }
+
+ ++i;
+ }
+
+ return new PreSerializedHeaders(requiredBufferSizeForHeaders,
serializerHeaderKeys, serializedHeaderValues);
+ }
+
/**
- * Serializes headers into a byte array using varint encoding per KIP-1271.
+ * Serializes headers into a ByteBuffer using varint encoding per KIP-1271.
* <p>
* The output format is [count][header1][header2]... without a size prefix.
* The size prefix is added by the outer serializer that uses this.
@@ -57,41 +105,31 @@ class HeadersSerializer {
* For null or empty headers, returns an empty byte array (0 bytes)
* instead of encoding headerCount=0 (1 byte).
*
- * @param headers the headers to serialize (can be null)
- * @return the serialized byte array (empty array if headers are null or
empty)
+ * @param preSerializedHeaders the headers prepared by {@link #prepareSerialization(Headers)}
+ * @param buffer the buffer to write the serialized headers into (the
caller must have already positioned the buffer at the write location)
+ * @return the same {@code buffer}, now containing the serialized headers
(nothing is written if the headers are null or empty),
+ * with its position advanced past the written bytes
*/
- public static byte[] serialize(final Headers headers) {
- final Header[] headersArray = (headers == null) ? new Header[0] :
headers.toArray();
-
- if (headersArray.length == 0) {
- return new byte[0];
+ public static ByteBuffer serialize(final PreSerializedHeaders
preSerializedHeaders, final ByteBuffer buffer) {
+ if (preSerializedHeaders.requiredBufferSizeForHeaders == 0) {
+ return buffer;
}
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
-
- ByteUtils.writeVarint(headersArray.length, out);
+ final int numberOfHeaders = preSerializedHeaders.rawHeaderKeys.length;
- for (final Header header : headersArray) {
- final byte[] keyBytes =
header.key().getBytes(StandardCharsets.UTF_8);
- final byte[] valueBytes = header.value();
+ ByteUtils.writeVarint(numberOfHeaders, buffer);
+ for (int i = 0; i < numberOfHeaders; ++i) {
+
ByteUtils.writeVarint(preSerializedHeaders.rawHeaderKeys[i].length, buffer);
+ buffer.put(preSerializedHeaders.rawHeaderKeys[i]);
- ByteUtils.writeVarint(keyBytes.length, out);
- out.write(keyBytes);
-
- // Write value length and value bytes (varint + raw bytes)
- // null is represented as -1, encoded as varint
- if (valueBytes == null) {
- ByteUtils.writeVarint(-1, out);
- } else {
- ByteUtils.writeVarint(valueBytes.length, out);
- out.write(valueBytes);
- }
+ if (preSerializedHeaders.rawHeaderValues[i] != null) {
+
ByteUtils.writeVarint(preSerializedHeaders.rawHeaderValues[i].length, buffer);
+ buffer.put(preSerializedHeaders.rawHeaderValues[i]);
+ } else {
+ buffer.put((byte) 0x01); // hardcoded varint encoding for `-1`
}
-
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to serialize headers", e);
}
+
+ return buffer;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index 118bc019b5f..058edad52c8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -17,14 +17,9 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.utils.ByteUtils;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.nio.ByteBuffer;
public final class RecordConverters {
@@ -33,7 +28,7 @@ public final class RecordConverters {
private static final RecordConverter RAW_TO_TIMESTAMED_INSTANCE = record
-> {
final byte[] rawValue = record.value();
final long timestamp = record.timestamp();
- final byte[] recordValue = rawValue == null ? null :
+ final byte[] recordValueWithTimestamp = rawValue == null ? null :
ByteBuffer.allocate(8 + rawValue.length)
.putLong(timestamp)
.put(rawValue)
@@ -45,9 +40,9 @@ public final class RecordConverters {
timestamp,
record.timestampType(),
record.serializedKeySize(),
- record.serializedValueSize(),
+ recordValueWithTimestamp != null ? recordValueWithTimestamp.length
: 0,
record.key(),
- recordValue,
+ recordValueWithTimestamp,
record.headers(),
record.leaderEpoch()
);
@@ -57,7 +52,7 @@ public final class RecordConverters {
final byte[] rawValue = record.value();
// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
- final byte[] recordValue = reconstructFromRaw(
+ final byte[] recordValueWithTimestampAndHeaders = reconstructFromRaw(
rawValue,
record.timestamp(),
record.headers()
@@ -70,9 +65,9 @@ public final class RecordConverters {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
- record.serializedValueSize(),
+ recordValueWithTimestampAndHeaders != null ?
recordValueWithTimestampAndHeaders.length : 0,
record.key(),
- recordValue,
+ recordValueWithTimestampAndHeaders,
record.headers(),
record.leaderEpoch()
);
@@ -86,7 +81,7 @@ public final class RecordConverters {
final byte[] rawValue = record.value();
// Format: [headersSize(varint)][headersBytes][aggregation] (no
timestamp)
- final byte[] recordValue = reconstructSessionFromRaw(
+ final byte[] recordValueWithHeaders = reconstructSessionFromRaw(
rawValue,
record.headers()
);
@@ -98,9 +93,9 @@ public final class RecordConverters {
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
- record.serializedValueSize(),
+ recordValueWithHeaders != null ? recordValueWithHeaders.length : 0,
record.key(),
- recordValue,
+ recordValueWithHeaders,
record.headers(),
record.leaderEpoch()
);
@@ -133,19 +128,18 @@ public final class RecordConverters {
if (rawValue == null) {
return null;
}
- final byte[] rawHeaders = HeadersSerializer.serialize(headers);
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- ByteUtils.writeVarint(rawHeaders.length, out);
- out.write(rawHeaders);
- out.write(rawValue);
+ final int payloadSize =
preSerializedHeaders.requiredBufferSizeForHeaders + rawValue.length;
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to reconstruct
AggregationWithHeaders", e);
- }
+ // Format: [headersSize(varint)][headersBytes][value]
+ final ByteBuffer buffer =
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
+ payloadSize);
+
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders,
buffer);
+
+ return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+ .put(rawValue)
+ .array();
}
/**
@@ -161,23 +155,18 @@ public final class RecordConverters {
if (rawValue == null) {
return null;
}
- final byte[] rawTimestamp;
- try (LongSerializer timestampSerializer = new LongSerializer()) {
- rawTimestamp = timestampSerializer.serialize("", timestamp);
- }
- final byte[] rawHeaders = HeadersSerializer.serialize(headers);
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- ByteUtils.writeVarint(rawHeaders.length, out);
- out.write(rawHeaders);
- out.write(rawTimestamp);
- out.write(rawValue);
+ final int payloadSize =
preSerializedHeaders.requiredBufferSizeForHeaders + 8 + rawValue.length;
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to reconstruct
ValueTimestampHeaders", e);
- }
+ // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+ final ByteBuffer buffer =
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
+ payloadSize);
+
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders,
buffer);
+
+ return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+ .putLong(timestamp)
+ .put(rawValue)
+ .array();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
index 1da1ad1f65f..f4e6fe07a22 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java
@@ -16,18 +16,14 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
@@ -51,18 +47,15 @@ import static
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.i
*/
class ValueTimestampHeadersSerializer<V> implements
WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
public final Serializer<V> valueSerializer;
- private final LongSerializer timestampSerializer;
ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
Objects.requireNonNull(valueSerializer);
this.valueSerializer = valueSerializer;
- this.timestampSerializer = new LongSerializer();
}
@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
valueSerializer.configure(configs, isKey);
- timestampSerializer.configure(configs, isKey);
}
@Override
@@ -89,30 +82,24 @@ class ValueTimestampHeadersSerializer<V> implements
WrappingNullableSerializer<V
return null;
}
- final byte[] rawTimestamp = timestampSerializer.serialize(topic,
timestamp);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- // empty (byte[0]) for null/empty headers, or
[count][header1][header2]... for non-empty
- final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+ final int payloadSize =
preSerializedHeaders.requiredBufferSizeForHeaders + 8 + rawValue.length;
// Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream out = new DataOutputStream(baos)) {
-
- ByteUtils.writeVarint(rawHeaders.length, out); // headersSize (it
may be 0 due to null/empty headers)
- out.write(rawHeaders); // empty (byte[0])
for null/empty headers, or [count][header1][header2]... for non-empty
- out.write(rawTimestamp); // [timestamp(8)]
- out.write(rawValue); // [value]
+ final ByteBuffer buffer =
ByteBuffer.allocate(ByteUtils.sizeOfVarint(preSerializedHeaders.requiredBufferSizeForHeaders)
+ payloadSize);
+
ByteUtils.writeVarint(preSerializedHeaders.requiredBufferSizeForHeaders,
buffer);
- return baos.toByteArray();
- } catch (final IOException e) {
- throw new SerializationException("Failed to serialize
ValueTimestampHeaders", e);
- }
+ // empty (byte[0]) for null/empty headers, or
[count][header1][header2]... for non-empty
+ return HeadersSerializer.serialize(preSerializedHeaders, buffer)
+ .putLong(timestamp)
+ .put(rawValue)
+ .array();
}
@Override
public void close() {
valueSerializer.close();
- timestampSerializer.close();
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
index 8f6e1ce3d7a..451f534c392 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -49,8 +50,9 @@ public class HeadersDeserializerTest {
@Test
public void shouldRoundTripEmptyHeaders() {
- final Headers original = new RecordHeaders();
- final byte[] serialized = HeadersSerializer.serialize(original);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(new RecordHeaders());
+ final byte[] serialized =
HeadersSerializer.serialize(preSerializedHeaders,
ByteBuffer.allocate(0)).array();
+
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
@@ -61,7 +63,12 @@ public class HeadersDeserializerTest {
public void shouldRoundTripSingleHeader() {
final Headers original = new RecordHeaders()
.add("key1", "value1".getBytes());
- final byte[] serialized = HeadersSerializer.serialize(original);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
+
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
@@ -79,8 +86,14 @@ public class HeadersDeserializerTest {
.add("key0", "value0".getBytes())
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes());
- final byte[] serialized = HeadersSerializer.serialize(original);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
+
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
+
assertNotNull(deserialized);
final Header[] headerArray = deserialized.toArray();
@@ -96,7 +109,12 @@ public class HeadersDeserializerTest {
public void shouldRoundTripHeaderWithNullValue() {
final Headers original = new RecordHeaders()
.add("key1", null);
- final byte[] serialized = HeadersSerializer.serialize(original);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
+
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
@@ -112,7 +130,12 @@ public class HeadersDeserializerTest {
public void shouldRoundTripHeaderWithEmptyValue() {
final Headers original = new RecordHeaders()
.add("key1", new byte[0]);
- final byte[] serialized = HeadersSerializer.serialize(original);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
+
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
assertNotNull(deserialized);
@@ -132,8 +155,14 @@ public class HeadersDeserializerTest {
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes())
.add("key2", "value3".getBytes());
- final byte[] serialized = HeadersSerializer.serialize(original);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
+
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
+
assertNotNull(deserialized);
final Header[] headerArray = deserialized.toArray();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index 7c791c93984..259449d2849 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
+import java.nio.ByteBuffer;
+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -32,7 +34,10 @@ public class HeadersSerializerTest {
@Test
public void shouldSerializeNullHeaders() {
- final byte[] serialized = HeadersSerializer.serialize(null);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(null);
+ assertEquals(0, preSerializedHeaders.requiredBufferSizeForHeaders);
+
+ final byte[] serialized =
HeadersSerializer.serialize(preSerializedHeaders,
ByteBuffer.allocate(0)).array();
assertNotNull(serialized);
assertEquals(0, serialized.length, "Null headers should serialize to
empty byte array (0 bytes)");
@@ -40,8 +45,10 @@ public class HeadersSerializerTest {
@Test
public void shouldSerializeEmptyHeaders() {
- final Headers headers = new RecordHeaders();
- final byte[] serialized = HeadersSerializer.serialize(headers);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(new RecordHeaders());
+ assertEquals(0, preSerializedHeaders.requiredBufferSizeForHeaders);
+
+ final byte[] serialized =
HeadersSerializer.serialize(preSerializedHeaders,
ByteBuffer.allocate(0)).array();
assertNotNull(serialized);
assertEquals(0, serialized.length, "Empty headers should serialize to
empty byte array (0 bytes)");
@@ -51,7 +58,12 @@ public class HeadersSerializerTest {
public void shouldSerializeSingleHeader() {
final Headers headers = new RecordHeaders()
.add("key1", "value1".getBytes());
- final byte[] serialized = HeadersSerializer.serialize(headers);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
+
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
assertNotNull(serialized);
assertTrue(serialized.length > 0);
@@ -72,7 +84,12 @@ public class HeadersSerializerTest {
.add("key0", "value0".getBytes())
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes());
- final byte[] serialized = HeadersSerializer.serialize(headers);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
+
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
assertNotNull(serialized);
assertTrue(serialized.length > 0);
@@ -93,7 +110,12 @@ public class HeadersSerializerTest {
public void shouldSerializeHeaderWithNullValue() {
final Headers headers = new RecordHeaders()
.add("key1", null);
- final byte[] serialized = HeadersSerializer.serialize(headers);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
+
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
assertNotNull(serialized);
assertTrue(serialized.length > 0);
@@ -112,7 +134,12 @@ public class HeadersSerializerTest {
public void shouldSerializeHeadersWithEmptyValue() {
final Headers headers = new RecordHeaders()
.add("key1", new byte[0]);
- final byte[] serialized = HeadersSerializer.serialize(headers);
+ final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
+
+ final byte[] serialized = HeadersSerializer.serialize(
+ preSerializedHeaders,
+
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
+ ).array();
assertNotNull(serialized);
assertTrue(serialized.length > 0);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
index 93d08c00293..d28334c1cc1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.Optional;
import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
+import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToSessionHeadersValue;
import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -35,6 +36,7 @@ public class RecordConvertersTest {
private final RecordConverter timestampedValueConverter =
rawValueToTimestampedValue();
private final RecordConverter headersValueConverter =
rawValueToHeadersValue();
+ private final RecordConverter sessionValueConverter =
rawValueToSessionHeadersValue();
@Test
@@ -42,6 +44,7 @@ public class RecordConvertersTest {
final ConsumerRecord<byte[], byte[]> nullValueRecord = new
ConsumerRecord<>("", 0, 0L, new byte[0], null);
assertNull(timestampedValueConverter.convert(nullValueRecord).value());
assertNull(headersValueConverter.convert(nullValueRecord).value());
+ assertNull(sessionValueConverter.convert(nullValueRecord).value());
}
@Test
@@ -66,9 +69,25 @@ public class RecordConvertersTest {
headers, Optional.empty());
// Expected format:
[headersSize(varint)][headersBytes][timestamp(8)][value]
final byte[] expectedValue =
- {50, 2, 20, 104, 101, 97, 100, 101, 114, 45, 107, 101, 121, 24,
104, 101, 97, 100, 101,
- 114, 45, 118, 97, 108, 117, 101, 0, 0, 0, 0, 0, 0, 0, 10, 0};
+ {50, 2, 20, 'h', 'e', 'a', 'd', 'e', 'r', '-', 'k', 'e', 'y', 24,
'h', 'e', 'a', 'd', 'e',
+ 'r', '-', 'v', 'a', 'l', 'u', 'e', 0, 0, 0, 0, 0, 0, 0, 10,
value[0]};
final byte[] actualValue =
headersValueConverter.convert(inputRecord).value();
assertArrayEquals(expectedValue, actualValue);
}
+
+ @Test
+ public void shouldAddHeadersToValueOnConversionWhenValueIsNotNull() {
+ final byte[] value = new byte[1];
+ final Headers headers = new RecordHeaders().add("header-key",
"header-value".getBytes());
+ final ConsumerRecord<byte[], byte[]> inputRecord = new
ConsumerRecord<>(
+ "topic", 1, 0, 0, TimestampType.CREATE_TIME, 0, 0, new byte[0],
value,
+ headers, Optional.empty());
+ // Expected format: [headersSize(varint)][headersBytes][value]
+ final byte[] expectedValue =
+ {50, 2, 20, 'h', 'e', 'a', 'd', 'e', 'r', '-', 'k', 'e', 'y', 24,
'h', 'e', 'a', 'd', 'e',
+ 'r', '-', 'v', 'a', 'l', 'u', 'e', value[0]};
+ final byte[] actualValue =
sessionValueConverter.convert(inputRecord).value();
+ assertArrayEquals(expectedValue, actualValue);
+ }
+
}