This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5af84cf1e1d KAFKA-20262: Changelog tombstone records drop custom
headers when usi… (#21639)
5af84cf1e1d is described below
commit 5af84cf1e1de679407c34441c1fcc87c8c4de4e1
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Mar 17 20:31:23 2026 +0000
KAFKA-20262: Changelog tombstone records drop custom headers when usi…
(#21639)
When utilizing headers-aware state stores in Kafka Streams, we need to
ensure that any delete operation (remove(), put(key, null), or similar)
handles headers correctly. While a delete is, well, a delete on the
state store itself, it's a tombstone in the changelog topic.
This PR ensures that tombstones written into changelog topics have the
correct headers set.
Reviewers: Alieh Saeedi <[email protected]>, Bill Bejeck
<[email protected]>, Matthias J. Sax <[email protected]>
---
...ngTimestampedKeyValueBytesStoreWithHeaders.java | 7 +
.../internals/MeteredSessionStoreWithHeaders.java | 79 +++++-
...MeteredTimestampedKeyValueStoreWithHeaders.java | 118 ++++++++-
.../MeteredTimestampedWindowStoreWithHeaders.java | 33 ++-
.../state/internals/MeteredWindowStore.java | 2 +-
.../MeteredSessionStoreWithHeadersTest.java | 6 +-
...redTimestampedKeyValueStoreWithHeadersTest.java | 2 +
...onStoreWithHeadersSerializerSideEffectTest.java | 243 ++++++++++++++++++
...ueStoreWithHeadersSerializerSideEffectTest.java | 279 +++++++++++++++++++++
...owStoreWithHeadersSerializerSideEffectTest.java | 218 ++++++++++++++++
10 files changed, 975 insertions(+), 12 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
index fe96a6acd16..bce5dbe0130 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
@@ -97,4 +97,11 @@ public class
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
);
}
}
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ final byte[] deletedValue = wrapped().delete(key);
+ log(key, null, internalContext.recordContext().timestamp(),
internalContext.recordContext().headers());
+ return deletedValue;
+ }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index 221cf461ba4..b74ebd9ec88 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
@@ -69,10 +70,40 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
public void put(final Windowed<K> sessionKey, final
AggregationWithHeaders<AGG> aggregate) {
Objects.requireNonNull(sessionKey, "sessionKey can't be null");
try {
- final Headers headers = aggregate != null ? aggregate.headers() :
new RecordHeaders();
- final Bytes key = keyBytes(sessionKey, headers, serdes);
- maybeMeasureLatency(() -> wrapped().put(new Windowed<>(key,
sessionKey.window()),
- serdes.rawValue(aggregate, headers)), time, putSensor);
+ maybeMeasureLatency(
+ () -> {
+ if (aggregate == null) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate tombstone
operation from input record
+ final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ deleteHeaders
+ );
+
+ try {
+ internalContext.setRecordContext(temporaryContext);
+ final Bytes key = keyBytes(sessionKey,
deleteHeaders, serdes);
+ wrapped().put(new Windowed<>(key,
sessionKey.window()), serdes.rawValue(null, deleteHeaders));
+ } finally {
+ // Restore original context
+ internalContext.setRecordContext(currentContext);
+ }
+ } else {
+ final Headers headers = aggregate.headers();
+ final Bytes key = keyBytes(sessionKey, headers,
serdes);
+ wrapped().put(new Windowed<>(key,
sessionKey.window()), serdes.rawValue(aggregate, headers));
+ }
+ },
+ time,
+ putSensor
+ );
maybeRecordE2ELatency();
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(),
sessionKey.key(), aggregate);
@@ -81,6 +112,46 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
}
+ @Override
+ public void remove(final Windowed<K> sessionKey) {
+ Objects.requireNonNull(sessionKey, "sessionKey can't be null");
+ Objects.requireNonNull(sessionKey.key(), "sessionKey.key() can't be
null");
+ Objects.requireNonNull(sessionKey.window(), "sessionKey.window() can't
be null");
+
+ try {
+ maybeMeasureLatency(
+ () -> {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate delete operation
from input record
+ final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ deleteHeaders
+ );
+
+ try {
+ internalContext.setRecordContext(temporaryContext);
+ final Bytes key = keyBytes(sessionKey, deleteHeaders,
serdes);
+ wrapped().remove(new Windowed<>(key,
sessionKey.window()));
+ } finally {
+ // Restore original context
+ internalContext.setRecordContext(currentContext);
+ }
+ },
+ time,
+ removeSensor
+ );
+ } catch (final ProcessorStateException e) {
+ throw new ProcessorStateException(String.format(e.getMessage(),
sessionKey.key()), e);
+ }
+ }
+
@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 86624961650..2346c7646af 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.PositionBound;
@@ -49,6 +50,7 @@ import java.util.function.Function;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
/**
@@ -110,8 +112,38 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
final ValueTimestampHeaders<V> value) {
Objects.requireNonNull(key, "key cannot be null");
try {
- final Headers headers = value != null ? value.headers() : new
RecordHeaders();
- maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers)), time, putSensor);
+ maybeMeasureLatency(
+ () -> {
+ if (value == null) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate tombstone
operation from input record
+ final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ deleteHeaders
+ );
+
+ try {
+ internalContext.setRecordContext(temporaryContext);
+ wrapped().put(keyBytes(key, deleteHeaders),
serdes.rawValue(null, deleteHeaders));
+ } finally {
+ // Restore original context
+ internalContext.setRecordContext(currentContext);
+ }
+ } else {
+ final Headers headers = value.headers();
+ wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers));
+ }
+ },
+ time,
+ putSensor
+ );
maybeRecordE2ELatency();
} catch (final ProcessorStateException e) {
final String message = String.format(e.getMessage(), key, value);
@@ -123,9 +155,35 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
public ValueTimestampHeaders<V> putIfAbsent(final K key,
final ValueTimestampHeaders<V>
value) {
Objects.requireNonNull(key, "key cannot be null");
- final Headers headers = value != null ? value.headers() : new
RecordHeaders();
final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
- () -> deserializeValue(wrapped().putIfAbsent(keyBytes(key,
headers), serdes.rawValue(value, headers))),
+ () -> {
+ if (value == null) {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate tombstone
operation from input record
+ final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ deleteHeaders
+ );
+
+ try {
+ internalContext.setRecordContext(temporaryContext);
+ return
deserializeValue(wrapped().putIfAbsent(keyBytes(key, deleteHeaders),
serdes.rawValue(null, deleteHeaders)));
+ } finally {
+ // Restore original context
+ internalContext.setRecordContext(currentContext);
+ }
+ } else {
+ final Headers headers = value.headers();
+ return
deserializeValue(wrapped().putIfAbsent(keyBytes(key, headers),
serdes.rawValue(value, headers)));
+ }
+ },
time,
putIfAbsentSensor
);
@@ -133,6 +191,58 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
return currentValue;
}
+ @Override
+ public void putAll(final java.util.List<KeyValue<K,
ValueTimestampHeaders<V>>> entries) {
+ entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot
be null"));
+
+ final boolean hasNullValue = entries.stream().anyMatch(entry ->
entry.value == null);
+
+ if (hasNullValue) {
+ entries.forEach(entry -> put(entry.key, entry.value));
+ } else {
+ // If no null values, use parent's batch optimization
+ super.putAll(entries);
+ }
+ }
+
+ @Override
+ public ValueTimestampHeaders<V> delete(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ try {
+ return maybeMeasureLatency(
+ () -> {
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate delete operation
from input record
+ final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ deleteHeaders
+ );
+
+ try {
+ internalContext.setRecordContext(temporaryContext);
+ final byte[] deletedValue =
wrapped().delete(keyBytes(key, deleteHeaders));
+ return deserializeValue(deletedValue);
+ } finally {
+ // Restore original context
+ internalContext.setRecordContext(currentContext);
+ }
+ },
+ time,
+ deleteSensor
+ );
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key);
+ throw new ProcessorStateException(message, e);
+ }
+ }
+
/**
* Executes a query against this store.
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 96b7fbae9be..e88ec3398d5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
@@ -45,6 +46,7 @@ import java.util.Objects;
import java.util.function.Function;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
/**
* A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used
for recording operation metrics,
@@ -81,10 +83,37 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
@Override
public void put(final K key, final ValueTimestampHeaders<V> value, final
long windowStartTimestamp) {
Objects.requireNonNull(key, "key cannot be null");
- final Headers headers = value == null || value.headers() == null ? new
RecordHeaders() : value.headers();
try {
maybeMeasureLatency(
- () -> wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers), windowStartTimestamp),
+ () -> {
+ if (value == null) {
+ // Deletion path
+ final ProcessorRecordContext currentContext =
internalContext.recordContext();
+
+ // Create new headers object to isolate delete
operation from input record
+ final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+
+ // Create temporary context with new headers
+ final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
+ currentContext.timestamp(),
+ currentContext.offset(),
+ currentContext.partition(),
+ currentContext.topic(),
+ deleteHeaders
+ );
+
+ try {
+ internalContext.setRecordContext(temporaryContext);
+ wrapped().put(keyBytes(key, deleteHeaders), null,
windowStartTimestamp);
+ } finally {
+ // Restore original context
+ internalContext.setRecordContext(currentContext);
+ }
+ } else {
+ final Headers headers = value.headers() == null ? new
RecordHeaders() : value.headers();
+ wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers), windowStartTimestamp);
+ }
+ },
time,
putSensor
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2658772b7bd..8df7fbe161e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -79,7 +79,7 @@ public class MeteredWindowStore<K, V>
private Sensor flushSensor;
private Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
- private InternalProcessorContext<?, ?> internalContext;
+ protected InternalProcessorContext<?, ?> internalContext;
private TaskId taskId;
private Sensor restoreSensor;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
index 4d6aaa8e2d7..f95254237d6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -36,6 +37,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.PositionBound;
@@ -136,6 +138,8 @@ public class MeteredSessionStoreWithHeadersTest {
when(context.taskId()).thenReturn(taskId);
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(innerStore.name()).thenReturn(STORE_NAME);
+ lenient().when(context.recordContext()).thenReturn(new
ProcessorRecordContext(
+ 0L, 0L, 0, "topic", new RecordHeaders()));
}
private void init() {
@@ -470,7 +474,7 @@ public class MeteredSessionStoreWithHeadersTest {
store.recordRestoreTime(100L);
- final Map<MetricName, ? extends org.apache.kafka.common.Metric>
allMetrics = metrics.metrics();
+ final Map<MetricName, ? extends Metric> allMetrics = metrics.metrics();
final List<MetricName> restoreMetrics = allMetrics.keySet().stream()
.filter(metricName -> metricName.name().equals("restore-rate"))
.collect(Collectors.toList());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index 851891ecced..fc340282988 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -128,6 +128,8 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest
{
when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
when(inner.name()).thenReturn(STORE_NAME);
when(context.appConfigs()).thenReturn(CONFIGS);
+ lenient().when(context.recordContext()).thenReturn(new
org.apache.kafka.streams.processor.internals.ProcessorRecordContext(
+ 0L, 0L, 0, "topic", new
org.apache.kafka.common.header.internals.RecordHeaders()));
}
@SuppressWarnings({"unchecked", "rawtypes"})
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
new file mode 100644
index 00000000000..b5eda458c78
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test to verify that key serializers can modify headers as a side-effect,
+ * and that this side-effect makes it into the changelog topic for session
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersSerializerSideEffectTest {
+
+ private static final String STORE_NAME = "test-session-store";
+ private static final String INPUT_TOPIC = "input";
+ private static final String OUTPUT_TOPIC = "output";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ headers.add("serializer-metadata",
"session-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and removes from a session store with headers.
+ * Uses command values to test different deletion methods:
+ * - "remove" → uses remove() API
+ * - "put(null)" → uses put(sessionKey, null) API
+ * - other values → normal put operation
+ */
+ private static class SessionStoreProcessor extends
ContextualProcessor<String, String, String, String> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<String, String> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long timestamp = record.timestamp();
+ final Windowed<String> sessionKey = new Windowed<>(
+ record.key(),
+ new SessionWindow(timestamp, timestamp)
+ );
+
+ if ("remove".equals(record.value())) {
+ store.remove(sessionKey);
+ } else if ("put(null)".equals(record.value())) {
+ store.put(sessionKey, null);
+ } else {
+ store.put(
+ sessionKey,
+ AggregationWithHeaders.make(record.value(),
record.headers())
+ );
+ }
+
+ context().forward(record);
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a session store with headers using our custom serializer
+ builder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.inMemorySessionStore(
+ STORE_NAME,
+ Duration.ofMillis(10000L)
+ ),
+ new HeaderAddingSerde(), // Custom key serializer that adds
headers
+ Serdes.String()
+ )
+ );
+
+ // Add a processor that uses the store and forwards to output
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionStoreProcessor::new, STORE_NAME)
+ .to(OUTPUT_TOPIC);
+
+ final Properties props = new Properties();
+ props.put("application.id", "test-session-app");
+ props.put("default.key.serde", Serdes.StringSerde.class);
+ props.put("default.value.serde", Serdes.StringSerde.class);
+
+ try (TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
+ INPUT_TOPIC,
+ Serdes.String().serializer(),
+ Serdes.String().serializer()
+ );
+
+ final String changelogTopic = "test-session-app-" + STORE_NAME +
"-changelog";
+ final TestOutputTopic<String, String> changelogOutputTopic =
+ driver.createOutputTopic(
+ changelogTopic,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer()
+ );
+
+ // Create output topic reader (using regular StringSerde, not
HeaderAddingSerde)
+ final TestOutputTopic<String, String> outputTopic =
+ driver.createOutputTopic(
+ OUTPUT_TOPIC,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer()
+ );
+
+ inputTopic.pipeInput("key1", "value1", 1000L);
+
+ // Verify changelog has the put record with header
+ final var putChangelogRecord = changelogOutputTopic.readRecord();
+ assertNotNull(putChangelogRecord.key());
+ assertEquals("value1", putChangelogRecord.value());
+
+ // Verify the serializer added metadata header to changelog
+ final Header putMetadataHeader =
putChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putMetadataHeader, "metadata header should be
present in changelog put record");
+ assertEquals("session-test-value", new
String(putMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ final var putOutputRecord = outputTopic.readRecord();
+ assertEquals("key1", putOutputRecord.key());
+ assertEquals("value1", putOutputRecord.value());
+ final Header outputMetadataHeader =
putOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(outputMetadataHeader, "Output record should contain
serializer-metadata header for normal put operations");
+ assertEquals("session-test-value", new
String(outputMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ inputTopic.pipeInput("key1", "remove", 1000L);
+
+ // Verify changelog has the delete record (tombstone) with header
+ final var removeChangelogRecord =
changelogOutputTopic.readRecord();
+ assertNotNull(removeChangelogRecord.key());
+ assertNull(removeChangelogRecord.value(), "remove() should produce
tombstone");
+
+ // Verify the serializer's side-effect made it into the changelog
+ final Header removeMetadataHeader =
removeChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(removeMetadataHeader, "metadata header should be
present in changelog tombstone from remove()");
+ assertEquals("session-test-value", new
String(removeMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ // CRITICAL: Verify output tombstone does NOT contain the
serializer-added header
+ final var removeOutputRecord = outputTopic.readRecord();
+ assertEquals("key1", removeOutputRecord.key());
+ assertEquals("remove", removeOutputRecord.value());
+ final Header outputRemoveMetadataHeader =
removeOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNull(outputRemoveMetadataHeader, "Output record should not
contain serializer-metadata header - side-effect should be isolated to
changelog");
+
+ inputTopic.pipeInput("key2", "value2", 2000L);
+ changelogOutputTopic.readRecord();
+ outputTopic.readRecord();
+
+ inputTopic.pipeInput("key2", "put(null)", 2000L);
+
+ // Verify changelog has the delete record (tombstone) with header
+ final var putNullChangelogRecord =
changelogOutputTopic.readRecord();
+ assertNotNull(putNullChangelogRecord.key());
+ assertNull(putNullChangelogRecord.value(), "put(null) should
produce tombstone");
+
+ // Verify the serializer's side-effect made it into the changelog
+ final Header putNullMetadataHeader =
putNullChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putNullMetadataHeader, "metadata header should be
present in changelog tombstone from put(null)");
+ assertEquals("session-test-value", new
String(putNullMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ // CRITICAL: Verify output tombstone does NOT contain the
serializer-added header
+ final var putNullOutputRecord = outputTopic.readRecord();
+ assertEquals("key2", putNullOutputRecord.key());
+ assertEquals("put(null)", putNullOutputRecord.value());
+ final Header outputPutNullMetadataHeader =
putNullOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNull(outputPutNullMetadataHeader, "Output record should not
contain serializer-metadata header - side-effect should be isolated to
changelog");
+ }
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
new file mode 100644
index 00000000000..918e3e37d7a
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test to verify that key serializers can modify headers as a side-effect,
+ * and that this side-effect makes it into the changelog topic.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest {
+
+ private static final String STORE_NAME = "test-store";
+ private static final String INPUT_TOPIC = "input";
+ private static final String OUTPUT_TOPIC = "output";
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ // Add metadata header during serialization (side-effect)
+ headers.add("serializer-metadata",
"test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and deletes from a timestamped key-value store with
headers.
+ * Uses command values to test different deletion methods:
+ * - "delete" → uses delete() API
+ * - "put(null)" → uses put(key, null) API
+ * - "putIfAbsent(null)" → uses putIfAbsent(key, null) API
+ * - "putAll(null)" → uses putAll() with null value
+ * - other values → normal put operation
+ */
+ private static class StoreProcessor extends ContextualProcessor<String,
String, String, String> {
+ private TimestampedKeyValueStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<String, String> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ if ("delete".equals(record.value())) {
+ store.delete(record.key());
+ } else if ("put(null)".equals(record.value())) {
+ store.put(record.key(), null);
+ } else if ("putIfAbsent(null)".equals(record.value())) {
+ store.putIfAbsent(record.key(), null);
+ } else if ("putAll(null)".equals(record.value())) {
+ store.putAll(List.of(KeyValue.pair(record.key(), null)));
+ } else {
+ store.put(record.key(),
ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()));
+ }
+
+ context().forward(record);
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a timestamped key-value store with headers using our custom
serializer
+ builder.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.inMemoryKeyValueStore(STORE_NAME),
+ new HeaderAddingSerde(), // Custom key serializer that adds
headers
+ Serdes.String()
+ )
+ );
+
+ // Add a processor that uses the store and forwards to output
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(StoreProcessor::new, STORE_NAME)
+ .to(OUTPUT_TOPIC);
+
+ final Properties props = new Properties();
+ props.put("application.id", "test-app");
+ props.put("default.key.serde", Serdes.StringSerde.class);
+ props.put("default.value.serde", Serdes.StringSerde.class);
+
+ try (TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
+ INPUT_TOPIC,
+ Serdes.String().serializer(),
+ Serdes.String().serializer()
+ );
+
+ final String changelogTopic = "test-app-" + STORE_NAME +
"-changelog";
+ final TestOutputTopic<String, String> changelogOutputTopic =
+ driver.createOutputTopic(
+ changelogTopic,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer()
+ );
+
+ final TestOutputTopic<String, String> outputTopic =
+ driver.createOutputTopic(
+ OUTPUT_TOPIC,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer()
+ );
+
+ inputTopic.pipeInput("key1", "value1");
+
+ // Verify changelog has the put record with header
+ final var putChangelogRecord = changelogOutputTopic.readRecord();
+ assertEquals("key1", putChangelogRecord.key());
+ assertEquals("value1", putChangelogRecord.value());
+
+ // Verify the serializer added metadata header to changelog
+ final Header putMetadataHeader =
putChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putMetadataHeader, "metadata header should be
present in changelog put record");
+ assertEquals("test-value", new String(putMetadataHeader.value(),
StandardCharsets.UTF_8));
+
+ // Verify output record - for normal put, serializer side-effects
ARE expected in output
+ final var putOutputRecord = outputTopic.readRecord();
+ assertEquals("key1", putOutputRecord.key());
+ assertEquals("value1", putOutputRecord.value());
+ final Header outputMetadataHeader =
putOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(outputMetadataHeader, "Output record SHOULD contain
serializer-metadata header for normal put operations");
+ assertEquals("test-value", new
String(outputMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ inputTopic.pipeInput("key1", "delete");
+
+ // Verify changelog has the delete record (tombstone) with header
+ final var deleteChangelogRecord =
changelogOutputTopic.readRecord();
+ assertEquals("key1", deleteChangelogRecord.key());
+ assertNull(deleteChangelogRecord.value(), "delete() should produce
tombstone");
+
+ // Verify the serializer's side-effect made it into the changelog
+ final Header deleteMetadataHeader =
deleteChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(deleteMetadataHeader, "metadata header should be
present in changelog tombstone from delete()");
+ assertEquals("test-value", new
String(deleteMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ // CRITICAL: Verify output record does NOT contain the
serializer-added header
+ final var deleteOutputRecord = outputTopic.readRecord();
+ assertEquals("key1", deleteOutputRecord.key());
+ assertEquals("delete", deleteOutputRecord.value());
+ final Header outputDeleteMetadataHeader =
deleteOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNull(outputDeleteMetadataHeader, "Output record should not
contain serializer-metadata header - side-effect should be isolated to
changelog");
+
+ inputTopic.pipeInput("key2", "value2");
+ changelogOutputTopic.readRecord();
+ outputTopic.readRecord();
+
+ inputTopic.pipeInput("key2", "put(null)");
+
+ // Verify changelog has the delete record (tombstone) with header
+ final var putNullChangelogRecord =
changelogOutputTopic.readRecord();
+ assertEquals("key2", putNullChangelogRecord.key());
+ assertNull(putNullChangelogRecord.value(), "put(null) should
produce tombstone");
+
+ // Verify the serializer's side-effect made it into the changelog
+ final Header putNullMetadataHeader =
putNullChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putNullMetadataHeader, "metadata header should be
present in changelog tombstone from put(null)");
+ assertEquals("test-value", new
String(putNullMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ // CRITICAL: Verify output record does NOT contain the
serializer-added header
+ final var putNullOutputRecord = outputTopic.readRecord();
+ assertEquals("key2", putNullOutputRecord.key());
+ assertEquals("put(null)", putNullOutputRecord.value());
+ final Header outputPutNullMetadataHeader =
putNullOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNull(outputPutNullMetadataHeader, "Output record should not
contain serializer-metadata header - side-effect should be isolated to
changelog");
+
+ inputTopic.pipeInput("key3", "putIfAbsent(null)");
+
+ // Verify changelog has the delete record (tombstone) with header
+ final var putIfAbsentNullChangelogRecord =
changelogOutputTopic.readRecord();
+ assertEquals("key3", putIfAbsentNullChangelogRecord.key());
+ assertNull(putIfAbsentNullChangelogRecord.value(),
"putIfAbsent(null) should produce tombstone");
+
+ // Verify the serializer's side-effect made it into the changelog
+ final Header putIfAbsentNullMetadataHeader =
putIfAbsentNullChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putIfAbsentNullMetadataHeader, "metadata header
should be present in changelog tombstone from putIfAbsent(null)");
+ assertEquals("test-value", new
String(putIfAbsentNullMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ // CRITICAL: Verify output record does NOT contain the
serializer-added header
+ final var putIfAbsentNullOutputRecord = outputTopic.readRecord();
+ assertEquals("key3", putIfAbsentNullOutputRecord.key());
+ assertEquals("putIfAbsent(null)",
putIfAbsentNullOutputRecord.value());
+ final Header outputPutIfAbsentNullMetadataHeader =
putIfAbsentNullOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNull(outputPutIfAbsentNullMetadataHeader, "Output record
should not contain serializer-metadata header - side-effect should be isolated
to changelog");
+
+ inputTopic.pipeInput("key4", "value4");
+ changelogOutputTopic.readRecord();
+ outputTopic.readRecord();
+
+ inputTopic.pipeInput("key4", "putAll(null)");
+
+ // Verify changelog has the delete record (tombstone) with header
+ final var putAllNullChangelogRecord =
changelogOutputTopic.readRecord();
+ assertEquals("key4", putAllNullChangelogRecord.key());
+ assertNull(putAllNullChangelogRecord.value(), "putAll(null) should
produce tombstone");
+
+ // Verify the serializer's side-effect made it into the changelog
+ final Header putAllNullMetadataHeader =
putAllNullChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putAllNullMetadataHeader, "metadata header should be
present in changelog tombstone from putAll(null)");
+ assertEquals("test-value", new
String(putAllNullMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ // CRITICAL: Verify output record does NOT contain the
serializer-added header
+ final var putAllNullOutputRecord = outputTopic.readRecord();
+ assertEquals("key4", putAllNullOutputRecord.key());
+ assertEquals("putAll(null)", putAllNullOutputRecord.value());
+ final Header outputPutAllNullMetadataHeader =
putAllNullOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNull(outputPutAllNullMetadataHeader, "Output record should
not contain serializer-metadata header - side-effect should be isolated to
changelog");
+ }
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersSerializerSideEffectTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersSerializerSideEffectTest.java
new file mode 100644
index 00000000000..d3b661df17e
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersSerializerSideEffectTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test to verify that key serializers can modify headers as a side-effect,
+ * and that this side-effect makes it into the changelog topic for window
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store
implementation:
+ * when we create a temporary context with new headers and serialize the key,
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class TimestampedWindowStoreWithHeadersSerializerSideEffectTest {
+
+ private static final String STORE_NAME = "test-window-store";
+ private static final String INPUT_TOPIC = "input";
+ private static final String OUTPUT_TOPIC = "output";
+ private static final long WINDOW_SIZE_MS = 10000L;
+
+ /**
+ * Custom serializer that adds a header during serialization as a
side-effect.
+ * This simulates real-world serializers that add metadata to headers.
+ */
+ private static class HeaderAddingSerializer implements Serializer<String> {
+ @Override
+ public byte[] serialize(final String topic, final String data) {
+ return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(final String topic, final Headers headers,
final String data) {
+ // Add metadata header during serialization (side-effect)
+ headers.add("serializer-metadata",
"window-test-value".getBytes(StandardCharsets.UTF_8));
+ return serialize(topic, data);
+ }
+ }
+
+ private static class HeaderAddingSerde implements Serde<String> {
+ @Override
+ public Serializer<String> serializer() {
+ return new HeaderAddingSerializer();
+ }
+
+ @Override
+ public Deserializer<String> deserializer() {
+ return Serdes.String().deserializer();
+ }
+ }
+
+ /**
+ * Processor that puts and deletes from a timestamped window store with
headers.
+ * Uses command value "put(null)" to test deletion via put(key, null,
windowStartTimestamp).
+ */
+ private static class WindowStoreProcessor extends
ContextualProcessor<String, String, String, String> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<String, String> context) {
+ super.init(context);
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStartTimestamp = record.timestamp();
+ if ("put(null)".equals(record.value())) {
+ // Delete using put(key, null, windowStartTimestamp)
+ store.put(record.key(), null, windowStartTimestamp);
+ } else {
+ // Put with timestamp and headers
+ store.put(
+ record.key(),
+ ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()),
+ windowStartTimestamp
+ );
+ }
+
+ context().forward(record);
+ }
+ }
+
+ @Test
+ public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ // Create a timestamped window store with headers using our custom
serializer
+ builder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(
+ STORE_NAME,
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false
+ ),
+ new HeaderAddingSerde(), // Custom key serializer that adds
headers
+ Serdes.String()
+ )
+ );
+
+ // Add a processor that uses the store and forwards to output
+ builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(WindowStoreProcessor::new, STORE_NAME)
+ .to(OUTPUT_TOPIC);
+
+ final Properties props = new Properties();
+ props.put("application.id", "test-window-app");
+ props.put("bootstrap.servers", "dummy:1234");
+ props.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
+ props.put("default.key.serde", Serdes.StringSerde.class);
+ props.put("default.value.serde", Serdes.StringSerde.class);
+
+ try (TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), props)) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(
+ INPUT_TOPIC,
+ Serdes.String().serializer(),
+ Serdes.String().serializer()
+ );
+
+ final String changelogTopic = "test-window-app-" + STORE_NAME +
"-changelog";
+ final TestOutputTopic<String, String> changelogOutputTopic =
+ driver.createOutputTopic(
+ changelogTopic,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer()
+ );
+
+ final TestOutputTopic<String, String> outputTopic =
+ driver.createOutputTopic(
+ OUTPUT_TOPIC,
+ Serdes.String().deserializer(),
+ Serdes.String().deserializer()
+ );
+
+ inputTopic.pipeInput("key1", "value1", 1000L);
+
+ // Verify changelog has the put record with header
+ final var putChangelogRecord = changelogOutputTopic.readRecord();
+ assertNotNull(putChangelogRecord.key());
+ assertEquals("value1", putChangelogRecord.value());
+
+ // Verify the serializer added metadata header to changelog
+ final Header putMetadataHeader =
putChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putMetadataHeader, "metadata header should be
present in changelog put record");
+ assertEquals("window-test-value", new
String(putMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ final var putOutputRecord = outputTopic.readRecord();
+ assertEquals("key1", putOutputRecord.key());
+ assertEquals("value1", putOutputRecord.value());
+ final Header outputMetadataHeader =
putOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(outputMetadataHeader,
+ "Output record SHOULD contain serializer-metadata header for
normal put operations");
+ assertEquals("window-test-value", new
String(outputMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ inputTopic.pipeInput("key1", "put(null)", 1000L);
+
+ // Verify changelog has the delete record (tombstone) with header
+ final var putNullChangelogRecord =
changelogOutputTopic.readRecord();
+ assertNotNull(putNullChangelogRecord.key());
+ assertNull(putNullChangelogRecord.value(), "put(null) should
produce tombstone");
+
+ // Verify the serializer's side-effect made it into the changelog
+ final Header putNullMetadataHeader =
putNullChangelogRecord.headers().lastHeader("serializer-metadata");
+ assertNotNull(putNullMetadataHeader, "metadata header should be
present in changelog tombstone from put(null)");
+ assertEquals("window-test-value", new
String(putNullMetadataHeader.value(), StandardCharsets.UTF_8));
+
+ // CRITICAL: Verify output record does NOT contain the
serializer-added header
+ final var putNullOutputRecord = outputTopic.readRecord();
+ assertEquals("key1", putNullOutputRecord.key());
+ assertEquals("put(null)", putNullOutputRecord.value());
+ final Header outputPutNullMetadataHeader =
putNullOutputRecord.headers().lastHeader("serializer-metadata");
+ assertNull(outputPutNullMetadataHeader, "Output record should NOT
contain serializer-metadata header - side-effect should be isolated to
changelog");
+ }
+ }
+}