This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new 384db3be186 KAFKA-20194: Add sesssion-header-store support to TDD 
(#21956)
384db3be186 is described below

commit 384db3be18618ca029539f5ad10fa17f9be45088
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 8 11:47:29 2026 -0700

    KAFKA-20194: Add sesssion-header-store support to TDD (#21956)
    
    Adds missing method TopologyTestDriver.getSessionStoreWithHeader(), and
    updates  TopologyTestDriverTest for all newly added header store types.
    
    Reviewers: TengYao Chi <[email protected]>, Alieh Saeedi
     <[email protected]>
---
 checkstyle/suppressions.xml                        |  10 +-
 .../org/apache/kafka/streams/state/Stores.java     |   7 +-
 .../state/internals/GlobalStateStoreProvider.java  |  21 +-
 .../internals/StreamThreadStateStoreProvider.java  |  14 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |  91 ++++---
 .../kafka/streams/TopologyTestDriverTest.java      | 293 +++++++++++++++------
 6 files changed, 286 insertions(+), 150 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 8d00d2dca5b..dd72b56524c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -202,7 +202,10 @@
               
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest|RestoreIntegrationTest).java"/>
 
     <suppress checks="MethodLength"
-              
files="(EosIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
+              
files="(EosIntegrationTest|KStreamKStreamJoinTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KStreamSlidingWindowAggregateTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBWindowStoreTest|TopologyTestDriverTest).java"/>
+
+    <suppress checks="ParameterNumber"
+              files="(TopologyTestDriverTest).java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
               files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
@@ -211,7 +214,7 @@
               
files="(KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|IQv2StoreIntegrationTest|StreamsConfigTest).java"/>
 
     <suppress checks="JavaNCSS"
-              
files="(KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
+              
files="(KStreamKStreamJoinTest|StreamTaskTest|StreamThreadTest|TaskManagerTest|TopologyTestDriverTest).java"/>
 
     <suppress checks="NPathComplexity"
               
files="(KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
@@ -219,9 +222,6 @@
     <suppress 
checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl)"
               files="Murmur3Test.java"/>
 
-    <suppress checks="MethodLength"
-              
files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KTableKTableForeignKeyJoinIntegrationTest).java"/>
-
     <!-- Streams test-utils -->
     <suppress checks="ClassFanOutComplexity|ClassDataAbstractionCoupling"
               files="TopologyTestDriver.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index d567d525dc3..dfa2a2e2bd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -712,12 +712,11 @@ public final class Stores {
      * @return an instance of {@link StoreBuilder} than can build a {@link 
SessionStoreWithHeaders}
      */
     public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>> 
sessionStoreWithHeadersBuilder(
-            final SessionBytesStoreSupplier supplier,
-            final Serde<K> keySerde,
-            final Serde<V> valueSerde
+        final SessionBytesStoreSupplier supplier,
+        final Serde<K> keySerde,
+        final Serde<V> valueSerde
     ) {
         Objects.requireNonNull(supplier, "supplier cannot be null");
         return new SessionStoreWithHeadersBuilder<>(supplier, keySerde, 
valueSerde, Time.SYSTEM);
     }
-
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 5285b974759..b4ce0a01cd3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -49,29 +49,24 @@ public class GlobalStateStoreProvider implements 
StateStoreProvider {
         }
         if (store instanceof TimestampedKeyValueStoreWithHeaders) {
             if (queryableStoreType instanceof 
QueryableStoreTypes.KeyValueStoreType) {
-                return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueFromHeaders()));
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?, 
?>) store, ValueConverters.extractValueFromHeaders()));
             } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedKeyValueStoreType) {
-                return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
-            } else {
-                // For custom query types, return the raw store so they can 
access headers directly
-                return (List<T>) Collections.singletonList(store);
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?, 
?>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
             }
         } else if (store instanceof TimestampedKeyValueStore && 
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
-            return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store, ValueConverters.extractValue()));
+            return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<?, ?>) store, 
ValueConverters.extractValue()));
         } else if (store instanceof TimestampedWindowStoreWithHeaders) {
             if (queryableStoreType instanceof 
QueryableStoreTypes.WindowStoreType) {
-                return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueFromHeaders()));
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>) 
store, ValueConverters.extractValueFromHeaders()));
             } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedWindowStoreType) {
-                return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
-            } else {
-                // For custom query types, return the raw store so they can 
access headers directly
-                return (List<T>) Collections.singletonList(store);
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>) 
store, ValueConverters.extractValueAndTimestampFromHeaders()));
             }
         } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
-            return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) 
store, ValueConverters.extractValue()));
+            return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<?, ?>) store, 
ValueConverters.extractValue()));
         } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
-            return (List<T>) Collections.singletonList(new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store));
+            return (List<T>) Collections.singletonList(new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<?, ?>) store));
         }
+
         return (List<T>) Collections.singletonList(store);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index f9dd0250563..321b9bf60a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -108,22 +108,22 @@ public class StreamThreadStateStoreProvider {
             }
             if (store instanceof TimestampedKeyValueStoreWithHeaders) {
                 if (queryableStoreType instanceof 
QueryableStoreTypes.KeyValueStoreType) {
-                    return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueFromHeaders());
+                    return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?, 
?>) store, ValueConverters.extractValueFromHeaders());
                 } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedKeyValueStoreType) {
-                    return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+                    return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?, 
?>) store, ValueConverters.extractValueAndTimestampFromHeaders());
                 }
             } else if (store instanceof TimestampedKeyValueStore && 
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
-                return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store, ValueConverters.extractValue());
+                return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<?, ?>) store, 
ValueConverters.extractValue());
             } else if (store instanceof TimestampedWindowStoreWithHeaders) {
                 if (queryableStoreType instanceof 
QueryableStoreTypes.WindowStoreType) {
-                    return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueFromHeaders());
+                    return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>) 
store, ValueConverters.extractValueFromHeaders());
                 } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedWindowStoreType) {
-                    return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+                    return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>) 
store, ValueConverters.extractValueAndTimestampFromHeaders());
                 }
             } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
-                return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) 
store, ValueConverters.extractValue());
+                return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<?, ?>) store, 
ValueConverters.extractValue());
             } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
-                return (T) new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
+                return (T) new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<?, ?>) store);
             }
 
             return (T) store;
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index ae56c798776..b7300f14454 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -49,13 +49,11 @@ import 
org.apache.kafka.streams.internals.StreamsConfigUtils;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StandbyUpdateListener.SuspendReason;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ChangelogRegister;
 import org.apache.kafka.streams.processor.internals.ClientUtils;
 import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.GlobalStateManager;
@@ -111,7 +109,6 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -891,12 +888,13 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     public Map<String, StateStore> getAllStateStores() {
         final Map<String, StateStore> allStores = new HashMap<>();
@@ -924,12 +922,13 @@ public class TopologyTestDriver implements Closeable {
      * @see #getAllStateStores()
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     public StateStore getStateStore(final String name) throws 
IllegalArgumentException {
         return getStateStore(name, true);
@@ -1006,12 +1005,13 @@ public class TopologyTestDriver implements Closeable {
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
@@ -1039,12 +1039,13 @@ public class TopologyTestDriver implements Closeable {
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> 
getTimestampedKeyValueStore(final String name) {
@@ -1056,53 +1057,55 @@ public class TopologyTestDriver implements Closeable {
     }
 
     /**
-     * Get the {@link VersionedKeyValueStore} with the given name.
+     * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name.
      * The store can be a "regular" or global store.
      * <p>
      * This is often useful in test cases to pre-populate the store before the 
test case instructs the topology to
      * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, 
and/or to check the store afterward.
      *
      * @param name the name of the store
-     * @return the key value store, or {@code null} if no {@link 
VersionedKeyValueStore} has been registered with the given name
+     * @return the key value store, or {@code null} if no {@link 
TimestampedKeyValueStoreWithHeaders} has been registered with the given name
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
-    public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final 
String name) {
+    public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>> 
getTimestampedKeyValueStoreWithHeaders(final String name) {
         final StateStore store = getStateStore(name, false);
-        return store instanceof VersionedKeyValueStore ? 
(VersionedKeyValueStore<K, V>) store : null;
+        return store instanceof TimestampedKeyValueStoreWithHeaders ? 
(TimestampedKeyValueStoreWithHeaders<K, V>) store : null;
     }
 
     /**
-     * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name.
+     * Get the {@link VersionedKeyValueStore} with the given name.
      * The store can be a "regular" or global store.
      * <p>
      * This is often useful in test cases to pre-populate the store before the 
test case instructs the topology to
      * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, 
and/or to check the store afterward.
      *
      * @param name the name of the store
-     * @return the key value store, or {@code null} if no {@link 
TimestampedKeyValueStoreWithHeaders} has been registered with the given name
+     * @return the key value store, or {@code null} if no {@link 
VersionedKeyValueStore} has been registered with the given name
      * @see #getAllStateStores()
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
-    public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>> 
getTimestampedKeyValueStoreWithHeaders(final String name) {
+    public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final 
String name) {
         final StateStore store = getStateStore(name, false);
-        return store instanceof TimestampedKeyValueStoreWithHeaders ? 
(TimestampedKeyValueStoreWithHeaders<K, V>) store : null;
+        return store instanceof VersionedKeyValueStore ? 
(VersionedKeyValueStore<K, V>) store : null;
     }
 
     /**
@@ -1123,11 +1126,12 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> WindowStore<K, V> getWindowStore(final String name) {
@@ -1156,10 +1160,12 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> WindowStore<K, ValueAndTimestamp<V>> 
getTimestampedWindowStore(final String name) {
@@ -1183,11 +1189,12 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
-     * @see #getVersionedKeyValueStore(String)
      * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
      * @see #getSessionStore(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> WindowStore<K, ValueTimestampHeaders<V>> 
getTimestampedWindowStoreWithHeaders(final String name) {
@@ -1208,11 +1215,12 @@ public class TopologyTestDriver implements Closeable {
      * @see #getStateStore(String)
      * @see #getKeyValueStore(String)
      * @see #getTimestampedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getVersionedKeyValueStore(String)
      * @see #getWindowStore(String)
      * @see #getTimestampedWindowStore(String)
-     * @see #getTimestampedKeyValueStoreWithHeaders(String)
      * @see #getTimestampedWindowStoreWithHeaders(String)
+     * @see #getSessionStoreWithHeaders(String)
      */
     @SuppressWarnings("unchecked")
     public <K, V> SessionStore<K, V> getSessionStore(final String name) {
@@ -1223,6 +1231,32 @@ public class TopologyTestDriver implements Closeable {
         return store instanceof SessionStore ? (SessionStore<K, V>) store : 
null;
     }
 
+    /**
+     * Get the {@link SessionStore} with the given name.
+     * The store can be a "regular" or global store.
+     * <p>
+     * This is often useful in test cases to pre-populate the store before the 
test case instructs the topology to
+     * {@link TestInputTopic#pipeInput(TestRecord) process an input message}, 
and/or to check the store afterward.
+     *
+     * @param name the name of the store
+     * @return the key value store, or {@code null} if no {@link SessionStore} 
has been registered with the given name
+     * @see #getAllStateStores()
+     * @see #getStateStore(String)
+     * @see #getKeyValueStore(String)
+     * @see #getTimestampedKeyValueStore(String)
+     * @see #getTimestampedKeyValueStoreWithHeaders(String)
+     * @see #getVersionedKeyValueStore(String)
+     * @see #getWindowStore(String)
+     * @see #getTimestampedWindowStore(String)
+     * @see #getTimestampedWindowStoreWithHeaders(String)
+     * @see #getSessionStore(String)
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> SessionStoreWithHeaders<K, V> 
getSessionStoreWithHeaders(final String name) {
+        final StateStore store = getStateStore(name, false);
+        return store instanceof SessionStoreWithHeaders ? 
(SessionStoreWithHeaders<K, V>) store : null;
+    }
+
     /**
      * Close the driver, its topology, and all processors.
      */
@@ -1251,21 +1285,6 @@ public class TopologyTestDriver implements Closeable {
         stateDirectory.clean();
     }
 
-    static class MockChangelogRegister implements ChangelogRegister {
-        @Override
-        public void register(final TopicPartition partition, final 
ProcessorStateManager stateManager) { }
-
-        @Override
-        public void register(final Set<TopicPartition> changelogPartitions, 
final ProcessorStateManager stateManager) { }
-
-        @Override
-        public void unregister(final Collection<TopicPartition> partitions) { }
-
-        @Override
-        public void unregister(final Collection<TopicPartition> partitions,
-                               final SuspendReason reason) { }
-    }
-
     static class MockTime implements Time {
         private final AtomicLong timeMs;
         private final AtomicLong highResTimeNs;
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 2dc0089990e..7555c078330 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -24,6 +24,10 @@ import 
org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -386,7 +390,6 @@ public abstract class TopologyTestDriverTest {
                 () -> new Processor<Object, Object, Void, Void>() {
                     KeyValueStore<Object, Object> store;
 
-                    @SuppressWarnings("unchecked")
                     @Override
                     public void init(final ProcessorContext<Void, Void> 
context) {
                         store = context.getStateStore(sourceTopicName + 
"-globalStore");
@@ -602,6 +605,7 @@ public abstract class TopologyTestDriverTest {
         assertThat(record, equalTo(expectedResult));
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void shouldUseSourceSpecificDeserializers() {
         final Topology topology = new Topology();
@@ -610,23 +614,23 @@ public abstract class TopologyTestDriverTest {
         final String sourceName2 = "source-2";
         final String processor = "processor";
 
-        topology.addSource(sourceName1, Serdes.Long().deserializer(), 
Serdes.String().deserializer(), SOURCE_TOPIC_1);
-        topology.addSource(sourceName2, Serdes.Integer().deserializer(), 
Serdes.Double().deserializer(), SOURCE_TOPIC_2);
+        topology.addSource(sourceName1, new LongDeserializer(), new 
StringDeserializer(), SOURCE_TOPIC_1);
+        topology.addSource(sourceName2, new IntegerDeserializer(), new 
DoubleDeserializer(), SOURCE_TOPIC_2);
         topology.addProcessor(processor, new MockProcessorSupplier(), 
sourceName1, sourceName2);
         topology.addSink(
             "sink",
             SINK_TOPIC_1,
             (topic, data) -> {
                 if (data instanceof Long) {
-                    return Serdes.Long().serializer().serialize(topic, (Long) 
data);
+                    return new LongSerializer().serialize(topic, (Long) data);
                 }
-                return Serdes.Integer().serializer().serialize(topic, 
(Integer) data);
+                return new IntegerSerializer().serialize(topic, (Integer) 
data);
             },
             (topic, data) -> {
                 if (data instanceof String) {
-                    return Serdes.String().serializer().serialize(topic, 
(String) data);
+                    return new StringSerializer().serialize(topic, (String) 
data);
                 }
-                return Serdes.Double().serializer().serialize(topic, (Double) 
data);
+                return new DoubleSerializer().serialize(topic, (Double) data);
             },
             processor);
 
@@ -642,21 +646,21 @@ public abstract class TopologyTestDriverTest {
 
         testDriver.pipeRecord(SOURCE_TOPIC_1,
                 consumerRecord1,
-                Serdes.Long().serializer(),
-                Serdes.String().serializer(),
+                new LongSerializer(),
+                new StringSerializer(),
                 Instant.now());
         final TestRecord<Long, String> result1 =
-            testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(), 
Serdes.String().deserializer());
+            testDriver.readRecord(SINK_TOPIC_1, new LongDeserializer(), new 
StringDeserializer());
         assertThat(result1.getKey(), equalTo(source1Key));
         assertThat(result1.getValue(), equalTo(source1Value));
 
         testDriver.pipeRecord(SOURCE_TOPIC_2,
                 consumerRecord2,
-                Serdes.Integer().serializer(),
-                Serdes.Double().serializer(),
+                new IntegerSerializer(),
+                new DoubleSerializer(),
                 Instant.now());
         final TestRecord<Integer, Double> result2 =
-            testDriver.readRecord(SINK_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.Double().deserializer());
+            testDriver.readRecord(SINK_TOPIC_1, new IntegerDeserializer(), new 
DoubleDeserializer());
         assertThat(result2.getKey(), equalTo(source2Key));
         assertThat(result2.getValue(), equalTo(source2Value));
     }
@@ -718,10 +722,10 @@ public abstract class TopologyTestDriverTest {
         final String sourceName1 = "source-1";
         final String sourceName2 = "source-2";
 
-        topology.addSource(sourceName1, Serdes.Long().deserializer(), 
Serdes.String().deserializer(), SOURCE_TOPIC_1);
-        topology.addSource(sourceName2, Serdes.Integer().deserializer(), 
Serdes.Double().deserializer(), SOURCE_TOPIC_2);
-        topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(), 
Serdes.String().serializer(), sourceName1);
-        topology.addSink("sink-2", SINK_TOPIC_2, 
Serdes.Integer().serializer(), Serdes.Double().serializer(), sourceName2);
+        topology.addSource(sourceName1, new LongDeserializer(), new 
StringDeserializer(), SOURCE_TOPIC_1);
+        topology.addSource(sourceName2, new IntegerDeserializer(), new 
DoubleDeserializer(), SOURCE_TOPIC_2);
+        topology.addSink("sink-1", SINK_TOPIC_1, new LongSerializer(), new 
StringSerializer(), sourceName1);
+        topology.addSink("sink-2", SINK_TOPIC_2, new IntegerSerializer(), new 
DoubleSerializer(), sourceName2);
 
         testDriver = new TopologyTestDriver(topology);
 
@@ -735,21 +739,21 @@ public abstract class TopologyTestDriverTest {
 
         testDriver.pipeRecord(SOURCE_TOPIC_1,
                 consumerRecord1,
-                Serdes.Long().serializer(),
-                Serdes.String().serializer(),
+                new LongSerializer(),
+                new StringSerializer(),
                 Instant.now());
         final TestRecord<Long, String> result1 =
-                testDriver.readRecord(SINK_TOPIC_1, 
Serdes.Long().deserializer(), Serdes.String().deserializer());
+                testDriver.readRecord(SINK_TOPIC_1, new LongDeserializer(), 
new StringDeserializer());
         assertThat(result1.getKey(), equalTo(source1Key));
         assertThat(result1.getValue(), equalTo(source1Value));
 
         testDriver.pipeRecord(SOURCE_TOPIC_2,
                 consumerRecord2,
-                Serdes.Integer().serializer(),
-                Serdes.Double().serializer(),
+                new IntegerSerializer(),
+                new DoubleSerializer(),
                 Instant.now());
         final TestRecord<Integer, Double> result2 =
-                testDriver.readRecord(SINK_TOPIC_2, 
Serdes.Integer().deserializer(), Serdes.Double().deserializer());
+                testDriver.readRecord(SINK_TOPIC_2, new IntegerDeserializer(), 
new DoubleDeserializer());
         assertThat(result2.getKey(), equalTo(source2Key));
         assertThat(result2.getValue(), equalTo(source2Value));
     }
@@ -880,8 +884,8 @@ public abstract class TopologyTestDriverTest {
                 Serdes.ByteArray(),
                 Time.SYSTEM).withLoggingDisabled(),
             "sourceProcessorName",
-            Serdes.ByteArray().deserializer(),
-            Serdes.ByteArray().deserializer(),
+            new ByteArrayDeserializer(),
+            new ByteArrayDeserializer(),
             "globalTopicName",
             "globalProcessorName",
             voidProcessorSupplier);
@@ -911,12 +915,16 @@ public abstract class TopologyTestDriverTest {
     private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
         final String keyValueStoreName = "keyValueStore";
         final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+        final String timestampedKeyValueStoreWithHeaderName = 
"keyValueTimestampStoreWithHeaders";
         final String versionedKeyValueStoreName = "keyValueVersionedStore";
         final String windowStoreName = "windowStore";
         final String timestampedWindowStoreName = "windowTimestampStore";
+        final String timestampedWindowStoreWithHeadersName = 
"windowTimestampStoreWithHeaders";
         final String sessionStoreName = "sessionStore";
+        final String sessionStoreWithHeadersName = "sessionStoreWithHeaders";
         final String globalKeyValueStoreName = "globalKeyValueStore";
         final String globalTimestampedKeyValueStoreName = 
"globalKeyValueTimestampStore";
+        final String globalTimestampedKeyValueStoreWithHeadersName = 
"globalKeyValueTimestampStoreWithHeaders";
         final String globalVersionedKeyValueStoreName = 
"globalKeyValueVersionedStore";
 
         final Topology topology = setupSingleProcessorTopology();
@@ -925,12 +933,16 @@ public abstract class TopologyTestDriverTest {
             persistent,
             keyValueStoreName,
             timestampedKeyValueStoreName,
+            timestampedKeyValueStoreWithHeaderName,
             versionedKeyValueStoreName,
             windowStoreName,
             timestampedWindowStoreName,
+            timestampedWindowStoreWithHeadersName,
             sessionStoreName,
+            sessionStoreWithHeadersName,
             globalKeyValueStoreName,
             globalTimestampedKeyValueStoreName,
+            globalTimestampedKeyValueStoreWithHeadersName,
             globalVersionedKeyValueStoreName);
 
 
@@ -939,70 +951,137 @@ public abstract class TopologyTestDriverTest {
         // verify state stores
         assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
         assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(keyValueStoreName));
         assertNull(testDriver.getVersionedKeyValueStore(keyValueStoreName));
         assertNull(testDriver.getWindowStore(keyValueStoreName));
         assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(keyValueStoreName));
         assertNull(testDriver.getSessionStore(keyValueStoreName));
+        assertNull(testDriver.getSessionStoreWithHeaders(keyValueStoreName));
 
         
assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
         
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreName));
         
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreName));
         assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
         
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreName));
         assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
+        
assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreName));
 
         if (persistent) { // versioned stores do not offer an in-memory 
version yet, so nothing to test/verify unless persistent
             
assertNull(testDriver.getKeyValueStore(versionedKeyValueStoreName));
             
assertNull(testDriver.getTimestampedKeyValueStore(versionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(versionedKeyValueStoreName));
             
assertNotNull(testDriver.getVersionedKeyValueStore(versionedKeyValueStoreName));
             assertNull(testDriver.getWindowStore(versionedKeyValueStoreName));
             
assertNull(testDriver.getTimestampedWindowStore(versionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(versionedKeyValueStoreName));
             assertNull(testDriver.getSessionStore(versionedKeyValueStoreName));
+            
assertNull(testDriver.getSessionStoreWithHeaders(versionedKeyValueStoreName));
         }
 
+        
assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreWithHeaderName));
+        
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreWithHeaderName));
+        
assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
+        
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreWithHeaderName));
+        
assertNull(testDriver.getWindowStore(timestampedKeyValueStoreWithHeaderName));
+        
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreWithHeaderName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
+        
assertNull(testDriver.getSessionStore(timestampedKeyValueStoreWithHeaderName));
+        
assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
+
         assertNull(testDriver.getKeyValueStore(windowStoreName));
         assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(windowStoreName));
         assertNull(testDriver.getVersionedKeyValueStore(windowStoreName));
         assertNotNull(testDriver.getWindowStore(windowStoreName));
         assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(windowStoreName));
         assertNull(testDriver.getSessionStore(windowStoreName));
+        assertNull(testDriver.getSessionStoreWithHeaders(windowStoreName));
 
         assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
         
assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedWindowStoreName));
         
assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreName));
         assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
         
assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedWindowStoreName));
         assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
+        
assertNull(testDriver.getSessionStoreWithHeaders(timestampedWindowStoreName));
+
+        
assertNull(testDriver.getKeyValueStore(timestampedWindowStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedWindowStoreWithHeadersName));
+        
assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreWithHeadersName));
+        
assertNotNull(testDriver.getWindowStore(timestampedWindowStoreWithHeadersName));
+        
assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreWithHeadersName));
+        
assertNotNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedWindowStoreWithHeadersName));
+        
assertNull(testDriver.getSessionStore(timestampedWindowStoreWithHeadersName));
+        
assertNull(testDriver.getSessionStoreWithHeaders(timestampedWindowStoreWithHeadersName));
 
         assertNull(testDriver.getKeyValueStore(sessionStoreName));
         assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(sessionStoreName));
         assertNull(testDriver.getVersionedKeyValueStore(sessionStoreName));
         assertNull(testDriver.getWindowStore(sessionStoreName));
         assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(sessionStoreName));
         assertNotNull(testDriver.getSessionStore(sessionStoreName));
+        assertNull(testDriver.getSessionStoreWithHeaders(sessionStoreName));
+
+        assertNull(testDriver.getKeyValueStore(sessionStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(sessionStoreWithHeadersName));
+        
assertNull(testDriver.getVersionedKeyValueStore(sessionStoreWithHeadersName));
+        assertNull(testDriver.getWindowStore(sessionStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedWindowStore(sessionStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(sessionStoreWithHeadersName));
+        assertNotNull(testDriver.getSessionStore(sessionStoreWithHeadersName));
+        
assertNotNull(testDriver.getSessionStoreWithHeaders(sessionStoreWithHeadersName));
 
         // verify global stores
         assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
         
assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalKeyValueStoreName));
         
assertNull(testDriver.getVersionedKeyValueStore(globalKeyValueStoreName));
         assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
         
assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalKeyValueStoreName));
         assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
+        
assertNull(testDriver.getSessionStoreWithHeaders(globalKeyValueStoreName));
 
         
assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
         
assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
+        
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreName));
         
assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreName));
         
assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
         
assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalTimestampedKeyValueStoreName));
         
assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
+        
assertNull(testDriver.getSessionStoreWithHeaders(globalTimestampedKeyValueStoreName));
+
+        
assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreWithHeadersName));
+        
assertNull(testDriver.getSessionStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName));
 
         if (persistent) { // versioned stores do not offer an in-memory 
version yet, so nothing to test/verify unless persistent
             
assertNull(testDriver.getKeyValueStore(globalVersionedKeyValueStoreName));
             
assertNull(testDriver.getTimestampedKeyValueStore(globalVersionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalVersionedKeyValueStoreName));
             
assertNotNull(testDriver.getVersionedKeyValueStore(globalVersionedKeyValueStoreName));
             
assertNull(testDriver.getWindowStore(globalVersionedKeyValueStoreName));
             
assertNull(testDriver.getTimestampedWindowStore(globalVersionedKeyValueStoreName));
+            
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalVersionedKeyValueStoreName));
             
assertNull(testDriver.getSessionStore(globalVersionedKeyValueStoreName));
+            
assertNull(testDriver.getSessionStoreWithHeaders(globalVersionedKeyValueStoreName));
         }
     }
 
@@ -1019,12 +1098,16 @@ public abstract class TopologyTestDriverTest {
     private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final 
boolean persistent) {
         final String keyValueStoreName = "keyValueStore";
         final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+        final String timestampedKeyValueStoreWithHeadersName = 
"keyValueTimestampStoreWithHeaders";
         final String versionedKeyValueStoreName = "keyValueVersionedStore";
         final String windowStoreName = "windowStore";
         final String timestampedWindowStoreName = "windowTimestampStore";
+        final String timestampedWindowStoreWithHeadersName = 
"windowTimestampStoreWithHeaders";
         final String sessionStoreName = "sessionStore";
+        final String sessionStoreWithHeadersName = "sessionStoreWithHeaders";
         final String globalKeyValueStoreName = "globalKeyValueStore";
         final String globalTimestampedKeyValueStoreName = 
"globalKeyValueTimestampStore";
+        final String globalTimestampedKeyValueStoreWithHeadersName = 
"globalKeyValueTimestampStoreWithHeaders";
         final String globalVersionedKeyValueStoreName = 
"globalKeyValueVersionedStore";
 
         final Topology topology = setupSingleProcessorTopology();
@@ -1033,12 +1116,16 @@ public abstract class TopologyTestDriverTest {
             persistent,
             keyValueStoreName,
             timestampedKeyValueStoreName,
+            timestampedKeyValueStoreWithHeadersName,
             versionedKeyValueStoreName,
             windowStoreName,
             timestampedWindowStoreName,
+            timestampedWindowStoreWithHeadersName,
             sessionStoreName,
+            sessionStoreWithHeadersName,
             globalKeyValueStoreName,
             globalTimestampedKeyValueStoreName,
+            globalTimestampedKeyValueStoreWithHeadersName,
             globalVersionedKeyValueStoreName);
 
 
@@ -1127,6 +1214,7 @@ public abstract class TopologyTestDriverTest {
         }
     }
 
+    // CAUTION: Do not replace with Lambda; Needs to return a new Processor 
instance each time
     final ProcessorSupplier<byte[], byte[], Void, Void> voidProcessorSupplier 
= () -> new Processor<byte[], byte[], Void, Void>() {
         @Override
         public void process(final Record<byte[], byte[]> record) {
@@ -1137,29 +1225,42 @@ public abstract class TopologyTestDriverTest {
                                      final boolean persistent,
                                      final String keyValueStoreName,
                                      final String timestampedKeyValueStoreName,
+                                     final String 
timestampedKeyValueStoreWithHeadersName,
                                      final String versionedKeyValueStoreName,
                                      final String windowStoreName,
                                      final String timestampedWindowStoreName,
+                                     final String 
timestampedWindowStoreWithHeadersName,
                                      final String sessionStoreName,
+                                     final String sessionStoreWithHeadersName,
                                      final String globalKeyValueStoreName,
                                      final String 
globalTimestampedKeyValueStoreName,
+                                     final String 
globalTimestampedKeyValueStoreWithHeadersName,
                                      final String 
globalVersionedKeyValueStoreName) {
 
         // add state stores
         topology.addStateStore(
             Stores.keyValueStoreBuilder(
-                persistent ?
-                    Stores.persistentKeyValueStore(keyValueStoreName) :
-                    Stores.inMemoryKeyValueStore(keyValueStoreName),
+                persistent
+                    ? Stores.persistentKeyValueStore(keyValueStoreName)
+                    : Stores.inMemoryKeyValueStore(keyValueStoreName),
                 Serdes.ByteArray(),
                 Serdes.ByteArray()
             ),
             "processor");
         topology.addStateStore(
             Stores.timestampedKeyValueStoreBuilder(
-                persistent ?
-                    
Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName) :
-                    Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName),
+                persistent
+                    ? 
Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName)
+                    : 
Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
+            "processor");
+        topology.addStateStore(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
+                persistent
+                    ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeadersName)
+                    : 
Stores.inMemoryKeyValueStore(timestampedKeyValueStoreWithHeadersName),
                 Serdes.ByteArray(),
                 Serdes.ByteArray()
             ),
@@ -1175,70 +1276,92 @@ public abstract class TopologyTestDriverTest {
         }
         topology.addStateStore(
             Stores.windowStoreBuilder(
-                persistent ?
-                    Stores.persistentWindowStore(windowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false) :
-                    Stores.inMemoryWindowStore(windowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+                persistent
+                    ? Stores.persistentWindowStore(windowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false)
+                    : Stores.inMemoryWindowStore(windowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
                 Serdes.ByteArray(),
                 Serdes.ByteArray()
             ),
             "processor");
         topology.addStateStore(
             Stores.timestampedWindowStoreBuilder(
-                persistent ?
-                    
Stores.persistentTimestampedWindowStore(timestampedWindowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false) :
-                    Stores.inMemoryWindowStore(timestampedWindowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+                persistent
+                    ? 
Stores.persistentTimestampedWindowStore(timestampedWindowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false)
+                    : Stores.inMemoryWindowStore(timestampedWindowStoreName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
                 Serdes.ByteArray(),
                 Serdes.ByteArray()
             ),
             "processor");
         topology.addStateStore(
-            persistent ?
-                Stores.sessionStoreBuilder(
-                    Stores.persistentSessionStore(sessionStoreName, 
Duration.ofMillis(1000L)),
-                    Serdes.ByteArray(),
-                    Serdes.ByteArray()) :
-                Stores.sessionStoreBuilder(
-                    Stores.inMemorySessionStore(sessionStoreName, 
Duration.ofMillis(1000L)),
-                    Serdes.ByteArray(),
-                    Serdes.ByteArray()),
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                persistent
+                    ? 
Stores.persistentTimestampedWindowStoreWithHeaders(timestampedWindowStoreWithHeadersName,
 Duration.ofMillis(1000L), Duration.ofMillis(100L), false)
+                    : 
Stores.inMemoryWindowStore(timestampedWindowStoreWithHeadersName, 
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
+            "processor");
+        topology.addStateStore(
+            Stores.sessionStoreBuilder(
+                persistent
+                    ? Stores.persistentSessionStore(sessionStoreName, 
Duration.ofMillis(1000L))
+                    : Stores.inMemorySessionStore(sessionStoreName, 
Duration.ofMillis(1000L)),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
+            "processor");
+        topology.addStateStore(
+            Stores.sessionStoreWithHeadersBuilder(
+                persistent
+                    ? 
Stores.persistentSessionStoreWithHeaders(sessionStoreWithHeadersName, 
Duration.ofMillis(1000L))
+                    : Stores.inMemorySessionStore(sessionStoreWithHeadersName, 
Duration.ofMillis(1000L)),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ),
             "processor");
         // add global stores
         topology.addGlobalStore(
-            persistent ?
-                Stores.keyValueStoreBuilder(
-                    Stores.persistentKeyValueStore(globalKeyValueStoreName),
-                    Serdes.ByteArray(),
-                    Serdes.ByteArray()
-                ).withLoggingDisabled() :
-                Stores.keyValueStoreBuilder(
-                    Stores.inMemoryKeyValueStore(globalKeyValueStoreName),
-                    Serdes.ByteArray(),
-                    Serdes.ByteArray()
-                ).withLoggingDisabled(),
+            Stores.keyValueStoreBuilder(
+                persistent
+                    ? Stores.persistentKeyValueStore(globalKeyValueStoreName)
+                    : Stores.inMemoryKeyValueStore(globalKeyValueStoreName),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ).withLoggingDisabled(),
             "sourceDummy1",
-            Serdes.ByteArray().deserializer(),
-            Serdes.ByteArray().deserializer(),
+            new ByteArrayDeserializer(),
+            new ByteArrayDeserializer(),
             "topicDummy1",
             "processorDummy1",
             voidProcessorSupplier);
         topology.addGlobalStore(
-            persistent ?
-                Stores.timestampedKeyValueStoreBuilder(
-                    
Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName),
-                    Serdes.ByteArray(),
-                    Serdes.ByteArray()
-                ).withLoggingDisabled() :
-                Stores.timestampedKeyValueStoreBuilder(
-                    
Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName),
-                    Serdes.ByteArray(),
-                    Serdes.ByteArray()
-                ).withLoggingDisabled(),
+            Stores.timestampedKeyValueStoreBuilder(
+                persistent
+                    ? 
Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName)
+                    : 
Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ).withLoggingDisabled(),
             "sourceDummy2",
-            Serdes.ByteArray().deserializer(),
-            Serdes.ByteArray().deserializer(),
+            new ByteArrayDeserializer(),
+            new ByteArrayDeserializer(),
             "topicDummy2",
             "processorDummy2",
             voidProcessorSupplier);
+        topology.addGlobalStore(
+            Stores.timestampedKeyValueStoreWithHeadersBuilder(
+                persistent
+                    ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName)
+                    : 
Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName),
+                Serdes.ByteArray(),
+                Serdes.ByteArray()
+            ).withLoggingDisabled(),
+            "sourceDummy3",
+            new ByteArrayDeserializer(),
+            new ByteArrayDeserializer(),
+            "topicDummy3",
+            "processorDummy3",
+            voidProcessorSupplier);
         if (persistent) { // versioned stores do not offer an in-memory 
version yet
             topology.addGlobalStore(
                 Stores.versionedKeyValueStoreBuilder(
@@ -1246,11 +1369,11 @@ public abstract class TopologyTestDriverTest {
                     Serdes.ByteArray(),
                     Serdes.ByteArray()
                 ).withLoggingDisabled(),
-                "sourceDummy3",
-                Serdes.ByteArray().deserializer(),
-                Serdes.ByteArray().deserializer(),
-                "topicDummy3",
-                "processorDummy3",
+                "sourceDummy4",
+                new ByteArrayDeserializer(),
+                new ByteArrayDeserializer(),
+                "topicDummy4",
+                "processorDummy4",
                 voidProcessorSupplier);
         }
     }
@@ -1271,8 +1394,8 @@ public abstract class TopologyTestDriverTest {
                 Serdes.ByteArray(),
                 Time.SYSTEM).withLoggingDisabled(),
             "sourceProcessorName",
-            Serdes.ByteArray().deserializer(),
-            Serdes.ByteArray().deserializer(),
+            new ByteArrayDeserializer(),
+            new ByteArrayDeserializer(),
             "globalTopicName",
             "globalProcessorName",
             voidProcessorSupplier);
@@ -1300,8 +1423,8 @@ public abstract class TopologyTestDriverTest {
                 "aggregator");
         topology.addSink("sinkProcessor", "result-topic", "aggregator");
 
-        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
-        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass().getName());
+        config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.LongSerde.class.getName());
         testDriver = new TopologyTestDriver(topology, config);
 
         store = testDriver.getKeyValueStore("aggStore");
@@ -1439,7 +1562,7 @@ public abstract class TopologyTestDriverTest {
             new ProcessorSupplier<String, Long, Void, Void>() {
                 @Override
                 public Processor<String, Long, Void, Void> get() {
-                    return new Processor<String, Long, Void, Void>() {
+                    return new Processor<>() {
                         private KeyValueStore<String, Long> store;
 
                         @Override
@@ -1463,8 +1586,8 @@ public abstract class TopologyTestDriverTest {
         final Properties config = new Properties();
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"test-TopologyTestDriver-cleanup");
         config.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
-        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
-        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass().getName());
+        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class.getName());
+        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.LongSerde.class.getName());
 
         try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(topology, config)) {
             
assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
@@ -1674,7 +1797,7 @@ public abstract class TopologyTestDriverTest {
             new StringDeserializer(),
             "global-topic",
             "globalProcessor",
-            () -> new Processor<String, String, Void, Void>() {
+            () -> new Processor<>() {
                 private KeyValueStore<String, String> stateStore;
 
                 @Override

Reply via email to