This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5af84cf1e1d KAFKA-20262: Changelog tombstone records drop custom 
headers when usi… (#21639)
5af84cf1e1d is described below

commit 5af84cf1e1de679407c34441c1fcc87c8c4de4e1
Author: TengYao Chi <[email protected]>
AuthorDate: Tue Mar 17 20:31:23 2026 +0000

    KAFKA-20262: Changelog tombstone records drop custom headers when usi… 
(#21639)
    
    When utilizing headers-aware state stores in Kafka Streams, we need to
    ensure that any delete operation (remove(), put(key, null), or similar)
    handles headers correctly. While a delete is, well, a delete on the
    state store itself, it's a tombstone in the changelog topic.
    
    This PR ensures that tombstones written into changelog topics have the
    correct headers set.
    
    Reviewers: Alieh Saeedi <[email protected]>, Bill Bejeck
     <[email protected]>, Matthias J. Sax <[email protected]>
---
 ...ngTimestampedKeyValueBytesStoreWithHeaders.java |   7 +
 .../internals/MeteredSessionStoreWithHeaders.java  |  79 +++++-
 ...MeteredTimestampedKeyValueStoreWithHeaders.java | 118 ++++++++-
 .../MeteredTimestampedWindowStoreWithHeaders.java  |  33 ++-
 .../state/internals/MeteredWindowStore.java        |   2 +-
 .../MeteredSessionStoreWithHeadersTest.java        |   6 +-
 ...redTimestampedKeyValueStoreWithHeadersTest.java |   2 +
 ...onStoreWithHeadersSerializerSideEffectTest.java | 243 ++++++++++++++++++
 ...ueStoreWithHeadersSerializerSideEffectTest.java | 279 +++++++++++++++++++++
 ...owStoreWithHeadersSerializerSideEffectTest.java | 218 ++++++++++++++++
 10 files changed, 975 insertions(+), 12 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
index fe96a6acd16..bce5dbe0130 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
@@ -97,4 +97,11 @@ public class 
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
             );
         }
     }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        final byte[] deletedValue = wrapped().delete(key);
+        log(key, null, internalContext.recordContext().timestamp(), 
internalContext.recordContext().headers());
+        return deletedValue;
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index 221cf461ba4..b74ebd9ec88 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
@@ -69,10 +70,40 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
     public void put(final Windowed<K> sessionKey, final 
AggregationWithHeaders<AGG> aggregate) {
         Objects.requireNonNull(sessionKey, "sessionKey can't be null");
         try {
-            final Headers headers = aggregate != null ? aggregate.headers() : 
new RecordHeaders();
-            final Bytes key = keyBytes(sessionKey, headers, serdes);
-            maybeMeasureLatency(() -> wrapped().put(new Windowed<>(key, 
sessionKey.window()),
-                serdes.rawValue(aggregate, headers)), time, putSensor);
+            maybeMeasureLatency(
+                () -> {
+                    if (aggregate == null) {
+                        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+                        // Create new headers object to isolate tombstone 
operation from input record
+                        final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+
+                        // Create temporary context with new headers
+                        final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
+                            currentContext.timestamp(),
+                            currentContext.offset(),
+                            currentContext.partition(),
+                            currentContext.topic(),
+                            deleteHeaders
+                        );
+
+                        try {
+                            internalContext.setRecordContext(temporaryContext);
+                            final Bytes key = keyBytes(sessionKey, 
deleteHeaders, serdes);
+                            wrapped().put(new Windowed<>(key, 
sessionKey.window()), serdes.rawValue(null, deleteHeaders));
+                        } finally {
+                            // Restore original context
+                            internalContext.setRecordContext(currentContext);
+                        }
+                    } else {
+                        final Headers headers = aggregate.headers();
+                        final Bytes key = keyBytes(sessionKey, headers, 
serdes);
+                        wrapped().put(new Windowed<>(key, 
sessionKey.window()), serdes.rawValue(aggregate, headers));
+                    }
+                },
+                time,
+                putSensor
+            );
             maybeRecordE2ELatency();
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), 
sessionKey.key(), aggregate);
@@ -81,6 +112,46 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
 
     }
 
+    @Override
+    public void remove(final Windowed<K> sessionKey) {
+        Objects.requireNonNull(sessionKey, "sessionKey can't be null");
+        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() can't be 
null");
+        Objects.requireNonNull(sessionKey.window(), "sessionKey.window() can't 
be null");
+
+        try {
+            maybeMeasureLatency(
+                () -> {
+                    final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+                    // Create new headers object to isolate delete operation 
from input record
+                    final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+
+                    // Create temporary context with new headers
+                    final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
+                        currentContext.timestamp(),
+                        currentContext.offset(),
+                        currentContext.partition(),
+                        currentContext.topic(),
+                        deleteHeaders
+                    );
+
+                    try {
+                        internalContext.setRecordContext(temporaryContext);
+                        final Bytes key = keyBytes(sessionKey, deleteHeaders, 
serdes);
+                        wrapped().remove(new Windowed<>(key, 
sessionKey.window()));
+                    } finally {
+                        // Restore original context
+                        internalContext.setRecordContext(currentContext);
+                    }
+                },
+                time,
+                removeSensor
+            );
+        } catch (final ProcessorStateException e) {
+            throw new ProcessorStateException(String.format(e.getMessage(), 
sessionKey.key()), e);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <R> QueryResult<R> query(final Query<R> query,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 86624961650..2346c7646af 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.PositionBound;
@@ -49,6 +50,7 @@ import java.util.function.Function;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
 
 
 /**
@@ -110,8 +112,38 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
                     final ValueTimestampHeaders<V> value) {
         Objects.requireNonNull(key, "key cannot be null");
         try {
-            final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
-            maybeMeasureLatency(() -> wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers)), time, putSensor);
+            maybeMeasureLatency(
+                () -> {
+                    if (value == null) {
+                        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+                        // Create new headers object to isolate tombstone 
operation from input record
+                        final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+
+                        // Create temporary context with new headers
+                        final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
+                            currentContext.timestamp(),
+                            currentContext.offset(),
+                            currentContext.partition(),
+                            currentContext.topic(),
+                            deleteHeaders
+                        );
+
+                        try {
+                            internalContext.setRecordContext(temporaryContext);
+                            wrapped().put(keyBytes(key, deleteHeaders), 
serdes.rawValue(null, deleteHeaders));
+                        } finally {
+                            // Restore original context
+                            internalContext.setRecordContext(currentContext);
+                        }
+                    } else {
+                        final Headers headers = value.headers();
+                        wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers));
+                    }
+                },
+                time,
+                putSensor
+            );
             maybeRecordE2ELatency();
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key, value);
@@ -123,9 +155,35 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
     public ValueTimestampHeaders<V> putIfAbsent(final K key,
                                                 final ValueTimestampHeaders<V> 
value) {
         Objects.requireNonNull(key, "key cannot be null");
-        final Headers headers = value != null ? value.headers() : new 
RecordHeaders();
         final ValueTimestampHeaders<V> currentValue = maybeMeasureLatency(
-            () -> deserializeValue(wrapped().putIfAbsent(keyBytes(key, 
headers), serdes.rawValue(value, headers))),
+            () -> {
+                if (value == null) {
+                    final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+                    // Create new headers object to isolate tombstone 
operation from input record
+                    final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+
+                    // Create temporary context with new headers
+                    final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
+                        currentContext.timestamp(),
+                        currentContext.offset(),
+                        currentContext.partition(),
+                        currentContext.topic(),
+                        deleteHeaders
+                    );
+
+                    try {
+                        internalContext.setRecordContext(temporaryContext);
+                        return 
deserializeValue(wrapped().putIfAbsent(keyBytes(key, deleteHeaders), 
serdes.rawValue(null, deleteHeaders)));
+                    } finally {
+                        // Restore original context
+                        internalContext.setRecordContext(currentContext);
+                    }
+                } else {
+                    final Headers headers = value.headers();
+                    return 
deserializeValue(wrapped().putIfAbsent(keyBytes(key, headers), 
serdes.rawValue(value, headers)));
+                }
+            },
             time,
             putIfAbsentSensor
         );
@@ -133,6 +191,58 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         return currentValue;
     }
 
+    @Override
+    public void putAll(final java.util.List<KeyValue<K, 
ValueTimestampHeaders<V>>> entries) {
+        entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot 
be null"));
+
+        final boolean hasNullValue = entries.stream().anyMatch(entry -> 
entry.value == null);
+
+        if (hasNullValue) {
+            entries.forEach(entry -> put(entry.key, entry.value));
+        } else {
+            // If no null values, use parent's batch optimization
+            super.putAll(entries);
+        }
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> delete(final K key) {
+        Objects.requireNonNull(key, "key cannot be null");
+        try {
+            return maybeMeasureLatency(
+                () -> {
+                    final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+                    // Create new headers object to isolate delete operation 
from input record
+                    final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+
+                    // Create temporary context with new headers
+                    final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
+                        currentContext.timestamp(),
+                        currentContext.offset(),
+                        currentContext.partition(),
+                        currentContext.topic(),
+                        deleteHeaders
+                    );
+
+                    try {
+                        internalContext.setRecordContext(temporaryContext);
+                        final byte[] deletedValue = 
wrapped().delete(keyBytes(key, deleteHeaders));
+                        return deserializeValue(deletedValue);
+                    } finally {
+                        // Restore original context
+                        internalContext.setRecordContext(currentContext);
+                    }
+                },
+                time,
+                deleteSensor
+            );
+        } catch (final ProcessorStateException e) {
+            final String message = String.format(e.getMessage(), key);
+            throw new ProcessorStateException(message, e);
+        }
+    }
+
     /**
      * Executes a query against this store.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 96b7fbae9be..e88ec3398d5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
@@ -45,6 +46,7 @@ import java.util.Objects;
 import java.util.function.Function;
 
 import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+import static 
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
 
 /**
  * A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used 
for recording operation metrics,
@@ -81,10 +83,37 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
     @Override
     public void put(final K key, final ValueTimestampHeaders<V> value, final 
long windowStartTimestamp) {
         Objects.requireNonNull(key, "key cannot be null");
-        final Headers headers = value == null || value.headers() == null ? new 
RecordHeaders() : value.headers();
         try {
             maybeMeasureLatency(
-                () -> wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers), windowStartTimestamp),
+                () -> {
+                    if (value == null) {
+                        // Deletion path
+                        final ProcessorRecordContext currentContext = 
internalContext.recordContext();
+
+                        // Create new headers object to isolate delete 
operation from input record
+                        final Headers deleteHeaders = new 
RecordHeaders(currentContext.headers());
+
+                        // Create temporary context with new headers
+                        final ProcessorRecordContext temporaryContext = new 
ProcessorRecordContext(
+                            currentContext.timestamp(),
+                            currentContext.offset(),
+                            currentContext.partition(),
+                            currentContext.topic(),
+                            deleteHeaders
+                        );
+
+                        try {
+                            internalContext.setRecordContext(temporaryContext);
+                            wrapped().put(keyBytes(key, deleteHeaders), null, 
windowStartTimestamp);
+                        } finally {
+                            // Restore original context
+                            internalContext.setRecordContext(currentContext);
+                        }
+                    } else {
+                        final Headers headers = value.headers() == null ? new 
RecordHeaders() : value.headers();
+                        wrapped().put(keyBytes(key, headers), 
serdes.rawValue(value, headers), windowStartTimestamp);
+                    }
+                },
                 time,
                 putSensor
             );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2658772b7bd..8df7fbe161e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -79,7 +79,7 @@ public class MeteredWindowStore<K, V>
     private Sensor flushSensor;
     private Sensor e2eLatencySensor;
     protected Sensor iteratorDurationSensor;
-    private InternalProcessorContext<?, ?> internalContext;
+    protected InternalProcessorContext<?, ?> internalContext;
     private TaskId taskId;
     private Sensor restoreSensor;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
index 4d6aaa8e2d7..f95254237d6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeadersTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -36,6 +37,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.PositionBound;
@@ -136,6 +138,8 @@ public class MeteredSessionStoreWithHeadersTest {
         when(context.taskId()).thenReturn(taskId);
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(innerStore.name()).thenReturn(STORE_NAME);
+        lenient().when(context.recordContext()).thenReturn(new 
ProcessorRecordContext(
+            0L, 0L, 0, "topic", new RecordHeaders()));
     }
 
     private void init() {
@@ -470,7 +474,7 @@ public class MeteredSessionStoreWithHeadersTest {
 
         store.recordRestoreTime(100L);
 
-        final Map<MetricName, ? extends org.apache.kafka.common.Metric> 
allMetrics = metrics.metrics();
+        final Map<MetricName, ? extends Metric> allMetrics = metrics.metrics();
         final List<MetricName> restoreMetrics = allMetrics.keySet().stream()
             .filter(metricName -> metricName.name().equals("restore-rate"))
             .collect(Collectors.toList());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index 851891ecced..fc340282988 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -128,6 +128,8 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest 
{
         when(context.changelogFor(STORE_NAME)).thenReturn(CHANGELOG_TOPIC);
         when(inner.name()).thenReturn(STORE_NAME);
         when(context.appConfigs()).thenReturn(CONFIGS);
+        lenient().when(context.recordContext()).thenReturn(new 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext(
+            0L, 0L, 0, "topic", new 
org.apache.kafka.common.header.internals.RecordHeaders()));
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
new file mode 100644
index 00000000000..b5eda458c78
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
+import org.apache.kafka.streams.state.Stores;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test to verify that key serializers can modify headers as a side-effect,
+ * and that this side-effect makes it into the changelog topic for session 
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store 
implementation:
+ * when we create a temporary context with new headers and serialize the key, 
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class SessionStoreWithHeadersSerializerSideEffectTest {
+
+    private static final String STORE_NAME = "test-session-store";
+    private static final String INPUT_TOPIC = "input";
+    private static final String OUTPUT_TOPIC = "output";
+
+    /**
+     * Custom serializer that adds a header during serialization as a 
side-effect.
+     * This simulates real-world serializers that add metadata to headers.
+     */
+    private static class HeaderAddingSerializer implements Serializer<String> {
+        @Override
+        public byte[] serialize(final String topic, final String data) {
+            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Headers headers, 
final String data) {
+            headers.add("serializer-metadata", 
"session-test-value".getBytes(StandardCharsets.UTF_8));
+            return serialize(topic, data);
+        }
+    }
+
+    private static class HeaderAddingSerde implements Serde<String> {
+        @Override
+        public Serializer<String> serializer() {
+            return new HeaderAddingSerializer();
+        }
+
+        @Override
+        public Deserializer<String> deserializer() {
+            return Serdes.String().deserializer();
+        }
+    }
+
+    /**
+     * Processor that puts and removes from a session store with headers.
+     * Uses command values to test different deletion methods:
+     * - "remove" → uses remove() API
+     * - "put(null)" → uses put(sessionKey, null) API
+     * - other values → normal put operation
+     */
+    private static class SessionStoreProcessor extends 
ContextualProcessor<String, String, String, String> {
+        private SessionStoreWithHeaders<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<String, String> context) {
+            super.init(context);
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long timestamp = record.timestamp();
+            final Windowed<String> sessionKey = new Windowed<>(
+                record.key(),
+                new SessionWindow(timestamp, timestamp)
+            );
+
+            if ("remove".equals(record.value())) {
+                store.remove(sessionKey);
+            } else if ("put(null)".equals(record.value())) {
+                store.put(sessionKey, null);
+            } else {
+                store.put(
+                    sessionKey,
+                    AggregationWithHeaders.make(record.value(), 
record.headers())
+                );
+            }
+
+            context().forward(record);
+        }
+    }
+
+    @Test
+    public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // Create a session store with headers using our custom serializer
+        builder.addStateStore(
+            Stores.sessionStoreBuilderWithHeaders(
+                Stores.inMemorySessionStore(
+                    STORE_NAME,
+                    Duration.ofMillis(10000L)
+                ),
+                new HeaderAddingSerde(),  // Custom key serializer that adds 
headers
+                Serdes.String()
+            )
+        );
+
+        // Add a processor that uses the store and forwards to output
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionStoreProcessor::new, STORE_NAME)
+            .to(OUTPUT_TOPIC);
+
+        final Properties props = new Properties();
+        props.put("application.id", "test-session-app");
+        props.put("default.key.serde", Serdes.StringSerde.class);
+        props.put("default.value.serde", Serdes.StringSerde.class);
+
+        try (TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(
+                INPUT_TOPIC,
+                Serdes.String().serializer(),
+                Serdes.String().serializer()
+            );
+
+            final String changelogTopic = "test-session-app-" + STORE_NAME + 
"-changelog";
+            final TestOutputTopic<String, String> changelogOutputTopic =
+                driver.createOutputTopic(
+                    changelogTopic,
+                    Serdes.String().deserializer(),
+                    Serdes.String().deserializer()
+                );
+
+            // Create output topic reader (using regular StringSerde, not 
HeaderAddingSerde)
+            final TestOutputTopic<String, String> outputTopic =
+                driver.createOutputTopic(
+                    OUTPUT_TOPIC,
+                    Serdes.String().deserializer(),
+                    Serdes.String().deserializer()
+                );
+
+            inputTopic.pipeInput("key1", "value1", 1000L);
+
+            // Verify changelog has the put record with header
+            final var putChangelogRecord = changelogOutputTopic.readRecord();
+            assertNotNull(putChangelogRecord.key());
+            assertEquals("value1", putChangelogRecord.value());
+
+            // Verify the serializer added metadata header to changelog
+            final Header putMetadataHeader = 
putChangelogRecord.headers().lastHeader("serializer-metadata");
+            assertNotNull(putMetadataHeader, "metadata header should be 
present in changelog put record");
+            assertEquals("session-test-value", new 
String(putMetadataHeader.value(), StandardCharsets.UTF_8));
+
+            final var putOutputRecord = outputTopic.readRecord();
+            assertEquals("key1", putOutputRecord.key());
+            assertEquals("value1", putOutputRecord.value());
+            final Header outputMetadataHeader = 
putOutputRecord.headers().lastHeader("serializer-metadata");
+            assertNotNull(outputMetadataHeader, "Output record should contain 
serializer-metadata header for normal put operations");
+            assertEquals("session-test-value", new 
String(outputMetadataHeader.value(), StandardCharsets.UTF_8));
+
+            inputTopic.pipeInput("key1", "remove", 1000L);
+
+            // Verify changelog has the delete record (tombstone) with header
+            final var removeChangelogRecord = 
changelogOutputTopic.readRecord();
+            assertNotNull(removeChangelogRecord.key());
+            assertNull(removeChangelogRecord.value(), "remove() should produce 
tombstone");
+
+            // Verify the serializer's side-effect made it into the changelog
+            final Header removeMetadataHeader = 
removeChangelogRecord.headers().lastHeader("serializer-metadata");
+            assertNotNull(removeMetadataHeader, "metadata header should be 
present in changelog tombstone from remove()");
+            assertEquals("session-test-value", new 
String(removeMetadataHeader.value(), StandardCharsets.UTF_8));
+
+            // CRITICAL: Verify output tombstone does NOT contain the 
serializer-added header
+            final var removeOutputRecord = outputTopic.readRecord();
+            assertEquals("key1", removeOutputRecord.key());
+            assertEquals("remove", removeOutputRecord.value());
+            final Header outputRemoveMetadataHeader = 
removeOutputRecord.headers().lastHeader("serializer-metadata");
+            assertNull(outputRemoveMetadataHeader, "Output record should not 
contain serializer-metadata header - side-effect should be isolated to 
changelog");
+
+            inputTopic.pipeInput("key2", "value2", 2000L);
+            changelogOutputTopic.readRecord();
+            outputTopic.readRecord();
+
+            inputTopic.pipeInput("key2", "put(null)", 2000L);
+
+            // Verify changelog has the delete record (tombstone) with header
+            final var putNullChangelogRecord = 
changelogOutputTopic.readRecord();
+            assertNotNull(putNullChangelogRecord.key());
+            assertNull(putNullChangelogRecord.value(), "put(null) should 
produce tombstone");
+
+            // Verify the serializer's side-effect made it into the changelog
+            final Header putNullMetadataHeader = 
putNullChangelogRecord.headers().lastHeader("serializer-metadata");
+            assertNotNull(putNullMetadataHeader, "metadata header should be 
present in changelog tombstone from put(null)");
+            assertEquals("session-test-value", new 
String(putNullMetadataHeader.value(), StandardCharsets.UTF_8));
+
+            // CRITICAL: Verify output tombstone does NOT contain the 
serializer-added header
+            final var putNullOutputRecord = outputTopic.readRecord();
+            assertEquals("key2", putNullOutputRecord.key());
+            assertEquals("put(null)", putNullOutputRecord.value());
+            final Header outputPutNullMetadataHeader = 
putNullOutputRecord.headers().lastHeader("serializer-metadata");
+            assertNull(outputPutNullMetadataHeader, "Output record should not 
contain serializer-metadata header - side-effect should be isolated to 
changelog");
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
new file mode 100644
index 00000000000..918e3e37d7a
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Verifies that headers added by a key serializer as a side-effect of
+ * serialization end up on the changelog records written by a
+ * {@link TimestampedKeyValueStoreWithHeaders}.
+ *
+ * <p>A regular put and every deletion path offered by {@code StoreProcessor}
+ * ({@code delete()}, {@code put(key, null)}, {@code putIfAbsent(key, null)} and
+ * {@code putAll()} with a null value) are exercised. Each deletion must yield a
+ * changelog tombstone that carries the serializer-added header, while the record
+ * forwarded to the output topic for that deletion must not carry it.
+ */
+public class TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest {
+
+    private static final String STORE_NAME = "test-store";
+    private static final String INPUT_TOPIC = "input";
+    private static final String OUTPUT_TOPIC = "output";
+    private static final String APPLICATION_ID = "test-app";
+
+    /** Name and value of the header that {@link HeaderAddingSerializer} attaches. */
+    private static final String HEADER_KEY = "serializer-metadata";
+    private static final String HEADER_VALUE = "test-value";
+
+    /** Failure message used when a deletion leaks the header into the output topic. */
+    private static final String ISOLATION_MESSAGE =
+        "Output record should not contain serializer-metadata header - side-effect should be isolated to changelog";
+
+    /**
+     * String serializer that adds a header during serialization as a side-effect.
+     * This simulates real-world serializers that add metadata to headers.
+     */
+    private static class HeaderAddingSerializer implements Serializer<String> {
+        @Override
+        public byte[] serialize(final String topic, final String data) {
+            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Headers headers, final String data) {
+            // The side-effect under test: mutate the supplied headers, then serialize as usual.
+            headers.add(HEADER_KEY, HEADER_VALUE.getBytes(StandardCharsets.UTF_8));
+            return serialize(topic, data);
+        }
+    }
+
+    /** Serde pairing {@link HeaderAddingSerializer} with the stock String deserializer. */
+    private static class HeaderAddingSerde implements Serde<String> {
+        @Override
+        public Serializer<String> serializer() {
+            return new HeaderAddingSerializer();
+        }
+
+        @Override
+        public Deserializer<String> deserializer() {
+            return Serdes.String().deserializer();
+        }
+    }
+
+    /**
+     * Processor that writes to the store according to the record value and then
+     * forwards the record unchanged. The value acts as a command:
+     * <ul>
+     *   <li>"delete" - calls {@code delete(key)}</li>
+     *   <li>"put(null)" - calls {@code put(key, null)}</li>
+     *   <li>"putIfAbsent(null)" - calls {@code putIfAbsent(key, null)}</li>
+     *   <li>"putAll(null)" - calls {@code putAll()} with a single null-valued pair</li>
+     *   <li>anything else - regular put of the value with the record's timestamp and headers</li>
+     * </ul>
+     */
+    private static class StoreProcessor extends ContextualProcessor<String, String, String, String> {
+        private TimestampedKeyValueStoreWithHeaders<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<String, String> context) {
+            super.init(context);
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final String key = record.key();
+            final String command = record.value();
+
+            if ("delete".equals(command)) {
+                store.delete(key);
+            } else if ("put(null)".equals(command)) {
+                store.put(key, null);
+            } else if ("putIfAbsent(null)".equals(command)) {
+                store.putIfAbsent(key, null);
+            } else if ("putAll(null)".equals(command)) {
+                store.putAll(List.of(KeyValue.pair(key, null)));
+            } else {
+                store.put(key, ValueTimestampHeaders.make(command, record.timestamp(), record.headers()));
+            }
+
+            context().forward(record);
+        }
+    }
+
+    /** Builds the minimal configuration handed to the {@link TopologyTestDriver}. */
+    private static Properties driverConfig() {
+        final Properties props = new Properties();
+        props.put("application.id", APPLICATION_ID);
+        props.put("default.key.serde", Serdes.StringSerde.class);
+        props.put("default.value.serde", Serdes.StringSerde.class);
+        return props;
+    }
+
+    /**
+     * Asserts that {@code headers} contains the serializer-added header with the expected value.
+     *
+     * @param headers       headers of the record under inspection
+     * @param absentMessage failure message used when the header is missing
+     */
+    private static void assertSerializerHeader(final Headers headers, final String absentMessage) {
+        final Header metadata = headers.lastHeader(HEADER_KEY);
+        assertNotNull(metadata, absentMessage);
+        assertEquals(HEADER_VALUE, new String(metadata.value(), StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Pipes a regular put and drops the changelog and output records it produces,
+     * so that a following deletion check starts from empty topics.
+     */
+    private static void putAndDiscard(
+        final TestInputTopic<String, String> inputTopic,
+        final TestOutputTopic<String, String> changelogTopic,
+        final TestOutputTopic<String, String> outputTopic,
+        final String key,
+        final String value
+    ) {
+        inputTopic.pipeInput(key, value);
+        changelogTopic.readRecord();
+        outputTopic.readRecord();
+    }
+
+    /**
+     * Pipes a deletion command and verifies both resulting records: the changelog
+     * record must be a tombstone for {@code key} carrying the serializer-added
+     * header, and the forwarded output record must not carry that header.
+     *
+     * @param key       key the deletion is issued for
+     * @param command   record value understood by {@link StoreProcessor}
+     * @param operation store operation name used in the failure messages
+     */
+    private static void deleteAndVerify(
+        final TestInputTopic<String, String> inputTopic,
+        final TestOutputTopic<String, String> changelogTopic,
+        final TestOutputTopic<String, String> outputTopic,
+        final String key,
+        final String command,
+        final String operation
+    ) {
+        inputTopic.pipeInput(key, command);
+
+        final var tombstone = changelogTopic.readRecord();
+        assertEquals(key, tombstone.key());
+        assertNull(tombstone.value(), operation + " should produce tombstone");
+        assertSerializerHeader(
+            tombstone.headers(),
+            "metadata header should be present in changelog tombstone from " + operation
+        );
+
+        // The forwarded record keeps the command as its value and must stay free of the header.
+        final var forwarded = outputTopic.readRecord();
+        assertEquals(key, forwarded.key());
+        assertEquals(command, forwarded.value());
+        assertNull(forwarded.headers().lastHeader(HEADER_KEY), ISOLATION_MESSAGE);
+    }
+
+    @Test
+    public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // Timestamped key-value store with headers whose key serde adds a header.
+        builder.addStateStore(
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.inMemoryKeyValueStore(STORE_NAME),
+                new HeaderAddingSerde(),
+                Serdes.String()
+            )
+        );
+
+        // input topic -> store-backed processor -> output topic
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
+            .process(StoreProcessor::new, STORE_NAME)
+            .to(OUTPUT_TOPIC);
+
+        final Properties props = driverConfig();
+
+        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
+                INPUT_TOPIC,
+                Serdes.String().serializer(),
+                Serdes.String().serializer()
+            );
+            final TestOutputTopic<String, String> changelogTopic = driver.createOutputTopic(
+                APPLICATION_ID + "-" + STORE_NAME + "-changelog",
+                Serdes.String().deserializer(),
+                Serdes.String().deserializer()
+            );
+            final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(
+                OUTPUT_TOPIC,
+                Serdes.String().deserializer(),
+                Serdes.String().deserializer()
+            );
+
+            // Regular put: the header is expected in the changelog AND in the output record.
+            inputTopic.pipeInput("key1", "value1");
+
+            final var putChangelogRecord = changelogTopic.readRecord();
+            assertEquals("key1", putChangelogRecord.key());
+            assertEquals("value1", putChangelogRecord.value());
+            assertSerializerHeader(
+                putChangelogRecord.headers(),
+                "metadata header should be present in changelog put record"
+            );
+
+            final var putOutputRecord = outputTopic.readRecord();
+            assertEquals("key1", putOutputRecord.key());
+            assertEquals("value1", putOutputRecord.value());
+            assertSerializerHeader(
+                putOutputRecord.headers(),
+                "Output record SHOULD contain serializer-metadata header for normal put operations"
+            );
+
+            // delete() on an existing key
+            deleteAndVerify(inputTopic, changelogTopic, outputTopic, "key1", "delete", "delete()");
+
+            // put(key, null) on an existing key
+            putAndDiscard(inputTopic, changelogTopic, outputTopic, "key2", "value2");
+            deleteAndVerify(inputTopic, changelogTopic, outputTopic, "key2", "put(null)", "put(null)");
+
+            // putIfAbsent(key, null) on a key that was never written
+            deleteAndVerify(inputTopic, changelogTopic, outputTopic, "key3", "putIfAbsent(null)", "putIfAbsent(null)");
+
+            // putAll() with a null value on an existing key
+            putAndDiscard(inputTopic, changelogTopic, outputTopic, "key4", "value4");
+            deleteAndVerify(inputTopic, changelogTopic, outputTopic, "key4", "putAll(null)", "putAll(null)");
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersSerializerSideEffectTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersSerializerSideEffectTest.java
new file mode 100644
index 00000000000..d3b661df17e
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersSerializerSideEffectTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Test to verify that key serializers can modify headers as a side-effect,
+ * and that this side-effect makes it into the changelog topic for window 
stores.
+ *
+ * This test verifies the core assumption of the headers-aware state store 
implementation:
+ * when we create a temporary context with new headers and serialize the key, 
the key
+ * serializer will add metadata to those headers, and those headers
+ * will be used when logging the change to the changelog topic.
+ */
+public class TimestampedWindowStoreWithHeadersSerializerSideEffectTest {
+
+    private static final String STORE_NAME = "test-window-store";
+    private static final String INPUT_TOPIC = "input";
+    private static final String OUTPUT_TOPIC = "output";
+    private static final long WINDOW_SIZE_MS = 10000L;
+
+    /**
+     * Custom serializer that adds a header during serialization as a 
side-effect.
+     * This simulates real-world serializers that add metadata to headers.
+     */
+    private static class HeaderAddingSerializer implements Serializer<String> {
+        @Override
+        public byte[] serialize(final String topic, final String data) {
+            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public byte[] serialize(final String topic, final Headers headers, 
final String data) {
+            // Add metadata header during serialization (side-effect)
+            headers.add("serializer-metadata", 
"window-test-value".getBytes(StandardCharsets.UTF_8));
+            return serialize(topic, data);
+        }
+    }
+
+    private static class HeaderAddingSerde implements Serde<String> {
+        @Override
+        public Serializer<String> serializer() {
+            return new HeaderAddingSerializer();
+        }
+
+        @Override
+        public Deserializer<String> deserializer() {
+            return Serdes.String().deserializer();
+        }
+    }
+
+    /**
+     * Processor that puts and deletes from a timestamped window store with 
headers.
+     * Uses command value "put(null)" to test deletion via put(key, null, 
windowStartTimestamp).
+     */
+    private static class WindowStoreProcessor extends 
ContextualProcessor<String, String, String, String> {
+        private TimestampedWindowStoreWithHeaders<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<String, String> context) {
+            super.init(context);
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long windowStartTimestamp = record.timestamp();
+            if ("put(null)".equals(record.value())) {
+                // Delete using put(key, null, windowStartTimestamp)
+                store.put(record.key(), null, windowStartTimestamp);
+            } else {
+                // Put with timestamp and headers
+                store.put(
+                    record.key(),
+                    ValueTimestampHeaders.make(record.value(), 
record.timestamp(), record.headers()),
+                    windowStartTimestamp
+                );
+            }
+
+            context().forward(record);
+        }
+    }
+
+    @Test
+    public void shouldPropagateSerializerHeaderSideEffectToChangelog() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        // Create a timestamped window store with headers using our custom 
serializer
+        builder.addStateStore(
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.persistentTimestampedWindowStore(
+                    STORE_NAME,
+                    Duration.ofMillis(WINDOW_SIZE_MS),
+                    Duration.ofMillis(WINDOW_SIZE_MS),
+                    false
+                ),
+                new HeaderAddingSerde(),  // Custom key serializer that adds 
headers
+                Serdes.String()
+            )
+        );
+
+        // Add a processor that uses the store and forwards to output
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(WindowStoreProcessor::new, STORE_NAME)
+            .to(OUTPUT_TOPIC);
+
+        final Properties props = new Properties();
+        props.put("application.id", "test-window-app");
+        props.put("bootstrap.servers", "dummy:1234");
+        props.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
+        props.put("default.key.serde", Serdes.StringSerde.class);
+        props.put("default.value.serde", Serdes.StringSerde.class);
+
+        try (TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(
+                INPUT_TOPIC,
+                Serdes.String().serializer(),
+                Serdes.String().serializer()
+            );
+
+            final String changelogTopic = "test-window-app-" + STORE_NAME + 
"-changelog";
+            final TestOutputTopic<String, String> changelogOutputTopic =
+                driver.createOutputTopic(
+                    changelogTopic,
+                    Serdes.String().deserializer(),
+                    Serdes.String().deserializer()
+                );
+
+            final TestOutputTopic<String, String> outputTopic =
+                driver.createOutputTopic(
+                    OUTPUT_TOPIC,
+                    Serdes.String().deserializer(),
+                    Serdes.String().deserializer()
+                );
+
+            inputTopic.pipeInput("key1", "value1", 1000L);
+
+            // Verify changelog has the put record with header
+            final var putChangelogRecord = changelogOutputTopic.readRecord();
+            assertNotNull(putChangelogRecord.key());
+            assertEquals("value1", putChangelogRecord.value());
+
+            // Verify the serializer added metadata header to changelog
+            final Header putMetadataHeader = 
putChangelogRecord.headers().lastHeader("serializer-metadata");
+            assertNotNull(putMetadataHeader, "metadata header should be 
present in changelog put record");
+            assertEquals("window-test-value", new 
String(putMetadataHeader.value(), StandardCharsets.UTF_8));
+
+            final var putOutputRecord = outputTopic.readRecord();
+            assertEquals("key1", putOutputRecord.key());
+            assertEquals("value1", putOutputRecord.value());
+            final Header outputMetadataHeader = 
putOutputRecord.headers().lastHeader("serializer-metadata");
+            assertNotNull(outputMetadataHeader,
+                "Output record SHOULD contain serializer-metadata header for 
normal put operations");
+            assertEquals("window-test-value", new 
String(outputMetadataHeader.value(), StandardCharsets.UTF_8));
+
+            inputTopic.pipeInput("key1", "put(null)", 1000L);
+
+            // Verify changelog has the delete record (tombstone) with header
+            final var putNullChangelogRecord = 
changelogOutputTopic.readRecord();
+            assertNotNull(putNullChangelogRecord.key());
+            assertNull(putNullChangelogRecord.value(), "put(null) should 
produce tombstone");
+
+            // Verify the serializer's side-effect made it into the changelog
+            final Header putNullMetadataHeader = 
putNullChangelogRecord.headers().lastHeader("serializer-metadata");
+            assertNotNull(putNullMetadataHeader, "metadata header should be 
present in changelog tombstone from put(null)");
+            assertEquals("window-test-value", new 
String(putNullMetadataHeader.value(), StandardCharsets.UTF_8));
+
+            // CRITICAL: Verify output record does NOT contain the 
serializer-added header
+            final var putNullOutputRecord = outputTopic.readRecord();
+            assertEquals("key1", putNullOutputRecord.key());
+            assertEquals("put(null)", putNullOutputRecord.value());
+            final Header outputPutNullMetadataHeader = 
putNullOutputRecord.headers().lastHeader("serializer-metadata");
+            assertNull(outputPutNullMetadataHeader, "Output record should NOT 
contain serializer-metadata header - side-effect should be isolated to 
changelog");
+        }
+    }
+}

Reply via email to