This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cae9e803439 KAFKA-20301: Upgrade downgrade integration tests for
session store with headers (#21748)
cae9e803439 is described below
commit cae9e80343958c87df5d1486f315649f4cecb7a5
Author: Bill Bejeck <[email protected]>
AuthorDate: Sun Mar 15 00:31:56 2026 -0400
KAFKA-20301: Upgrade downgrade integration tests for session store with
headers (#21748)
Adds integration tests for session store migration (plain
SessionStore to SessionStoreWithHeaders), proxy mode, and downgrade
scenarios.
Updates changelog restoration for session stores with headers by adding
a new rawValueToSessionHeadersValue() converter.
Reviewers: Matthias J. Sax <[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 474 ++++++++++++++++++++-
.../processor/internals/StateManagerUtil.java | 5 +
.../streams/state/internals/RecordConverters.java | 55 +++
3 files changed, 516 insertions(+), 18 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index ea6d141ad50..6470f3942bb 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -30,14 +30,19 @@ import
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
@@ -55,12 +60,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -72,9 +80,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
public class HeadersStoreUpgradeIntegrationTest {
private static final String STORE_NAME = "store";
private static final String WINDOW_STORE_NAME = "window-store";
+ private static final String SESSION_STORE_NAME = "session-store";
private static final long WINDOW_SIZE_MS = 1000L;
private static final long RETENTION_MS = Duration.ofDays(1).toMillis();
-
+ private static final Logger LOG =
LoggerFactory.getLogger(HeadersStoreUpgradeIntegrationTest.class);
private String inputStream;
private KafkaStreams kafkaStreams;
@@ -381,8 +390,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final ValueAndTimestamp<V> result = store.get(key);
return result != null && result.value().equals(value) &&
result.timestamp() == timestamp;
} catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
+ LOG.error("Error while checking store result", swallow);
return false;
}
},
@@ -416,8 +424,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final V result = store.get(key);
return result != null && result.equals(value);
} catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
+ LOG.error("Error while verifying legacy value", swallow);
return false;
}
},
@@ -443,8 +450,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final ValueAndTimestamp<V> result = store.get(key);
return result != null && result.value().equals(value) &&
result.timestamp() == timestamp;
} catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
+ LOG.error("Error while waiting for expected result",
swallow);
return false;
}
},
@@ -484,8 +490,7 @@ public class HeadersStoreUpgradeIntegrationTest {
&& result.timestamp() == timestamp
&& result.headers().equals(expectedHeaders);
} catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
+ LOG.error("Failed to retrieve expected result", swallow);
return false;
}
},
@@ -526,8 +531,7 @@ public class HeadersStoreUpgradeIntegrationTest {
&& result.timestamp() == expectedTimestamp
&& result.headers().equals(expectedHeaders);
} catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
+ LOG.error("Failed to retrieve expected result", swallow);
return false;
}
},
@@ -553,8 +557,7 @@ public class HeadersStoreUpgradeIntegrationTest {
&& result.timestamp() == timestamp
&& result.headers().toArray().length == 0;
} catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
+ LOG.error("Failed to retrieve expected result", swallow);
return false;
}
},
@@ -578,8 +581,7 @@ public class HeadersStoreUpgradeIntegrationTest {
&& result.value().equals(value)
&& result.headers().toArray().length == 0;
} catch (final Exception swallow) {
- swallow.printStackTrace();
- System.err.println(swallow.getMessage());
+ LOG.error("Error while verifying legacy value with empty
headers", swallow);
return false;
}
},
@@ -937,7 +939,7 @@ public class HeadersStoreUpgradeIntegrationTest {
return true;
} catch (final Exception e) {
- e.printStackTrace();
+ LOG.error("Error while verifying plain window value with empty
headers and timestamp", e);
return false;
}
}, 60_000L, "Could not verify plain window value with empty headers
and timestamp in time.");
@@ -1075,7 +1077,7 @@ public class HeadersStoreUpgradeIntegrationTest {
&& result.timestamp() == timestamp
&& result.headers().equals(expectedHeaders);
} catch (final Exception e) {
- e.printStackTrace();
+ LOG.error("Error while verifying windowed value with headers",
e);
return false;
}
}, 60_000L, "Could not verify windowed value with headers in time.");
@@ -1120,7 +1122,7 @@ public class HeadersStoreUpgradeIntegrationTest {
return true;
} catch (final Exception e) {
- e.printStackTrace();
+ LOG.error("Error while verifying legacy value with empty
headers", e);
return false;
}
}, 60_000L, "Could not verify legacy value with empty headers in
time.");
@@ -1599,4 +1601,440 @@ public class HeadersStoreUpgradeIntegrationTest {
kafkaStreams.close();
}
+
+ // ==================== Session Store Tests ====================
+
+ @Test
+ public void
shouldMigratePersistentSessionStoreToSessionStoreWithHeadersUsingPapi() throws
Exception {
+ shouldMigrateSessionStoreToSessionStoreWithHeaders(true);
+ }
+
+ @Test
+ public void
shouldMigrateInMemorySessionStoreToSessionStoreWithHeadersUsingPapi() throws
Exception {
+ shouldMigrateSessionStoreToSessionStoreWithHeaders(false);
+ }
+
+ private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final
boolean isPersistent) throws Exception {
+ // Phase 1: Run with plain SessionStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+ isPersistent ?
Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)) :
+ Stores.inMemorySessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+ processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+ processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+ kafkaStreams.close(Duration.ofSeconds(5L));
+ kafkaStreams = null;
+
+ // Phase 2: Restart with SessionStoreWithHeaders (headers-aware
supplier)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
+ newBuilder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ isPersistent ?
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)) :
+ Stores.inMemorySessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(() -> {
+ final SessionWithHeadersProcessor sessionStore = new
SessionWithHeadersProcessor();
+ processorRef.set(sessionStore);
+ return sessionStore;
+ }, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ // Verify legacy data can be read with empty headers
+ verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100,
processorRef);
+ verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200,
processorRef);
+ verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300,
processorRef);
+
+ // Process new records with headers
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+
+ processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers, processorRef);
+ processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, headers, processorRef);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void shouldProxySessionStoreToSessionStoreWithHeaders() throws
Exception {
+ // Phase 1: Run with plain SessionStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+ processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+ processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Phase 2: Restart with headers-aware builder but non-headers
supplier (proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
+ newBuilder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+ Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)), // non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(() -> {
+ final SessionWithHeadersProcessor p = new
SessionWithHeadersProcessor();
+ processorRef.set(p);
+ return p;
+ }, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ // Verify legacy data can be read with empty headers
+ verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100,
processorRef);
+ verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200,
processorRef);
+ verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300,
processorRef);
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders, processorRef);
+ processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, expectedHeaders, processorRef);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void shouldFailDowngradeFromSessionStoreWithHeadersToSessionStore()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateSessionStoreWithHeaders(props);
+ kafkaStreams = null;
+
+ // Attempt to downgrade to plain session store
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ boolean exceptionThrown = false;
+ try {
+ kafkaStreams.start();
+ } catch (final Exception e) {
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof ProcessorStateException &&
+ cause.getMessage() != null &&
+ cause.getMessage().contains("incompatible settings")) {
+ exceptionThrown = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException
about incompatible settings, but got: " + e.getMessage(), e);
+ }
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException to be
thrown when attempting to downgrade from headers-aware to plain session store");
+ }
+ }
+
+ @Test
+ public void
shouldSuccessfullyDowngradeFromSessionStoreWithHeadersToSessionStoreAfterCleanup()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateSessionStoreWithHeaders(props);
+
+ kafkaStreams.cleanUp(); // Delete local state
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.sessionStoreBuilder(
+ Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long newTime = CLUSTER.time.milliseconds();
+ processSessionKeyValueAndVerify("key3", "value3", newTime + 300);
+ processSessionKeyValueAndVerify("key4", "value4", newTime + 400);
+
+ kafkaStreams.close();
+ }
+
+ // ==================== Session Store Helper Methods ====================
+
+ private void processSessionKeyValueAndVerify(final String key,
+ final String value,
+ final long timestamp) throws
Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlySessionStore<String, String> store =
+ IntegrationTestUtils.getStore(SESSION_STORE_NAME,
kafkaStreams, QueryableStoreTypes.sessionStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ try (final KeyValueIterator<Windowed<String>, String> iterator
= store.fetch(key)) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>, String> kv =
iterator.next();
+ if (kv.key.key().equals(key) &&
kv.value.equals(value)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify session value in time.");
+ }
+
+ private void verifySessionValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp,
+ final
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ if (processorRef.get() == null) {
+ return false;
+ }
+ final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store = processorRef.get().store();
+ if (store == null) {
+ return false;
+ }
+
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<String>> iterator = store.fetch(key)) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
AggregationWithHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key)
+ && kv.key.window().start() == timestamp
+ && kv.key.window().end() == timestamp) {
+
+ final AggregationWithHeaders<String> result =
kv.value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.aggregation(), "Value
should match");
+
+ // Verify headers exist but are empty (migrated
from plain session store)
+ assertNotNull(result.headers(), "Headers should
not be null for migrated data");
+ assertEquals(0, result.headers().toArray().length,
"Headers should be empty for migrated data");
+
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (final Exception e) {
+ LOG.error("Error verifying legacy session value with empty
headers", e);
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy session value with empty headers
in time.");
+ }
+
+ private void processSessionKeyValueWithHeadersAndVerify(final String key,
+ final String value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders,
+ final
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ if (processorRef.get() == null) {
+ return false;
+ }
+ final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store = processorRef.get().store();
+
+ if (store == null) {
+ return false;
+ }
+
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<String>> iterator = store.fetch(key)) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
AggregationWithHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key)
+ && kv.key.window().start() == timestamp
+ && kv.key.window().end() == timestamp) {
+
+ final AggregationWithHeaders<String> result =
kv.value;
+ return result != null
+ && result.aggregation().equals(value)
+ && result.headers().equals(expectedHeaders);
+ }
+ }
+ }
+ return false;
+ } catch (final Exception e) {
+ LOG.error("Error verifying session value with headers", e);
+ return false;
+ }
+ }, 60_000L, "Could not verify session value with headers in time.");
+ }
+
+ private boolean sessionStoreContainsKey(final String key,
+ final long timestamp,
+ final
AtomicReference<SessionWithHeadersProcessor> processorRef) {
+ try {
+ if (processorRef.get() == null) {
+ return false;
+ }
+ final SessionStoreWithHeaders<String, String> store =
processorRef.get().store();
+ if (store == null) {
+ return false;
+ }
+
+ try (final KeyValueIterator<Windowed<String>,
AggregationWithHeaders<String>> iterator = store.fetch(key)) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
AggregationWithHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) && kv.key.window().start() ==
timestamp) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ private void setupAndPopulateSessionStoreWithHeaders(final Properties
props) throws Exception {
+ final StreamsBuilder headersBuilder = new StreamsBuilder();
+ final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
+ headersBuilder.addStateStore(
+ Stores.sessionStoreBuilderWithHeaders(
+
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(() -> {
+ final SessionWithHeadersProcessor p = new
SessionWithHeadersProcessor();
+ processorRef.set(p);
+ return p;
+ }, SESSION_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair("key1", "value1")),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ baseTime + 100,
+ false);
+
+ TestUtils.waitForCondition(
+ () -> sessionStoreContainsKey("key1", baseTime + 100,
processorRef),
+ 30_000L,
+ "Store was not populated with expected data"
+ );
+
+ kafkaStreams.close();
+ }
+
+ // ==================== Session Store Processors ====================
+
+ private static class SessionProcessor implements Processor<String, String,
Void, Void> {
+ private SessionStore<String, String> store;
+
+ public SessionStore<String, String> store() {
+ return store;
+ }
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(SESSION_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final Windowed<String> sessionKey = new Windowed<>(record.key(),
+ new SessionWindow(record.timestamp(), record.timestamp()));
+ store.put(sessionKey, record.value());
+ }
+ }
+
+ private static class SessionWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private SessionStoreWithHeaders<String, String> store;
+
+ public SessionStoreWithHeaders<String, String> store() {
+ return store;
+ }
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(SESSION_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final Windowed<String> sessionKey = new Windowed<>(record.key(),
+ new SessionWindow(record.timestamp(), record.timestamp()));
+ store.put(sessionKey, AggregationWithHeaders.make(record.value(),
record.headers()));
+ }
+ }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index e2b92352bf2..b5e73567330 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
+import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;
@@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.kafka.streams.state.internals.RecordConverters.identity;
import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
+import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToSessionHeadersValue;
import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
import static
org.apache.kafka.streams.state.internals.WrappedStateStore.isHeadersAware;
import static
org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;
@@ -53,6 +55,9 @@ final class StateManagerUtil {
static RecordConverter converterForStore(final StateStore store) {
if (isHeadersAware(store)) {
+ if (store instanceof SessionStore) {
+ return rawValueToSessionHeadersValue();
+ }
return rawValueToHeadersValue();
} else if (isTimestamped(store) && !isVersioned(store)) {
// should not prepend timestamp when restoring records for
versioned store, as
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index 8eb7be4f5de..118bc019b5f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -82,6 +82,34 @@ public final class RecordConverters {
return RAW_TO_WITH_HEADERS_INSTANCE;
}
+ private static final RecordConverter RAW_TO_SESSION_WITH_HEADERS_INSTANCE
= record -> {
+ final byte[] rawValue = record.value();
+
+ // Format: [headersSize(varint)][headersBytes][aggregation] (no
timestamp)
+ final byte[] recordValue = reconstructSessionFromRaw(
+ rawValue,
+ record.headers()
+ );
+
+ return new ConsumerRecord<>(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.timestamp(),
+ record.timestampType(),
+ record.serializedKeySize(),
+ record.serializedValueSize(),
+ record.key(),
+ recordValue,
+ record.headers(),
+ record.leaderEpoch()
+ );
+ };
+
+ public static RecordConverter rawValueToSessionHeadersValue() {
+ return RAW_TO_SESSION_WITH_HEADERS_INSTANCE;
+ }
+
// privatize the constructor so the class cannot be instantiated (only
used for its static members)
private RecordConverters() {}
@@ -93,6 +121,33 @@ public final class RecordConverters {
return IDENTITY_INSTANCE;
}
+ /**
+ * Reconstructs the AggregationWithHeaders format from raw value bytes and
headers (no timestamp).
+ * Used during state restoration from changelog topics for session stores.
+ *
+ * @param rawValue the raw aggregation bytes
+ * @param headers the headers
+ * @return the serialized AggregationWithHeaders format
+ */
+ static byte[] reconstructSessionFromRaw(final byte[] rawValue, final
Headers headers) {
+ if (rawValue == null) {
+ return null;
+ }
+ final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+
+ ByteUtils.writeVarint(rawHeaders.length, out);
+ out.write(rawHeaders);
+ out.write(rawValue);
+
+ return baos.toByteArray();
+ } catch (final IOException e) {
+ throw new SerializationException("Failed to reconstruct
AggregationWithHeaders", e);
+ }
+ }
+
/**
* Reconstructs the ValueTimestampHeaders format from raw value bytes,
timestamp, and headers.
* Used during state restoration from changelog topics.