This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cae9e803439 KAFKA-20301: Upgrade downgrade integration tests for 
session store with headers (#21748)
cae9e803439 is described below

commit cae9e80343958c87df5d1486f315649f4cecb7a5
Author: Bill Bejeck <[email protected]>
AuthorDate: Sun Mar 15 00:31:56 2026 -0400

    KAFKA-20301: Upgrade downgrade integration tests for session store with 
headers (#21748)
    
    Adds integration tests for session store migration (plain
    SessionStore to SessionStoreWithHeaders), proxy mode, and downgrade
    scenarios.
    
    Updates changelog restoration for session stores with headers by adding
    a new rawValueToSessionHeadersValue() converter.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 474 ++++++++++++++++++++-
 .../processor/internals/StateManagerUtil.java      |   5 +
 .../streams/state/internals/RecordConverters.java  |  55 +++
 3 files changed, 516 insertions(+), 18 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index ea6d141ad50..6470f3942bb 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -30,14 +30,19 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
@@ -55,12 +60,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -72,9 +80,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 public class HeadersStoreUpgradeIntegrationTest {
     private static final String STORE_NAME = "store";
     private static final String WINDOW_STORE_NAME = "window-store";
+    private static final String SESSION_STORE_NAME = "session-store";
     private static final long WINDOW_SIZE_MS = 1000L;
     private static final long RETENTION_MS = Duration.ofDays(1).toMillis();
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(HeadersStoreUpgradeIntegrationTest.class);
     private String inputStream;
 
     private KafkaStreams kafkaStreams;
@@ -381,8 +390,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                     final ValueAndTimestamp<V> result = store.get(key);
                     return result != null && result.value().equals(value) && 
result.timestamp() == timestamp;
                 } catch (final Exception swallow) {
-                    swallow.printStackTrace();
-                    System.err.println(swallow.getMessage());
+                    LOG.error("Error while checking store result", swallow);
                     return false;
                 }
             },
@@ -416,8 +424,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                     final V result = store.get(key);
                     return result != null && result.equals(value);
                 } catch (final Exception swallow) {
-                    swallow.printStackTrace();
-                    System.err.println(swallow.getMessage());
+                    LOG.error("Error while verifying legacy value", swallow);
                     return false;
                 }
             },
@@ -443,8 +450,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                     final ValueAndTimestamp<V> result = store.get(key);
                     return result != null && result.value().equals(value) && 
result.timestamp() == timestamp;
                 } catch (final Exception swallow) {
-                    swallow.printStackTrace();
-                    System.err.println(swallow.getMessage());
+                    LOG.error("Error while waiting for expected result", 
swallow);
                     return false;
                 }
             },
@@ -484,8 +490,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                         && result.timestamp() == timestamp
                         && result.headers().equals(expectedHeaders);
                 } catch (final Exception swallow) {
-                    swallow.printStackTrace();
-                    System.err.println(swallow.getMessage());
+                    LOG.error("Failed to retrieve expected result", swallow);
                     return false;
                 }
             },
@@ -526,8 +531,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                         && result.timestamp() == expectedTimestamp
                         && result.headers().equals(expectedHeaders);
                 } catch (final Exception swallow) {
-                    swallow.printStackTrace();
-                    System.err.println(swallow.getMessage());
+                    LOG.error("Failed to retrieve expected result", swallow);
                     return false;
                 }
             },
@@ -553,8 +557,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                         && result.timestamp() == timestamp
                         && result.headers().toArray().length == 0;
                 } catch (final Exception swallow) {
-                    swallow.printStackTrace();
-                    System.err.println(swallow.getMessage());
+                    LOG.error("Failed to retrieve expected result", swallow);
                     return false;
                 }
             },
@@ -578,8 +581,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                         && result.value().equals(value)
                         && result.headers().toArray().length == 0;
                 } catch (final Exception swallow) {
-                    swallow.printStackTrace();
-                    System.err.println(swallow.getMessage());
+                    LOG.error("Error while verifying legacy value with empty 
headers", swallow);
                     return false;
                 }
             },
@@ -937,7 +939,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
                 return true;
             } catch (final Exception e) {
-                e.printStackTrace();
+                LOG.error("Error while verifying plain window value with empty 
headers and timestamp", e);
                 return false;
             }
         }, 60_000L, "Could not verify plain window value with empty headers 
and timestamp in time.");
@@ -1075,7 +1077,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                     && result.timestamp() == timestamp
                     && result.headers().equals(expectedHeaders);
             } catch (final Exception e) {
-                e.printStackTrace();
+                LOG.error("Error while verifying windowed value with headers", 
e);
                 return false;
             }
         }, 60_000L, "Could not verify windowed value with headers in time.");
@@ -1120,7 +1122,7 @@ public class HeadersStoreUpgradeIntegrationTest {
 
                 return true;
             } catch (final Exception e) {
-                e.printStackTrace();
+                LOG.error("Error while verifying legacy value with empty 
headers", e);
                 return false;
             }
         }, 60_000L, "Could not verify legacy value with empty headers in 
time.");
@@ -1599,4 +1601,440 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         kafkaStreams.close();
     }
+
+    // ==================== Session Store Tests ====================
+
+    @Test
+    public void 
shouldMigratePersistentSessionStoreToSessionStoreWithHeadersUsingPapi() throws 
Exception {
+        shouldMigrateSessionStoreToSessionStoreWithHeaders(true);
+    }
+
+    @Test
+    public void 
shouldMigrateInMemorySessionStoreToSessionStoreWithHeadersUsingPapi() throws 
Exception {
+        shouldMigrateSessionStoreToSessionStoreWithHeaders(false);
+    }
+
+    private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final 
boolean isPersistent) throws Exception {
+        // Phase 1: Run with plain SessionStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    isPersistent ? 
Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)) :
+                        Stores.inMemorySessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+        processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+        processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+        kafkaStreams.close(Duration.ofSeconds(5L));
+        kafkaStreams = null;
+
+        // Phase 2: Restart with SessionStoreWithHeaders (headers-aware 
supplier)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
+        newBuilder.addStateStore(
+                Stores.sessionStoreBuilderWithHeaders(
+                    isPersistent ? 
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)) :
+                        Stores.inMemorySessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(() -> {
+                final SessionWithHeadersProcessor sessionStore = new 
SessionWithHeadersProcessor();
+                processorRef.set(sessionStore);
+                return sessionStore;
+            }, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        // Verify legacy data can be read with empty headers
+        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300, 
processorRef);
+
+        // Process new records with headers
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "migration-test".getBytes());
+
+        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, headers, processorRef);
+        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, headers, processorRef);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void shouldProxySessionStoreToSessionStoreWithHeaders() throws 
Exception {
+        // Phase 1: Run with plain SessionStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+        processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+        processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Phase 2: Restart with headers-aware builder but non-headers 
supplier (proxy/adapter mode)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
+        newBuilder.addStateStore(
+                Stores.sessionStoreBuilderWithHeaders(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),  // non-headers supplier!
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(() -> {
+                final SessionWithHeadersProcessor p = new 
SessionWithHeadersProcessor();
+                processorRef.set(p);
+                return p;
+            }, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        // Verify legacy data can be read with empty headers
+        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300, 
processorRef);
+
+        // In proxy mode, headers are stripped when writing to non-headers 
store
+        // So we expect empty headers when reading back
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes());
+        final Headers expectedHeaders = new RecordHeaders();
+
+        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, expectedHeaders, processorRef);
+        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, expectedHeaders, processorRef);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void shouldFailDowngradeFromSessionStoreWithHeadersToSessionStore() 
throws Exception {
+        final Properties props = props();
+        setupAndPopulateSessionStoreWithHeaders(props);
+        kafkaStreams = null;
+
+        // Attempt to downgrade to plain session store
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+        boolean exceptionThrown = false;
+        try {
+            kafkaStreams.start();
+        } catch (final Exception e) {
+            Throwable cause = e;
+            while (cause != null) {
+                if (cause instanceof ProcessorStateException &&
+                    cause.getMessage() != null &&
+                    cause.getMessage().contains("incompatible settings")) {
+                    exceptionThrown = true;
+                    break;
+                }
+                cause = cause.getCause();
+            }
+
+            if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException 
about incompatible settings, but got: " + e.getMessage(), e);
+            }
+        } finally {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+        }
+
+        if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be 
thrown when attempting to downgrade from headers-aware to plain session store");
+        }
+    }
+
+    @Test
+    public void 
shouldSuccessfullyDowngradeFromSessionStoreWithHeadersToSessionStoreAfterCleanup()
 throws Exception {
+        final Properties props = props();
+        setupAndPopulateSessionStoreWithHeaders(props);
+
+        kafkaStreams.cleanUp(); // Delete local state
+        kafkaStreams = null;
+
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long newTime = CLUSTER.time.milliseconds();
+        processSessionKeyValueAndVerify("key3", "value3", newTime + 300);
+        processSessionKeyValueAndVerify("key4", "value4", newTime + 400);
+
+        kafkaStreams.close();
+    }
+
+    // ==================== Session Store Helper Methods ====================
+
+    private void processSessionKeyValueAndVerify(final String key,
+                                                  final String value,
+                                                  final long timestamp) throws 
Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlySessionStore<String, String> store =
+                    IntegrationTestUtils.getStore(SESSION_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.sessionStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                try (final KeyValueIterator<Windowed<String>, String> iterator 
= store.fetch(key)) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, String> kv = 
iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.value.equals(value)) {
+                            return true;
+                        }
+                    }
+                }
+                return false;
+            } catch (final Exception e) {
+                return false;
+            }
+        }, 60_000L, "Could not verify session value in time.");
+    }
+
+    private void verifySessionValueWithEmptyHeaders(final String key,
+                                                    final String value,
+                                                    final long timestamp,
+                                                    final 
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                if (processorRef.get() == null) {
+                    return false;
+                }
+                final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store = processorRef.get().store();
+                if (store == null) {
+                    return false;
+                }
+
+                try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<String>> iterator = store.fetch(key)) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
AggregationWithHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key)
+                            && kv.key.window().start() == timestamp
+                            && kv.key.window().end() == timestamp) {
+
+                            final AggregationWithHeaders<String> result = 
kv.value;
+                            assertNotNull(result, "Result should not be null");
+                            assertEquals(value, result.aggregation(), "Value 
should match");
+
+                            // Verify headers exist but are empty (migrated 
from plain session store)
+                            assertNotNull(result.headers(), "Headers should 
not be null for migrated data");
+                            assertEquals(0, result.headers().toArray().length, 
"Headers should be empty for migrated data");
+
+                            return true;
+                        }
+                    }
+                }
+                return false;
+            } catch (final Exception e) {
+                LOG.error("Error verifying legacy session value with empty 
headers", e);
+                return false;
+            }
+        }, 60_000L, "Could not verify legacy session value with empty headers 
in time.");
+    }
+
+    private void processSessionKeyValueWithHeadersAndVerify(final String key,
+                                                            final String value,
+                                                            final long 
timestamp,
+                                                            final Headers 
headers,
+                                                            final Headers 
expectedHeaders,
+                                                            final 
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                if (processorRef.get() == null) {
+                    return false;
+                }
+                final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store = processorRef.get().store();
+
+                if (store == null) {
+                    return false;
+                }
+
+                try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<String>> iterator = store.fetch(key)) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
AggregationWithHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key)
+                            && kv.key.window().start() == timestamp
+                            && kv.key.window().end() == timestamp) {
+
+                            final AggregationWithHeaders<String> result = 
kv.value;
+                            return result != null
+                                && result.aggregation().equals(value)
+                                && result.headers().equals(expectedHeaders);
+                        }
+                    }
+                }
+                return false;
+            } catch (final Exception e) {
+                LOG.error("Error verifying session value with headers", e);
+                return false;
+            }
+        }, 60_000L, "Could not verify session value with headers in time.");
+    }
+
+    private boolean sessionStoreContainsKey(final String key,
+                                            final long timestamp,
+                                            final 
AtomicReference<SessionWithHeadersProcessor> processorRef) {
+        try {
+            if (processorRef.get() == null) {
+                return false;
+            }
+            final SessionStoreWithHeaders<String, String> store = 
processorRef.get().store();
+            if (store == null) {
+                return false;
+            }
+
+            try (final KeyValueIterator<Windowed<String>, 
AggregationWithHeaders<String>> iterator = store.fetch(key)) {
+                while (iterator.hasNext()) {
+                    final KeyValue<Windowed<String>, 
AggregationWithHeaders<String>> kv = iterator.next();
+                    if (kv.key.key().equals(key) && kv.key.window().start() == 
timestamp) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        } catch (final Exception e) {
+            return false;
+        }
+    }
+
+    private void setupAndPopulateSessionStoreWithHeaders(final Properties 
props) throws Exception {
+        final StreamsBuilder headersBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
+        headersBuilder.addStateStore(
+                Stores.sessionStoreBuilderWithHeaders(
+                    
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(() -> {
+                final SessionWithHeadersProcessor p = new 
SessionWithHeadersProcessor();
+                processorRef.set(p);
+                return p;
+            }, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "test".getBytes());
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair("key1", "value1")),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(), 
StringSerializer.class, StringSerializer.class),
+            headers,
+            baseTime + 100,
+            false);
+
+        TestUtils.waitForCondition(
+            () -> sessionStoreContainsKey("key1", baseTime + 100, 
processorRef),
+            30_000L,
+            "Store was not populated with expected data"
+        );
+
+        kafkaStreams.close();
+    }
+
+    // ==================== Session Store Processors ====================
+
+    private static class SessionProcessor implements Processor<String, String, 
Void, Void> {
+        private SessionStore<String, String> store;
+
+        public SessionStore<String, String> store() {
+            return store;
+        }
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(SESSION_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final Windowed<String> sessionKey = new Windowed<>(record.key(),
+                new SessionWindow(record.timestamp(), record.timestamp()));
+            store.put(sessionKey, record.value());
+        }
+    }
+
+    private static class SessionWithHeadersProcessor implements 
Processor<String, String, Void, Void> {
+        private SessionStoreWithHeaders<String, String> store;
+
+        public SessionStoreWithHeaders<String, String> store() {
+            return store;
+        }
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(SESSION_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final Windowed<String> sessionKey = new Windowed<>(record.key(),
+                new SessionWindow(record.timestamp(), record.timestamp()));
+            store.put(sessionKey, AggregationWithHeaders.make(record.value(), 
record.headers()));
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index e2b92352bf2..b5e73567330 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.TaskIdFormatException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.RecordConverter;
 
 import org.slf4j.Logger;
@@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.identity;
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToSessionHeadersValue;
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
 import static 
org.apache.kafka.streams.state.internals.WrappedStateStore.isHeadersAware;
 import static 
org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;
@@ -53,6 +55,9 @@ final class StateManagerUtil {
 
     static RecordConverter converterForStore(final StateStore store) {
         if (isHeadersAware(store)) {
+            if (store instanceof SessionStore) {
+                return rawValueToSessionHeadersValue();
+            }
             return rawValueToHeadersValue();
         } else if (isTimestamped(store) && !isVersioned(store)) {
             // should not prepend timestamp when restoring records for 
versioned store, as
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index 8eb7be4f5de..118bc019b5f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -82,6 +82,34 @@ public final class RecordConverters {
         return RAW_TO_WITH_HEADERS_INSTANCE;
     }
 
+    private static final RecordConverter RAW_TO_SESSION_WITH_HEADERS_INSTANCE 
= record -> {
+        final byte[] rawValue = record.value();
+
+        // Format: [headersSize(varint)][headersBytes][aggregation] (no 
timestamp)
+        final byte[] recordValue = reconstructSessionFromRaw(
+            rawValue,
+            record.headers()
+        );
+
+        return new ConsumerRecord<>(
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            record.timestamp(),
+            record.timestampType(),
+            record.serializedKeySize(),
+            record.serializedValueSize(),
+            record.key(),
+            recordValue,
+            record.headers(),
+            record.leaderEpoch()
+        );
+    };
+
+    public static RecordConverter rawValueToSessionHeadersValue() {
+        return RAW_TO_SESSION_WITH_HEADERS_INSTANCE;
+    }
+
     // privatize the constructor so the class cannot be instantiated (only 
used for its static members)
     private RecordConverters() {}
 
@@ -93,6 +121,33 @@ public final class RecordConverters {
         return IDENTITY_INSTANCE;
     }
 
+    /**
+     * Reconstructs the AggregationWithHeaders format from raw value bytes and 
headers (no timestamp).
+     * Used during state restoration from changelog topics for session stores.
+     *
+     * @param rawValue the raw aggregation bytes
+     * @param headers the headers
+     * @return the serialized AggregationWithHeaders format
+     */
+    static byte[] reconstructSessionFromRaw(final byte[] rawValue, final 
Headers headers) {
+        if (rawValue == null) {
+            return null;
+        }
+        final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);
+            out.write(rawHeaders);
+            out.write(rawValue);
+
+            return baos.toByteArray();
+        } catch (final IOException e) {
+            throw new SerializationException("Failed to reconstruct 
AggregationWithHeaders", e);
+        }
+    }
+
     /**
      * Reconstructs the ValueTimestampHeaders format from raw value bytes, 
timestamp, and headers.
      * Used during state restoration from changelog topics.


Reply via email to