This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 28d58f6463b KAFKA-20194: Fix StoreBuilderWrapper for header-stores 
(#21950)
28d58f6463b is described below

commit 28d58f6463b15309443bff32cb3581e6f4b628f7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Apr 7 16:48:45 2026 -0700

    KAFKA-20194: Fix StoreBuilderWrapper for header-stores (#21950)
    
    StoreBuilderWrapper must recognize header-stores correctly. Otherwise,
    we create changelog topics with incorrect topic configuration.
    
    Also adding a fix to the public API. The newly added methods to create a
    ts-kv-headers-store and session-headers-store builder (on `Stores`) are
    incorrect (cf. KIP-1271).
    
    Also updated the corresponding internal class name for session-headers
    store.
    
    Reviewers: Bill Bejeck <[email protected]>, Alieh Saeedi
     <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        |  16 +-
 ...Test.java => SessionsStoreWithHeadersTest.java} | 168 +++++++++----------
 .../TimestampedKeyValueStoreWithHeadersTest.java   |  10 +-
 .../TimestampedWindowStoreWithHeadersTest.java     |  10 +-
 .../AbstractConfigurableStoreFactory.java          |   8 +-
 .../internals/KeyValueStoreMaterializer.java       |   2 +-
 .../internals/SessionStoreMaterializer.java        |   2 +-
 .../internals/SubscriptionStoreFactory.java        |   2 +-
 .../processor/internals/StoreBuilderWrapper.java   |  18 +-
 .../org/apache/kafka/streams/state/Stores.java     | 185 ++++++++++++---------
 ...rs.java => SessionStoreWithHeadersBuilder.java} |   4 +-
 ...KStreamSessionWindowAggregateProcessorTest.java |   2 +-
 .../ForeignTableJoinProcessorSupplierTests.java    |   2 +-
 .../SubscriptionReceiveProcessorSupplierTest.java  |   2 +-
 .../state/QueryableStoreTypesWithHeadersTest.java  |   8 +-
 .../internals/GlobalStateStoreProviderTest.java    |   2 +-
 ...onStoreWithHeadersSerializerSideEffectTest.java |   2 +-
 .../StreamThreadStateStoreProviderTest.java        |   2 +-
 ...ueStoreWithHeadersSerializerSideEffectTest.java |   2 +-
 .../kafka/streams/state/internals/UtilsTest.java   |  13 +-
 20 files changed, 242 insertions(+), 218 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 25fb33dd182..5ba3f95f47d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -169,7 +169,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
 
         streamsBuilderForNewStore.addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
                     Serdes.String(),
                     Serdes.String()))
@@ -222,7 +222,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
 
         streamsBuilderForNewStore.addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     Stores.persistentTimestampedKeyValueStore(STORE_NAME),
                     Serdes.String(),
                     Serdes.String()))
@@ -288,7 +288,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
 
         streamsBuilderForNewStore.addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
                     Serdes.String(),
                     Serdes.String()))
@@ -341,7 +341,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
 
         streamsBuilderForNewStore.addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     Stores.persistentKeyValueStore(STORE_NAME),
                     Serdes.String(),
                     Serdes.String()))
@@ -1563,7 +1563,7 @@ public class HeadersStoreUpgradeIntegrationTest {
     private void setupAndPopulateKeyValueStoreWithHeaders(final Properties 
props) throws Exception {
         final StreamsBuilder headersBuilder = new StreamsBuilder();
         headersBuilder.addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
                     Serdes.String(),
                     Serdes.String()))
@@ -1622,7 +1622,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder newBuilder = new StreamsBuilder();
         final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
         newBuilder.addStateStore(
-                Stores.sessionStoreBuilderWithHeaders(
+                Stores.sessionStoreWithHeadersBuilder(
                     isPersistent ? 
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)) :
                         Stores.inMemorySessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.String(),
@@ -1680,7 +1680,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder newBuilder = new StreamsBuilder();
         final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
         newBuilder.addStateStore(
-                Stores.sessionStoreBuilderWithHeaders(
+                Stores.sessionStoreWithHeadersBuilder(
                     Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),  // non-headers supplier!
                     Serdes.String(),
                     Serdes.String()))
@@ -1941,7 +1941,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         final StreamsBuilder headersBuilder = new StreamsBuilder();
         final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
         headersBuilder.addStateStore(
-                Stores.sessionStoreBuilderWithHeaders(
+                Stores.sessionStoreWithHeadersBuilder(
                     
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.String(),
                     Serdes.String()))
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java
similarity index 70%
copy from 
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
copy to 
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java
index 1ae592e8901..bf18dce66e5 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SessionsStoreWithHeadersTest.java
@@ -32,15 +32,15 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.ValueTimestampHeaders;
-import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -65,10 +65,10 @@ import static 
org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Tag("integration")
-public class TimestampedWindowStoreWithHeadersTest {
+public class SessionsStoreWithHeadersTest {
 
-    private static final String STORE_NAME = "headers-window-store";
-    private static final long WINDOW_SIZE_MS = 100L;
+    private static final String STORE_NAME = "headers-session-store";
+    private static final long WINDOW_GAP_MS = 100L;
     private static final long RETENTION_MS = 1000L;
 
     private String inputStream;
@@ -102,7 +102,7 @@ public class TimestampedWindowStoreWithHeadersTest {
     }
 
     @BeforeEach
-    public void beforeTest(final TestInfo testInfo) throws 
InterruptedException {
+    public void beforeTest(final TestInfo testInfo) {
         this.testInfo = testInfo;
         final String uniqueTestName = safeUniqueTestName(testInfo);
         inputStream = "input-stream-" + uniqueTestName;
@@ -126,14 +126,14 @@ public class TimestampedWindowStoreWithHeadersTest {
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
 
         streamsBuilder.addStateStore(
-                Stores.timestampedWindowStoreWithHeadersBuilder(
-                    
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                Stores.sessionStoreWithHeadersBuilder(
+                    Stores.persistentSessionStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.Integer(),
                     Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-            .process(() -> new 
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .process(() -> new 
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
             .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
 
         final Properties props = props();
@@ -152,7 +152,7 @@ public class TimestampedWindowStoreWithHeadersTest {
             KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, 
"c50"));
 
         // Window 2: [baseTimestamp + WINDOW_SIZE_MS, baseTimestamp + 2 * 
WINDOW_SIZE_MS)
-        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + WINDOW_SIZE_MS,
+        numRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + WINDOW_GAP_MS,
             EMPTY_HEADERS,
             KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), 
KeyValue.pair(3, null));
 
@@ -172,14 +172,14 @@ public class TimestampedWindowStoreWithHeadersTest {
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
 
         streamsBuilder.addStateStore(
-                Stores.timestampedWindowStoreWithHeadersBuilder(
-                    
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                Stores.sessionStoreWithHeadersBuilder(
+                    Stores.persistentSessionStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.Integer(),
                     Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-            .process(() -> new 
TimestampedWindowStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
+            .process(() -> new 
SessionStoreWithHeadersContentCheckerProcessor(false), STORE_NAME)
             .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
 
         final Properties props = props();
@@ -199,7 +199,7 @@ public class TimestampedWindowStoreWithHeadersTest {
         // verify changelog topic properties
         final String changelogTopic = 
props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + 
"-changelog";
         final Properties changelogTopicConfig = 
CLUSTER.getLogConfig(changelogTopic);
-        assertEquals("compact", 
changelogTopicConfig.getProperty("cleanup.policy"));
+        assertEquals("compact,delete", 
changelogTopicConfig.getProperty("cleanup.policy"));
     }
 
     @Test
@@ -207,14 +207,14 @@ public class TimestampedWindowStoreWithHeadersTest {
         StreamsBuilder streamsBuilder = new StreamsBuilder();
 
         streamsBuilder.addStateStore(
-                Stores.timestampedWindowStoreWithHeadersBuilder(
-                    
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                Stores.sessionStoreWithHeadersBuilder(
+                    Stores.persistentSessionStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.Integer(),
                     Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-            .process(() -> new 
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .process(() -> new 
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
             .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
 
         final Properties props = props();
@@ -229,7 +229,7 @@ public class TimestampedWindowStoreWithHeadersTest {
         initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + 50, HEADERS2,
             KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, 
"c50"));
 
-        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + WINDOW_SIZE_MS, EMPTY_HEADERS,
+        initialRecordsProduced += produceDataToTopicWithHeaders(inputStream, 
baseTimestamp + WINDOW_GAP_MS, EMPTY_HEADERS,
             KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), 
KeyValue.pair(3, "c100"));
 
         IntegrationTestUtils.waitUntilMinRecordsReceived(
@@ -248,14 +248,14 @@ public class TimestampedWindowStoreWithHeadersTest {
         streamsBuilder = new StreamsBuilder();
 
         streamsBuilder.addStateStore(
-                Stores.timestampedWindowStoreWithHeadersBuilder(
-                    
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                Stores.sessionStoreWithHeadersBuilder(
+                    Stores.persistentSessionStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.Integer(),
                     Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-            .process(() -> new 
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .process(() -> new 
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
             .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
 
         kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
@@ -263,7 +263,7 @@ public class TimestampedWindowStoreWithHeadersTest {
 
         // produce additional records to verify restored store works correctly
         final Headers finalHeaders = new RecordHeaders().add("final", 
"true".getBytes());
-        final int additionalRecordsProduced = 
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS, 
finalHeaders,
+        final int additionalRecordsProduced = 
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_GAP_MS, 
finalHeaders,
             KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"), 
KeyValue.pair(3, "c200"));
 
         final List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -282,14 +282,14 @@ public class TimestampedWindowStoreWithHeadersTest {
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
 
         streamsBuilder.addStateStore(
-                Stores.timestampedWindowStoreBuilder(
-                    Stores.persistentTimestampedWindowStore(STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.Integer(),
                     Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-            .process(TimestampedWindowStoreContentCheckerProcessor::new, 
STORE_NAME)
+            .process(SessionStoreContentCheckerProcessor::new, STORE_NAME)
             .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
 
         shouldManualUpgradeFromTimestampedToHeaders(streamsBuilder.build());
@@ -307,7 +307,7 @@ public class TimestampedWindowStoreWithHeadersTest {
             KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, 
null));
         initialRecordsProduced += produceDataToTopic(inputStream, 
baseTimestamp + 50,
             KeyValue.pair(1, "a50"), KeyValue.pair(2, null), KeyValue.pair(3, 
"c50"));
-        initialRecordsProduced += produceDataToTopic(inputStream, 
baseTimestamp + WINDOW_SIZE_MS,
+        initialRecordsProduced += produceDataToTopic(inputStream, 
baseTimestamp + WINDOW_GAP_MS,
             KeyValue.pair(1, "a100"), KeyValue.pair(2, "b100"), 
KeyValue.pair(3, null));
 
         List<KeyValue<Integer, Integer>> receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -330,14 +330,14 @@ public class TimestampedWindowStoreWithHeadersTest {
 
         streamsBuilder
             .addStateStore(
-                Stores.timestampedWindowStoreWithHeadersBuilder(
-                    
Stores.persistentTimestampedWindowStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                Stores.sessionStoreWithHeadersBuilder(
+                    Stores.persistentSessionStoreWithHeaders(STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.Integer(),
                     Serdes.String()
                 )
             )
             .stream(inputStream, Consumed.with(Serdes.Integer(), 
Serdes.String()))
-            .process(() -> new 
TimestampedWindowStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
+            .process(() -> new 
SessionStoreWithHeadersContentCheckerProcessor(true), STORE_NAME)
             .to(outputStream, Produced.with(Serdes.Integer(), 
Serdes.Integer()));
 
         kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
@@ -345,7 +345,7 @@ public class TimestampedWindowStoreWithHeadersTest {
 
         // produce additional records with headers to verify upgraded store 
works
         final Headers upgradedHeaders = new RecordHeaders().add("upgraded", 
"true".getBytes());
-        final int additionalRecordsProduced = 
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_SIZE_MS, 
upgradedHeaders,
+        final int additionalRecordsProduced = 
produceDataToTopicWithHeaders(inputStream, baseTimestamp + 2 * WINDOW_GAP_MS, 
upgradedHeaders,
             KeyValue.pair(1, "a200"), KeyValue.pair(2, "b200"), 
KeyValue.pair(3, "c200"));
 
         receivedRecords = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
@@ -375,9 +375,9 @@ public class TimestampedWindowStoreWithHeadersTest {
      */
     @SuppressWarnings("varargs")
     @SafeVarargs
-    private final int produceDataToTopic(final String topic,
-                                         final long timestamp,
-                                         final KeyValue<Integer, String>... 
keyValues) {
+    private int produceDataToTopic(final String topic,
+                                   final long timestamp,
+                                   final KeyValue<Integer, String>... 
keyValues) {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             topic,
             Arrays.asList(keyValues),
@@ -412,21 +412,21 @@ public class TimestampedWindowStoreWithHeadersTest {
     }
 
     /**
-     * Processor for validating expected contents of a timestamped window 
store with headers, and forwards
+     * Processor for validating expected contents of a session store with 
headers, and forwards
      * the number of failed checks downstream for consumption.
      */
-    private static class 
TimestampedWindowStoreWithHeadersContentCheckerProcessor implements 
Processor<Integer, String, Integer, Integer> {
+    private static class SessionStoreWithHeadersContentCheckerProcessor 
implements Processor<Integer, String, Integer, Integer> {
 
         private ProcessorContext<Integer, Integer> context;
-        private TimestampedWindowStoreWithHeaders<Integer, String> store;
+        private SessionStoreWithHeaders<Integer, String> store;
 
         // whether the processor should write records to the store as they 
arrive.
         private final boolean writeToStore;
         // in-memory copy of seen data, to validate for testing purposes.
-        // Maps key -> windowStartTime -> ValueTimestampHeaders
-        private final Map<Integer, Map<Long, 
Optional<ValueTimestampHeaders<String>>>> data;
+        // Maps windowed-key -> AggregationWithHeaders
+        private final Map<Windowed<Integer>, 
Optional<AggregationWithHeaders<String>>> data;
 
-        TimestampedWindowStoreWithHeadersContentCheckerProcessor(final boolean 
writeToStore) {
+        SessionStoreWithHeadersContentCheckerProcessor(final boolean 
writeToStore) {
             this.writeToStore = writeToStore;
             this.data = new HashMap<>();
         }
@@ -439,15 +439,12 @@ public class TimestampedWindowStoreWithHeadersTest {
 
         @Override
         public void process(final Record<Integer, String> record) {
-            final long windowStartTime = record.timestamp() - 
(record.timestamp() % WINDOW_SIZE_MS);
-
             if (writeToStore) {
-                final ValueTimestampHeaders<String> valueTimestampHeaders =
-                    ValueTimestampHeaders.make(record.value(), 
record.timestamp(), record.headers());
-                store.put(record.key(), valueTimestampHeaders, 
windowStartTime);
+                final AggregationWithHeaders<String> valueHeaders =
+                    AggregationWithHeaders.make(record.value(), 
record.headers());
+                store.put(new Windowed<>(record.key(), new 
SessionWindow(record.timestamp(), record.timestamp())), valueHeaders);
 
-                data.computeIfAbsent(record.key(), k -> new HashMap<>());
-                data.get(record.key()).put(windowStartTime, 
Optional.ofNullable(valueTimestampHeaders));
+                data.put(new Windowed<>(record.key(), new 
SessionWindow(record.timestamp(), record.timestamp())), 
Optional.ofNullable(valueHeaders));
             }
 
             //
@@ -461,43 +458,36 @@ public class TimestampedWindowStoreWithHeadersTest {
          */
         private int checkStoreContents() {
             int failedChecks = 0;
-            for (final Map.Entry<Integer, Map<Long, 
Optional<ValueTimestampHeaders<String>>>> keyEntry : data.entrySet()) {
-                final Integer key = keyEntry.getKey();
-
-                for (final Map.Entry<Long, 
Optional<ValueTimestampHeaders<String>>> windowEntry : 
keyEntry.getValue().entrySet()) {
-                    final Long windowStartTime = windowEntry.getKey();
-                    final ValueTimestampHeaders<String> 
expectedValueTimestampHeaders =
-                        windowEntry.getValue().orElse(null);
-
-                    // validate fetch from store
-                    try (final 
WindowStoreIterator<ValueTimestampHeaders<String>> iterator =
-                             store.fetch(key, windowStartTime, 
windowStartTime)) {
-                        final ValueTimestampHeaders<String> 
actualValueTimestampHeaders =
-                            iterator.hasNext() ? iterator.next().value : null;
-                        if (!Objects.equals(actualValueTimestampHeaders, 
expectedValueTimestampHeaders)) {
-                            failedChecks++;
-                        }
-                    }
+            for (final Map.Entry<Windowed<Integer>, 
Optional<AggregationWithHeaders<String>>> keyEntry : data.entrySet()) {
+                final Windowed<Integer> windowedKey = keyEntry.getKey();
+                final long sessionTime = windowedKey.window().start();
+
+                final AggregationWithHeaders<String> expectedValueTHeaders = 
keyEntry.getValue().orElse(null);
+                final AggregationWithHeaders<String> actualValueHeaders = 
store.fetchSession(windowedKey.key(), sessionTime, sessionTime);
+
+                if (!Objects.equals(actualValueHeaders, 
expectedValueTHeaders)) {
+                    failedChecks++;
                 }
             }
+
             return failedChecks;
         }
     }
 
     /**
-     * Processor for validating expected contents of a timestamped window 
store (without headers).
-     * Used for testing the upgrade path from TimestampedWindowStore to 
TimestampedWindowStoreWithHeaders.
+     * Processor for validating expected contents of a session store (without 
headers).
+     * Used for testing the upgrade path from SessionStore to 
SessionStoreWithHeaders.
      */
-    private static class TimestampedWindowStoreContentCheckerProcessor 
implements Processor<Integer, String, Integer, Integer> {
+    private static class SessionStoreContentCheckerProcessor implements 
Processor<Integer, String, Integer, Integer> {
 
         private ProcessorContext<Integer, Integer> context;
-        private TimestampedWindowStore<Integer, String> store;
+        private SessionStore<Integer, String> store;
 
         // in-memory copy of seen data, to validate for testing purposes.
-        // Maps key -> windowStartTime -> ValueAndTimestamp
-        private final Map<Integer, Map<Long, 
Optional<ValueAndTimestamp<String>>>> data;
+        // Maps windowed-key -> value
+        private final Map<Windowed<Integer>, Optional<String>> data;
 
-        TimestampedWindowStoreContentCheckerProcessor() {
+        SessionStoreContentCheckerProcessor() {
             this.data = new HashMap<>();
         }
 
@@ -509,13 +499,10 @@ public class TimestampedWindowStoreWithHeadersTest {
 
         @Override
         public void process(final Record<Integer, String> record) {
-            final long windowStartTime = record.timestamp() - 
(record.timestamp() % WINDOW_SIZE_MS);
 
-            final ValueAndTimestamp<String> valueAndTimestamp = 
ValueAndTimestamp.make(record.value(), record.timestamp());
-            store.put(record.key(), valueAndTimestamp, windowStartTime);
+            store.put(new Windowed<>(record.key(), new 
SessionWindow(record.timestamp(), record.timestamp())), record.value());
 
-            data.computeIfAbsent(record.key(), k -> new HashMap<>());
-            data.get(record.key()).put(windowStartTime, 
Optional.ofNullable(valueAndTimestamp));
+            data.put(new Windowed<>(record.key(), new 
SessionWindow(record.timestamp(), record.timestamp())), 
Optional.ofNullable(record.value()));
 
             final int failedChecks = checkStoreContents();
             context.forward(record.withValue(failedChecks));
@@ -527,22 +514,15 @@ public class TimestampedWindowStoreWithHeadersTest {
          */
         private int checkStoreContents() {
             int failedChecks = 0;
-            for (final Map.Entry<Integer, Map<Long, 
Optional<ValueAndTimestamp<String>>>> keyEntry : data.entrySet()) {
-                final Integer key = keyEntry.getKey();
-
-                for (final Map.Entry<Long, 
Optional<ValueAndTimestamp<String>>> windowEntry : 
keyEntry.getValue().entrySet()) {
-                    final Long windowStartTime = windowEntry.getKey();
-                    final ValueAndTimestamp<String> expectedValueAndTimestamp 
= windowEntry.getValue().orElse(null);
-
-                    // validate fetch from store
-                    try (final WindowStoreIterator<ValueAndTimestamp<String>> 
iterator =
-                             store.fetch(key, windowStartTime, 
windowStartTime)) {
-                        final ValueAndTimestamp<String> 
actualValueAndTimestamp =
-                            iterator.hasNext() ? iterator.next().value : null;
-                        if (!Objects.equals(actualValueAndTimestamp, 
expectedValueAndTimestamp)) {
-                            failedChecks++;
-                        }
-                    }
+            for (final Map.Entry<Windowed<Integer>, Optional<String>> keyEntry 
: data.entrySet()) {
+                final Windowed<Integer> windowedKey = keyEntry.getKey();
+                final long sessionTime = windowedKey.window().start();
+
+                final String expectedValue = keyEntry.getValue().orElse(null);
+                final String actualValue = 
store.fetchSession(windowedKey.key(), sessionTime, sessionTime);
+
+                if (!Objects.equals(actualValue, expectedValue)) {
+                    failedChecks++;
                 }
             }
             return failedChecks;
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
index b7cbdb2d82d..bb110d86cdd 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
@@ -125,7 +125,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
 
         streamsBuilder
             .addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
                     Serdes.Integer(),
                     Serdes.String()
@@ -173,7 +173,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
 
         streamsBuilder
             .addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
                     Serdes.Integer(),
                     Serdes.String()
@@ -210,7 +210,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
 
         streamsBuilder
             .addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
                     Serdes.Integer(),
                     Serdes.String()
@@ -264,7 +264,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
 
         streamsBuilder
             .addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
                     Serdes.Integer(),
                     Serdes.String()
@@ -356,7 +356,7 @@ public class TimestampedKeyValueStoreWithHeadersTest {
 
         streamsBuilder
             .addStateStore(
-                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.timestampedKeyValueStoreWithHeadersBuilder(
                     
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
                     Serdes.Integer(),
                     Serdes.String()
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
index 1ae592e8901..012bc773df6 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedWindowStoreWithHeadersTest.java
@@ -102,7 +102,7 @@ public class TimestampedWindowStoreWithHeadersTest {
     }
 
     @BeforeEach
-    public void beforeTest(final TestInfo testInfo) throws 
InterruptedException {
+    public void beforeTest(final TestInfo testInfo) {
         this.testInfo = testInfo;
         final String uniqueTestName = safeUniqueTestName(testInfo);
         inputStream = "input-stream-" + uniqueTestName;
@@ -199,7 +199,7 @@ public class TimestampedWindowStoreWithHeadersTest {
         // verify changelog topic properties
         final String changelogTopic = 
props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-" + STORE_NAME + 
"-changelog";
         final Properties changelogTopicConfig = 
CLUSTER.getLogConfig(changelogTopic);
-        assertEquals("compact", 
changelogTopicConfig.getProperty("cleanup.policy"));
+        assertEquals("compact,delete", 
changelogTopicConfig.getProperty("cleanup.policy"));
     }
 
     @Test
@@ -375,9 +375,9 @@ public class TimestampedWindowStoreWithHeadersTest {
      */
     @SuppressWarnings("varargs")
     @SafeVarargs
-    private final int produceDataToTopic(final String topic,
-                                         final long timestamp,
-                                         final KeyValue<Integer, String>... 
keyValues) {
+    private int produceDataToTopic(final String topic,
+                                   final long timestamp,
+                                   final KeyValue<Integer, String>... 
keyValues) {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             topic,
             Arrays.asList(keyValues),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
index 99c8c5a2bc6..6ca5142994d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
@@ -27,13 +27,13 @@ import java.util.Set;
 public abstract class AbstractConfigurableStoreFactory implements StoreFactory 
{
     private final Set<String> connectedProcessorNames = new HashSet<>();
     private DslStoreSuppliers dslStoreSuppliers;
-    private final DslStoreFormat defaultStoreDefaultFormat;
+    private final DslStoreFormat defaultDslStoreFormat;
     private DslStoreFormat dslStoreFormatOverwrite;
 
     public AbstractConfigurableStoreFactory(final DslStoreSuppliers 
initialStoreSuppliers,
-                                            final DslStoreFormat 
defaultStoreDefaultFormat) {
+                                            final DslStoreFormat 
defaultDslStoreFormat) {
         this.dslStoreSuppliers = initialStoreSuppliers;
-        this.defaultStoreDefaultFormat = defaultStoreDefaultFormat;
+        this.defaultDslStoreFormat = defaultDslStoreFormat;
     }
 
     @Override
@@ -57,7 +57,7 @@ public abstract class AbstractConfigurableStoreFactory 
implements StoreFactory {
     }
 
     public DslStoreFormat dslStoreFormat() {
-        return dslStoreFormatOverwrite == null ? defaultStoreDefaultFormat : 
dslStoreFormatOverwrite;
+        return dslStoreFormatOverwrite == null ? defaultDslStoreFormat : 
dslStoreFormatOverwrite;
     }
 
     protected DslStoreSuppliers dslStoreSuppliers() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index 30b3b885c10..3e23c409df0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -57,7 +57,7 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
                     materialized.keySerde(),
                     materialized.valueSerde());
         } else if (supplier instanceof HeadersBytesStoreSupplier) {
-            builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
+            builder = Stores.timestampedKeyValueStoreWithHeadersBuilder(
                 supplier,
                 materialized.keySerde(),
                 materialized.valueSerde());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
index d1157086382..31fa0a58c9f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionStoreMaterializer.java
@@ -70,7 +70,7 @@ public class SessionStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
 
         final StoreBuilder<?> builder;
         if (supplier instanceof HeadersBytesStoreSupplier) {
-            builder = Stores.sessionStoreBuilderWithHeaders(
+            builder = Stores.sessionStoreWithHeadersBuilder(
                 supplier,
                 materialized.keySerde(),
                 materialized.valueSerde()
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index 2aed2f74d34..94b59fd0c11 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -47,7 +47,7 @@ public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactor
     @Override
     public StoreBuilder<?> builder() {
         StoreBuilder<?> builder;
-        builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
+        builder = Stores.timestampedKeyValueStoreWithHeadersBuilder(
             dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, 
dslStoreFormat())),
             new Serdes.BytesSerde(),
             subscriptionWrapperSerde
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
index be915a8dfb6..9581f312702 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreBuilderWrapper.java
@@ -19,7 +19,9 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
+import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder;
 import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
+import 
org.apache.kafka.streams.state.internals.TimestampedWindowStoreWithHeadersBuilder;
 import org.apache.kafka.streams.state.internals.VersionedKeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
@@ -69,11 +71,14 @@ public class StoreBuilderWrapper implements StoreFactory {
             return ((WindowStoreBuilder<?, ?>) builder).retentionPeriod();
         } else if (builder instanceof TimestampedWindowStoreBuilder) {
             return ((TimestampedWindowStoreBuilder<?, ?>) 
builder).retentionPeriod();
+        } else if (builder instanceof 
TimestampedWindowStoreWithHeadersBuilder) {
+            return ((TimestampedWindowStoreWithHeadersBuilder<?, ?>) 
builder).retentionPeriod();
         } else if (builder instanceof SessionStoreBuilder) {
             return ((SessionStoreBuilder<?, ?>) builder).retentionPeriod();
+        } else if (builder instanceof SessionStoreWithHeadersBuilder) {
+            return ((SessionStoreWithHeadersBuilder<?, ?>) 
builder).retentionPeriod();
         } else {
-            throw new IllegalStateException(
-                    "retentionPeriod is not supported when not a window 
store");
+            throw new IllegalStateException("retentionPeriod is not supported 
when not a window store");
         }
     }
 
@@ -82,8 +87,7 @@ public class StoreBuilderWrapper implements StoreFactory {
         if (builder instanceof VersionedKeyValueStoreBuilder) {
             return ((VersionedKeyValueStoreBuilder<?, ?>) 
builder).historyRetention();
         } else {
-            throw new IllegalStateException(
-                    "historyRetention is not supported when not a versioned 
store");
+            throw new IllegalStateException("historyRetention is not supported 
when not a versioned store");
         }
     }
 
@@ -105,8 +109,10 @@ public class StoreBuilderWrapper implements StoreFactory {
     @Override
     public boolean isWindowStore() {
         return builder instanceof WindowStoreBuilder
-                || builder instanceof TimestampedWindowStoreBuilder
-                || builder instanceof SessionStoreBuilder;
+            || builder instanceof TimestampedWindowStoreBuilder
+            || builder instanceof TimestampedWindowStoreWithHeadersBuilder
+            || builder instanceof SessionStoreBuilder
+            || builder instanceof SessionStoreWithHeadersBuilder;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 14c37ed93b4..d567d525dc3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -31,7 +31,7 @@ import 
org.apache.kafka.streams.state.internals.RocksDbSessionHeadersBytesStoreS
 import 
org.apache.kafka.streams.state.internals.RocksDbVersionedKeyValueBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
-import org.apache.kafka.streams.state.internals.SessionStoreBuilderWithHeaders;
+import org.apache.kafka.streams.state.internals.SessionStoreWithHeadersBuilder;
 import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder;
 import 
org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilderWithHeaders;
 import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder;
@@ -125,7 +125,7 @@ public final class Stores {
      * Create a persistent {@link KeyValueBytesStoreSupplier} that stores 
headers along with timestamps.
      * <p>
      * This store supplier can be passed into a
-     * {@link 
#timestampedKeyValueStoreBuilderWithHeaders(KeyValueBytesStoreSupplier, Serde, 
Serde)}
+     * {@link 
#timestampedKeyValueStoreWithHeadersBuilder(KeyValueBytesStoreSupplier, Serde, 
Serde)}
      * to build a {@link TimestampedKeyValueStoreWithHeaders}.
      * <p>
      * The store will persist key-value pairs along with record timestamps and 
headers,
@@ -168,8 +168,10 @@ public final class Stores {
      * @return an instance of {@link VersionedBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code historyRetention} can't be 
represented as {@code long milliseconds}
      */
-    public static VersionedBytesStoreSupplier 
persistentVersionedKeyValueStore(final String name,
-                                                                               
final Duration historyRetention) {
+    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(
+        final String name,
+        final Duration historyRetention
+    ) {
         Objects.requireNonNull(name, "name cannot be null");
         final String hrMsgPrefix = 
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
         final long historyRetentionMs = 
validateMillisecondDuration(historyRetention, hrMsgPrefix);
@@ -206,9 +208,11 @@ public final class Stores {
      * @return an instance of {@link VersionedBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code historyRetention} or {@code 
segmentInterval} can't be represented as {@code long milliseconds}
      */
-    public static VersionedBytesStoreSupplier 
persistentVersionedKeyValueStore(final String name,
-                                                                               
final Duration historyRetention,
-                                                                               
final Duration segmentInterval) {
+    public static VersionedBytesStoreSupplier persistentVersionedKeyValueStore(
+        final String name,
+        final Duration historyRetention,
+        final Duration segmentInterval
+    ) {
         Objects.requireNonNull(name, "name cannot be null");
         final String hrMsgPrefix = 
prepareMillisCheckFailMsgPrefix(historyRetention, "historyRetention");
         final long historyRetentionMs = 
validateMillisecondDuration(historyRetention, hrMsgPrefix);
@@ -297,10 +301,12 @@ public final class Stores {
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code 
windowSize} can't be represented as {@code long milliseconds}
      * @throws IllegalArgumentException if {@code retentionPeriod} is smaller 
than {@code windowSize}
      */
-    public static WindowBytesStoreSupplier persistentWindowStore(final String 
name,
-                                                                 final 
Duration retentionPeriod,
-                                                                 final 
Duration windowSize,
-                                                                 final boolean 
retainDuplicates) throws IllegalArgumentException {
+    public static WindowBytesStoreSupplier persistentWindowStore(
+        final String name,
+        final Duration retentionPeriod,
+        final Duration windowSize,
+        final boolean retainDuplicates
+    ) throws IllegalArgumentException {
         return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.DEFAULT_WINDOW_STORE);
     }
 
@@ -329,10 +335,12 @@ public final class Stores {
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code 
windowSize} can't be represented as {@code long milliseconds}
      * @throws IllegalArgumentException if {@code retentionPeriod} is smaller 
than {@code windowSize}
      */
-    public static WindowBytesStoreSupplier 
persistentTimestampedWindowStore(final String name,
-                                                                            
final Duration retentionPeriod,
-                                                                            
final Duration windowSize,
-                                                                            
final boolean retainDuplicates) throws IllegalArgumentException {
+    public static WindowBytesStoreSupplier persistentTimestampedWindowStore(
+        final String name,
+        final Duration retentionPeriod,
+        final Duration windowSize,
+        final boolean retainDuplicates
+    ) throws IllegalArgumentException {
         return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE);
     }
 
@@ -346,18 +354,22 @@ public final class Stores {
      * @return an instance of {@link WindowBytesStoreSupplier}
      * @throws IllegalArgumentException if {@code retentionPeriod} is smaller 
than {@code windowSize}
      */
-    public static WindowBytesStoreSupplier 
persistentTimestampedWindowStoreWithHeaders(final String name,
-                                                                               
        final Duration retentionPeriod,
-                                                                               
        final Duration windowSize,
-                                                                               
        final boolean retainDuplicates) throws IllegalArgumentException {
+    public static WindowBytesStoreSupplier 
persistentTimestampedWindowStoreWithHeaders(
+        final String name,
+        final Duration retentionPeriod,
+        final Duration windowSize,
+        final boolean retainDuplicates
+    ) throws IllegalArgumentException {
         return persistentWindowStore(name, retentionPeriod, windowSize, 
retainDuplicates, 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes.TIMESTAMPED_WINDOW_STORE_WITH_HEADERS);
     }
 
-    private static WindowBytesStoreSupplier persistentWindowStore(final String 
name,
-                                                                  final 
Duration retentionPeriod,
-                                                                  final 
Duration windowSize,
-                                                                  final 
boolean retainDuplicates,
-                                                                  final 
RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType) {
+    private static WindowBytesStoreSupplier persistentWindowStore(
+        final String name,
+        final Duration retentionPeriod,
+        final Duration windowSize,
+        final boolean retainDuplicates,
+        final RocksDbWindowBytesStoreSupplier.WindowStoreTypes storeType
+    ) {
         Objects.requireNonNull(name, "name cannot be null");
         final String rpMsgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
         final long retentionMs = validateMillisecondDuration(retentionPeriod, 
rpMsgPrefix);
@@ -406,10 +418,12 @@ public final class Stores {
      * @throws IllegalArgumentException if {@code retentionPeriod} or {@code 
windowSize} can't be represented as {@code long milliseconds}
      * @throws IllegalArgumentException if {@code retentionPeriod} is smaller 
than {@code windowSize}
      */
-    public static WindowBytesStoreSupplier inMemoryWindowStore(final String 
name,
-                                                               final Duration 
retentionPeriod,
-                                                               final Duration 
windowSize,
-                                                               final boolean 
retainDuplicates) throws IllegalArgumentException {
+    public static WindowBytesStoreSupplier inMemoryWindowStore(
+        final String name,
+        final Duration retentionPeriod,
+        final Duration windowSize,
+        final boolean retainDuplicates
+    ) throws IllegalArgumentException {
         Objects.requireNonNull(name, "name cannot be null");
 
         final String repartitionPeriodErrorMessagePrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
@@ -447,8 +461,10 @@ public final class Stores {
      *                          contain the inactivity gap of the session and 
the entire grace period.)
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      */
-    public static SessionBytesStoreSupplier persistentSessionStore(final 
String name,
-                                                                   final 
Duration retentionPeriod) {
+    public static SessionBytesStoreSupplier persistentSessionStore(
+        final String name,
+        final Duration retentionPeriod
+    ) {
         return persistentSessionStore(name, retentionPeriod, false);
     }
 
@@ -466,14 +482,18 @@ public final class Stores {
      *                          contain the inactivity gap of the session and 
the entire grace period.)
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      */
-    public static SessionBytesStoreSupplier 
persistentSessionStoreWithHeaders(final String name,
-                                                                              
final Duration retentionPeriod) {
+    public static SessionBytesStoreSupplier persistentSessionStoreWithHeaders(
+        final String name,
+        final Duration retentionPeriod
+    ) {
         return persistentSessionStore(name, retentionPeriod, true);
     }
 
-    private static SessionBytesStoreSupplier persistentSessionStore(final 
String name,
-                                                                    final 
Duration retentionPeriod,
-                                                                    final 
boolean withHeaders) {
+    private static SessionBytesStoreSupplier persistentSessionStore(
+        final String name,
+        final Duration retentionPeriod,
+        final boolean withHeaders
+    ) {
         Objects.requireNonNull(name, "name cannot be null");
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
         final long retentionPeriodMs = 
validateMillisecondDuration(retentionPeriod, msgPrefix);
@@ -521,9 +541,11 @@ public final class Stores {
      * @param <V>           value type
      * @return an instance of a {@link StoreBuilder} that can build a {@link 
KeyValueStore}
      */
-    public static <K, V> StoreBuilder<KeyValueStore<K, V>> 
keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
-                                                                               
 final Serde<K> keySerde,
-                                                                               
 final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<KeyValueStore<K, V>> 
keyValueStoreBuilder(
+        final KeyValueBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new KeyValueStoreBuilder<>(supplier, keySerde, valueSerde, 
Time.SYSTEM);
     }
@@ -543,13 +565,35 @@ public final class Stores {
      * @param <V>           value type
      * @return an instance of a {@link StoreBuilder} that can build a {@link 
KeyValueStore}
      */
-    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> 
timestampedKeyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier,
-                                                                               
                       final Serde<K> keySerde,
-                                                                               
                       final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<TimestampedKeyValueStore<K, V>> 
timestampedKeyValueStoreBuilder(
+        final KeyValueBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new TimestampedKeyValueStoreBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
 
+    /**
+     * Creates a {@link StoreBuilder} that can be used to build a {@link 
TimestampedKeyValueStoreWithHeaders}.
+     *
+     * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be 
{@code null})
+     * @param keySerde      the key serde to use
+     * @param valueSerde    the value serde to use; if the serialized bytes is 
{@code null} for put operations,
+     *                      it is treated as delete
+     * @param <K>           key type
+     * @param <V>           value type
+     * @return an instance of {@link StoreBuilder} that can build a {@link 
TimestampedKeyValueStoreWithHeaders}
+     */
+    public static <K, V> StoreBuilder<TimestampedKeyValueStoreWithHeaders<K, 
V>> timestampedKeyValueStoreWithHeadersBuilder(
+        final KeyValueBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
+        Objects.requireNonNull(supplier, "supplier cannot be null");
+        return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, 
keySerde, valueSerde, Time.SYSTEM);
+    }
+
     /**
      * Creates a {@link StoreBuilder} that can be used to build a {@link 
VersionedKeyValueStore}.
      *
@@ -561,9 +605,11 @@ public final class Stores {
      * @param <V>        value type
      * @return an instance of a {@link StoreBuilder} that can build a {@link 
VersionedKeyValueStore}
      */
-    public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>> 
versionedKeyValueStoreBuilder(final VersionedBytesStoreSupplier supplier,
-                                                                               
                   final Serde<K> keySerde,
-                                                                               
                   final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<VersionedKeyValueStore<K, V>> 
versionedKeyValueStoreBuilder(
+        final VersionedBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new VersionedKeyValueStoreBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
@@ -582,9 +628,11 @@ public final class Stores {
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} than can build a {@link 
WindowStore}
      */
-    public static <K, V> StoreBuilder<WindowStore<K, V>> 
windowStoreBuilder(final WindowBytesStoreSupplier supplier,
-                                                                            
final Serde<K> keySerde,
-                                                                            
final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<WindowStore<K, V>> windowStoreBuilder(
+        final WindowBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new WindowStoreBuilder<>(supplier, keySerde, valueSerde, 
Time.SYSTEM);
     }
@@ -604,9 +652,11 @@ public final class Stores {
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} that can build a {@link 
TimestampedWindowStore}
      */
-    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> 
timestampedWindowStoreBuilder(final WindowBytesStoreSupplier supplier,
-                                                                               
                   final Serde<K> keySerde,
-                                                                               
                   final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<TimestampedWindowStore<K, V>> 
timestampedWindowStoreBuilder(
+        final WindowBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new TimestampedWindowStoreBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
@@ -624,7 +674,8 @@ public final class Stores {
     public static <K, V> StoreBuilder<TimestampedWindowStoreWithHeaders<K, V>> 
timestampedWindowStoreWithHeadersBuilder(
             final WindowBytesStoreSupplier supplier,
             final Serde<K> keySerde,
-            final Serde<V> valueSerde) {
+            final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new TimestampedWindowStoreWithHeadersBuilder<>(supplier, 
keySerde, valueSerde, Time.SYSTEM);
     }
@@ -640,9 +691,11 @@ public final class Stores {
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} than can build a {@link 
SessionStore}
      */
-    public static <K, V> StoreBuilder<SessionStore<K, V>> 
sessionStoreBuilder(final SessionBytesStoreSupplier supplier,
-                                                                              
final Serde<K> keySerde,
-                                                                              
final Serde<V> valueSerde) {
+    public static <K, V> StoreBuilder<SessionStore<K, V>> sessionStoreBuilder(
+        final SessionBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreBuilder<>(supplier, keySerde, valueSerde, 
Time.SYSTEM);
     }
@@ -658,29 +711,13 @@ public final class Stores {
      * @param <V>           value type
      * @return an instance of {@link StoreBuilder} than can build a {@link 
SessionStoreWithHeaders}
      */
-    public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> 
sessionStoreBuilderWithHeaders(
+    public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> 
sessionStoreWithHeadersBuilder(
             final SessionBytesStoreSupplier supplier,
             final Serde<K> keySerde,
-            final Serde<V> valueSerde) {
+            final Serde<V> valueSerde
+    ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
-        return new SessionStoreBuilderWithHeaders<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
+        return new SessionStoreWithHeadersBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
 
-    /**
-     * Creates a {@link StoreBuilder} that can be used to build a {@link 
TimestampedKeyValueStoreWithHeaders}.
-     *
-     * @param supplier      a {@link KeyValueBytesStoreSupplier} (cannot be 
{@code null})
-     * @param keySerde      the key serde to use
-     * @param valueSerde    the value serde to use; if the serialized bytes is 
{@code null} for put operations,
-     *                      it is treated as delete
-     * @param <K>           key type
-     * @param <V>           value type
-     * @return an instance of {@link StoreBuilder} than can build a {@link 
KeyValueStore}
-     */
-    public static <K, V> StoreBuilder<TimestampedKeyValueStoreWithHeaders<K, 
V>> timestampedKeyValueStoreBuilderWithHeaders(final KeyValueBytesStoreSupplier 
supplier,
-                                                                               
                                             final Serde<K> keySerde,
-                                                                               
                                             final Serde<V> valueSerde) {
-        Objects.requireNonNull(supplier, "supplier cannot be null");
-        return new TimestampedKeyValueStoreBuilderWithHeaders<>(supplier, 
keySerde, valueSerde, Time.SYSTEM);
-    }
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java
similarity index 98%
rename from 
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java
rename to 
streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java
index d09886b3e08..c6224976fa9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilderWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersBuilder.java
@@ -37,12 +37,12 @@ import java.util.Objects;
  * {@link AggregationWithHeaders} as the value wrapper and wires up the
  * header-aware store stack (change-logging, caching, metering).
  */
-public class SessionStoreBuilderWithHeaders<K, V>
+public class SessionStoreWithHeadersBuilder<K, V>
     extends AbstractStoreBuilder<K, AggregationWithHeaders<V>, 
SessionStoreWithHeaders<K, V>> {
 
     private final SessionBytesStoreSupplier storeSupplier;
 
-    public SessionStoreBuilderWithHeaders(final SessionBytesStoreSupplier 
storeSupplier,
+    public SessionStoreWithHeadersBuilder(final SessionBytesStoreSupplier 
storeSupplier,
                                           final Serde<K> keySerde,
                                           final Serde<V> valueSerde,
                                           final Time time) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 5c18ef5662c..a43b5689106 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -154,7 +154,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
             Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3));
 
         final StoreBuilder<SessionStoreWithHeaders<String, Long>> storeBuilder 
=
-            Stores.sessionStoreBuilderWithHeaders(supplier, Serdes.String(), 
Serdes.Long())
+            Stores.sessionStoreWithHeadersBuilder(supplier, Serdes.String(), 
Serdes.Long())
                 .withLoggingDisabled();
 
         if (enableCaching && emitStrategy.type() != 
EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index f06b59a544c..c3ba367a305 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -210,7 +210,7 @@ public class ForeignTableJoinProcessorSupplierTests {
     private StoreBuilder<TimestampedKeyValueStoreWithHeaders<Bytes, 
SubscriptionWrapper<String>>> storeBuilder() {
         final Serde<SubscriptionWrapper<String>> subscriptionWrapperSerde = 
new SubscriptionWrapperSerde<>(
             PK_SERDE_TOPIC_SUPPLIER, Serdes.String());
-        return Stores.timestampedKeyValueStoreBuilderWithHeaders(
+        return Stores.timestampedKeyValueStoreWithHeadersBuilder(
             Stores.persistentTimestampedKeyValueStore(
                 "Store"
             ),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index f3dd326adcd..7a7fb74cd38 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -519,7 +519,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
     private StoreBuilder<TimestampedKeyValueStoreWithHeaders<Bytes, 
SubscriptionWrapper<String>>> storeBuilder() {
         final Serde<SubscriptionWrapper<String>> subscriptionWrapperSerde = 
new SubscriptionWrapperSerde<>(
             PK_SERDE_TOPIC_SUPPLIER, Serdes.String());
-        return Stores.timestampedKeyValueStoreBuilderWithHeaders(
+        return Stores.timestampedKeyValueStoreWithHeadersBuilder(
             Stores.persistentTimestampedKeyValueStoreWithHeaders(
                 "Store"
             ),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
index e361daae09d..e59050d912f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
@@ -30,7 +30,7 @@ public class QueryableStoreTypesWithHeadersTest {
     @Test
     public void 
shouldAcceptTimestampedKeyValueStoreWithHeadersForTimestampedKeyValueStoreType()
 {
         final TimestampedKeyValueStoreWithHeaders<String, String> store =
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
                 Stores.inMemoryKeyValueStore("test-store"),
                 Serdes.String(),
                 Serdes.String())
@@ -45,7 +45,7 @@ public class QueryableStoreTypesWithHeadersTest {
     @Test
     public void 
shouldAcceptTimestampedKeyValueStoreWithHeadersForKeyValueStoreType() {
         final TimestampedKeyValueStoreWithHeaders<String, String> store =
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
                 Stores.inMemoryKeyValueStore("test-store"),
                 Serdes.String(),
                 Serdes.String())
@@ -60,7 +60,7 @@ public class QueryableStoreTypesWithHeadersTest {
     @Test
     public void 
shouldNotAcceptTimestampedKeyValueStoreWithHeadersForWindowStoreType() {
         final TimestampedKeyValueStoreWithHeaders<String, String> store =
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
                 Stores.inMemoryKeyValueStore("test-store"),
                 Serdes.String(),
                 Serdes.String())
@@ -200,7 +200,7 @@ public class QueryableStoreTypesWithHeadersTest {
     @Test
     public void shouldAcceptSessionStoreWithHeadersForSessionStoreType() {
         final SessionStoreWithHeaders<String, String> store =
-            Stores.sessionStoreBuilderWithHeaders(
+            Stores.sessionStoreWithHeadersBuilder(
                 Stores.inMemorySessionStore(
                     "test-session-store",
                     Duration.ofMillis(100)),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index cd8692137dd..f96f8e96d99 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -111,7 +111,7 @@ public class GlobalStateStoreProviderTest {
                 Serdes.String()).build());
         stores.put(
             "ts-kv-store-with-headers",
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
                 Stores.inMemoryKeyValueStore("ts-kv-store-with-headers"),
                 Serdes.String(),
                 Serdes.String()).build());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
index b5eda458c78..500fb9a7cfe 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreWithHeadersSerializerSideEffectTest.java
@@ -135,7 +135,7 @@ public class 
SessionStoreWithHeadersSerializerSideEffectTest {
 
         // Create a session store with headers using our custom serializer
         builder.addStateStore(
-            Stores.sessionStoreBuilderWithHeaders(
+            Stores.sessionStoreWithHeadersBuilder(
                 Stores.inMemorySessionStore(
                     STORE_NAME,
                     Duration.ofMillis(10000L)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 180cd121283..0bd44815997 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -155,7 +155,7 @@ public class StreamThreadStateStoreProviderTest {
                 Serdes.String()),
             "the-processor");
         topology.addStateStore(
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
                 
Stores.inMemoryKeyValueStore("timestamped-kv-store-with-headers"),
                 Serdes.String(),
                 Serdes.String()),
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
index 918e3e37d7a..2fe5671fea0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest.java
@@ -132,7 +132,7 @@ public class 
TimestampedKeyValueStoreWithHeadersSerializerSideEffectTest {
 
         // Create a timestamped key-value store with headers using our custom 
serializer
         builder.addStateStore(
-            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
                 Stores.inMemoryKeyValueStore(STORE_NAME),
                 new HeaderAddingSerde(),  // Custom key serializer that adds 
headers
                 Serdes.String()
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
index 631f906ee83..690506edbb3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/UtilsTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -55,7 +56,7 @@ public class UtilsTest {
     // Header size's varint encoding cannot exceed 5 bytes (see @{link 
ByteUtils#readVarint(ByteBuffer)})
     private static final int MAX_VARINT_SIZE = 5;
     private static final int OVERFLOW_HEADERS_SIZE = (1 + MAX_VARINT_SIZE) + 
HEADERS.length + StateSerdes.TIMESTAMP_SIZE + VALUE.length;
-    // 1 byte header size, 0 byte empty headers, and timetsamp
+    // 1 byte header size, 0 byte empty headers, and timestamp
     private static final int MIN_SIZE = 1 + 0 + StateSerdes.TIMESTAMP_SIZE;
 
     @Test
@@ -132,7 +133,7 @@ public class UtilsTest {
         final Headers headers = new RecordHeaders().add("key1", 
"value1".getBytes(StandardCharsets.UTF_8));
         final ValueTimestampHeaders<String> input = 
ValueTimestampHeaders.make(VALUE_STR, TIMESTAMP, headers);
         try (
-            final ValueTimestampHeadersSerializer<String> serializer = new 
ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+            final ValueTimestampHeadersSerializer<String> serializer = new 
ValueTimestampHeadersSerializer<>(new StringSerializer());
             final ValueAndTimestampSerde<String> stringSerde = new 
ValueAndTimestampSerde<>(Serdes.String())
         ) {
             final byte[] inputBytes = serializer.serialize(TOPIC, input);
@@ -157,9 +158,9 @@ public class UtilsTest {
 
     @ParameterizedTest
     @ValueSource(bytes = { 0x10, 0x11 })
-    public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final int 
invalidSize) {
+    public void testEmptyHeadersAndTimestampWithInvalidHeaderSizes(final byte 
invalidSize) {
         final byte[] invalid = new byte[MIN_SIZE];
-        invalid[0] = (byte) invalidSize; // header size
+        invalid[0] = invalidSize; // header size
         assertFalse(hasEmptyHeaders(invalid));
     }
 
@@ -186,12 +187,12 @@ public class UtilsTest {
     }
 
     private static byte[] timestampedValueWithEmptyHeaders(final byte[] value) 
{
-        // header size: 1 byte, emtpy headers: 0 byte, timestamp: 8 bytes, 
plain value length
+        // header size: 1 byte, empty headers: 0 byte, timestamp: 8 bytes, 
plain value length
         final byte[] res = new byte[1 + 0 + StateSerdes.TIMESTAMP_SIZE + 
value.length];
         final ByteBuffer buf = ByteBuffer.wrap(res);
         buf.put((byte) 0x00); // header size
         buf.putLong(TIMESTAMP);
-        buf.put(VALUE);
+        buf.put(value);
         return res;
     }
 

Reply via email to