This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 20079a3cccb MINOR: Remove boilerplate code from header serializer test (#21837)
20079a3cccb is described below
commit 20079a3cccbdac9524c8cf3d5950a6ab2ddfe645
Author: Arpit Goyal <[email protected]>
AuthorDate: Sat Mar 28 04:51:06 2026 +0530
MINOR: Remove boilerplate code from header serializer test (#21837)
Introduces HeadersSerializer.serialize(Headers) as a one-line
alternative to the two-phase serialization API (prepareSerialization +
serialize), reducing boilerplate in test code.
Reviewers: Matthias J. Sax <[email protected]>, Nilesh Kumar <[email protected]>
---
.../streams/state/internals/HeadersSerializer.java | 8 +++++
.../state/internals/HeadersDeserializerTest.java | 34 ++++----------------
.../state/internals/HeadersSerializerTest.java | 36 ++++------------------
.../TimeOrderedSessionStoreUpgradeTest.java | 6 +---
.../TimeOrderedWindowStoreUpgradeTest.java | 6 +---
5 files changed, 22 insertions(+), 68 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
index 6608a55f67b..efadd05ba11 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java
@@ -61,6 +61,14 @@ class HeadersSerializer {
}
}
+ // for testing
+ static byte[] serialize(final Headers headers) {
+ final PreSerializedHeaders prep = prepareSerialization(headers);
+ final ByteBuffer buffer =
ByteBuffer.allocate(prep.requiredBufferSizeForHeaders);
+ serialize(prep, buffer);
+ return buffer.array();
+ }
+
public static PreSerializedHeaders prepareSerialization(final Headers
headers) {
final Header[] headersArray = (headers == null) ? new Header[0] :
headers.toArray();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
index 451f534c392..78d6cabada0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -50,8 +49,7 @@ public class HeadersDeserializerTest {
@Test
public void shouldRoundTripEmptyHeaders() {
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(new RecordHeaders());
- final byte[] serialized =
HeadersSerializer.serialize(preSerializedHeaders,
ByteBuffer.allocate(0)).array();
+ final byte[] serialized = HeadersSerializer.serialize(new
RecordHeaders());
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
@@ -63,11 +61,7 @@ public class HeadersDeserializerTest {
public void shouldRoundTripSingleHeader() {
final Headers original = new RecordHeaders()
.add("key1", "value1".getBytes());
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
@@ -86,11 +80,7 @@ public class HeadersDeserializerTest {
.add("key0", "value0".getBytes())
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes());
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
@@ -109,11 +99,7 @@ public class HeadersDeserializerTest {
public void shouldRoundTripHeaderWithNullValue() {
final Headers original = new RecordHeaders()
.add("key1", null);
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
@@ -130,11 +116,7 @@ public class HeadersDeserializerTest {
public void shouldRoundTripHeaderWithEmptyValue() {
final Headers original = new RecordHeaders()
.add("key1", new byte[0]);
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
@@ -155,11 +137,7 @@ public class HeadersDeserializerTest {
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes())
.add("key2", "value3".getBytes());
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(original);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(original);
final Headers deserialized =
HeadersDeserializer.deserialize(serialized);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
index 259449d2849..a4f53b188cd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java
@@ -22,8 +22,6 @@ import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;
-import java.nio.ByteBuffer;
-
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -34,10 +32,7 @@ public class HeadersSerializerTest {
@Test
public void shouldSerializeNullHeaders() {
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(null);
- assertEquals(0, preSerializedHeaders.requiredBufferSizeForHeaders);
-
- final byte[] serialized =
HeadersSerializer.serialize(preSerializedHeaders,
ByteBuffer.allocate(0)).array();
+ final byte[] serialized = HeadersSerializer.serialize(null);
assertNotNull(serialized);
assertEquals(0, serialized.length, "Null headers should serialize to
empty byte array (0 bytes)");
@@ -45,10 +40,7 @@ public class HeadersSerializerTest {
@Test
public void shouldSerializeEmptyHeaders() {
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(new RecordHeaders());
- assertEquals(0, preSerializedHeaders.requiredBufferSizeForHeaders);
-
- final byte[] serialized =
HeadersSerializer.serialize(preSerializedHeaders,
ByteBuffer.allocate(0)).array();
+ final byte[] serialized = HeadersSerializer.serialize(new
RecordHeaders());
assertNotNull(serialized);
assertEquals(0, serialized.length, "Empty headers should serialize to
empty byte array (0 bytes)");
@@ -58,12 +50,8 @@ public class HeadersSerializerTest {
public void shouldSerializeSingleHeader() {
final Headers headers = new RecordHeaders()
.add("key1", "value1".getBytes());
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(headers);
assertNotNull(serialized);
assertTrue(serialized.length > 0);
@@ -84,12 +72,8 @@ public class HeadersSerializerTest {
.add("key0", "value0".getBytes())
.add("key1", "value1".getBytes())
.add("key2", "value2".getBytes());
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(headers);
assertNotNull(serialized);
assertTrue(serialized.length > 0);
@@ -110,12 +94,8 @@ public class HeadersSerializerTest {
public void shouldSerializeHeaderWithNullValue() {
final Headers headers = new RecordHeaders()
.add("key1", null);
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(headers);
assertNotNull(serialized);
assertTrue(serialized.length > 0);
@@ -134,12 +114,8 @@ public class HeadersSerializerTest {
public void shouldSerializeHeadersWithEmptyValue() {
final Headers headers = new RecordHeaders()
.add("key1", new byte[0]);
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- final byte[] serialized = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] serialized = HeadersSerializer.serialize(headers);
assertNotNull(serialized);
assertTrue(serialized.length > 0);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
index 260567f137c..260fe0f325d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedSessionStoreUpgradeTest.java
@@ -55,7 +55,6 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -110,10 +109,7 @@ public class TimeOrderedSessionStoreUpgradeTest {
if (value == null) {
return null;
}
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- final byte[] rawHeaders = HeadersSerializer
- .serialize(preSerializedHeaders,
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders))
- .array();
+ final byte[] rawHeaders = HeadersSerializer.serialize(headers);
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
index df278a35972..9a216ee2b2d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreUpgradeTest.java
@@ -91,11 +91,7 @@ public class TimeOrderedWindowStoreUpgradeTest {
if (value == null) {
return null;
}
- final HeadersSerializer.PreSerializedHeaders preSerializedHeaders =
HeadersSerializer.prepareSerialization(headers);
- final byte[] rawHeaders = HeadersSerializer.serialize(
- preSerializedHeaders,
-
ByteBuffer.allocate(preSerializedHeaders.requiredBufferSizeForHeaders)
- ).array();
+ final byte[] rawHeaders = HeadersSerializer.serialize(headers);
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream out = new DataOutputStream(baos)) {