This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 53629a28ac6 KAFKA-20307: expose header-stores natively in IQv1 (#21992)
53629a28ac6 is described below

commit 53629a28ac6b7d13d2096a0c0b753af736919ff5
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 8 09:29:00 2026 -0700

    KAFKA-20307: expose header-stores natively in IQv1 (#21992)
    
    We previously only added support for querying headers stores as plain or
    timestamped key-value stores. This PR exposes headers stores with their
    native types.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 158 +++++--------------
 .../kafka/streams/state/QueryableStoreTypes.java   | 115 ++++++++++++--
 .../internals/StreamThreadStateStoreProvider.java  |  10 +-
 .../internals/GlobalStateStoreProviderTest.java    | 163 +++++++++++++++++---
 .../StreamThreadStateStoreProviderTest.java        | 168 ++++++++++++++++++++-
 5 files changed, 442 insertions(+), 172 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 5ba3f95f47d..bd5d5b87a56 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -52,9 +51,6 @@ import 
org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
-import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -72,7 +68,6 @@ import java.time.Duration;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -119,8 +114,8 @@ public class HeadersStoreUpgradeIntegrationTest {
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         return streamsConfiguration;
@@ -483,7 +478,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
-                        .getStore(STORE_NAME, kafkaStreams, new 
TimestampedKeyValueStoreWithHeadersType<>());
+                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
 
                     if (store == null)
                         return false;
@@ -524,7 +519,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
-                        .getStore(STORE_NAME, kafkaStreams, new 
TimestampedKeyValueStoreWithHeadersType<>());
+                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
 
                     if (store == null)
                         return false;
@@ -550,7 +545,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
-                        .getStore(STORE_NAME, kafkaStreams, new 
TimestampedKeyValueStoreWithHeadersType<>());
+                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
 
                     if (store == null)
                         return false;
@@ -886,7 +881,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
 
                 if (store == null) {
                     return false;
@@ -943,7 +938,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
 
                 if (store == null) {
                     return false;
@@ -1029,7 +1024,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
 
                 if (store == null) {
                     return false;
@@ -1069,7 +1064,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
 
                 if (store == null) {
                     return false;
@@ -1473,7 +1468,7 @@ public class HeadersStoreUpgradeIntegrationTest {
     private boolean windowStoreContainsKey(final String key, final long 
timestamp) {
         try {
             final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> 
store =
-                IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, 
new TimestampedWindowStoreWithHeadersType<>());
+                IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, 
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
 
             if (store == null) {
                 return false;
@@ -1498,9 +1493,8 @@ public class HeadersStoreUpgradeIntegrationTest {
      * Setup and populate a window store with headers.
      * @param props Streams properties
      * @param records List of (key, timestampOffset) tuples. Values will be 
generated as "value{N}"
-     * @return base time used for record timestamps
      */
-    private long setupAndPopulateWindowStoreWithHeaders(final Properties props,
+    private void setupAndPopulateWindowStoreWithHeaders(final Properties props,
                                                         final 
List<KeyValue<String, Long>> records) throws Exception {
         final long baseTime = setupWindowStoreWithHeaders(props);
 
@@ -1525,7 +1519,6 @@ public class HeadersStoreUpgradeIntegrationTest {
         );
 
         kafkaStreams.close();
-        return baseTime;
     }
 
     private long setupWindowStoreWithHeaders(final Properties props) throws 
Exception {
@@ -1547,7 +1540,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         return CLUSTER.time.milliseconds();
     }
 
-    private void produceRecordWithHeaders(final String key, final String 
value, final long timestamp) throws Exception {
+    private void produceRecordWithHeaders(final String key, final String 
value, final long timestamp) {
         final Headers headers = new RecordHeaders();
         headers.add("source", "test".getBytes());
 
@@ -1620,7 +1613,6 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         // Phase 2: Restart with SessionStoreWithHeaders (headers-aware 
supplier)
         final StreamsBuilder newBuilder = new StreamsBuilder();
-        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
         newBuilder.addStateStore(
                 Stores.sessionStoreWithHeadersBuilder(
                     isPersistent ? 
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)) :
@@ -1628,26 +1620,22 @@ public class HeadersStoreUpgradeIntegrationTest {
                     Serdes.String(),
                     Serdes.String()))
             .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
-            .process(() -> {
-                final SessionWithHeadersProcessor sessionStore = new 
SessionWithHeadersProcessor();
-                processorRef.set(sessionStore);
-                return sessionStore;
-            }, SESSION_STORE_NAME);
+            .process(SessionWithHeadersProcessor::new, SESSION_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(newBuilder.build(), props);
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         // Verify legacy data can be read with empty headers
-        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100, 
processorRef);
-        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200, 
processorRef);
-        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300);
 
         // Process new records with headers
         final Headers headers = new RecordHeaders();
         headers.add("source", "migration-test".getBytes());
 
-        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, headers, processorRef);
-        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, headers, processorRef);
+        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, headers);
+        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, headers);
 
         kafkaStreams.close();
     }
@@ -1678,26 +1666,21 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         // Phase 2: Restart with headers-aware builder but non-headers 
supplier (proxy/adapter mode)
         final StreamsBuilder newBuilder = new StreamsBuilder();
-        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
         newBuilder.addStateStore(
                 Stores.sessionStoreWithHeadersBuilder(
                     Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),  // non-headers supplier!
                     Serdes.String(),
                     Serdes.String()))
             .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
-            .process(() -> {
-                final SessionWithHeadersProcessor p = new 
SessionWithHeadersProcessor();
-                processorRef.set(p);
-                return p;
-            }, SESSION_STORE_NAME);
+            .process(SessionWithHeadersProcessor::new, SESSION_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(newBuilder.build(), props);
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
 
         // Verify legacy data can be read with empty headers
-        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100, 
processorRef);
-        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200, 
processorRef);
-        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300);
 
         // In proxy mode, headers are stripped when writing to non-headers 
store
         // So we expect empty headers when reading back
@@ -1705,8 +1688,8 @@ public class HeadersStoreUpgradeIntegrationTest {
         headers.add("source", "proxy-test".getBytes());
         final Headers expectedHeaders = new RecordHeaders();
 
-        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, expectedHeaders, processorRef);
-        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, expectedHeaders, processorRef);
+        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, expectedHeaders);
+        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, expectedHeaders);
 
         kafkaStreams.close();
     }
@@ -1823,14 +1806,12 @@ public class HeadersStoreUpgradeIntegrationTest {
 
     private void verifySessionValueWithEmptyHeaders(final String key,
                                                     final String value,
-                                                    final long timestamp,
-                                                    final 
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+                                                    final long timestamp) 
throws Exception {
         TestUtils.waitForCondition(() -> {
             try {
-                if (processorRef.get() == null) {
-                    return false;
-                }
-                final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store = processorRef.get().store();
+                final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store =
+                    IntegrationTestUtils.getStore(SESSION_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.sessionStoreWithHeaders());
+
                 if (store == null) {
                     return false;
                 }
@@ -1866,8 +1847,7 @@ public class HeadersStoreUpgradeIntegrationTest {
                                                             final String value,
                                                             final long 
timestamp,
                                                             final Headers 
headers,
-                                                            final Headers 
expectedHeaders,
-                                                            final 
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+                                                            final Headers 
expectedHeaders) throws Exception {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             inputStream,
             singletonList(KeyValue.pair(key, value)),
@@ -1880,10 +1860,8 @@ public class HeadersStoreUpgradeIntegrationTest {
 
         TestUtils.waitForCondition(() -> {
             try {
-                if (processorRef.get() == null) {
-                    return false;
-                }
-                final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store = processorRef.get().store();
+                final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store =
+                    IntegrationTestUtils.getStore(SESSION_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.sessionStoreWithHeaders());
 
                 if (store == null) {
                     return false;
@@ -1912,13 +1890,11 @@ public class HeadersStoreUpgradeIntegrationTest {
     }
 
     private boolean sessionStoreContainsKey(final String key,
-                                            final long timestamp,
-                                            final 
AtomicReference<SessionWithHeadersProcessor> processorRef) {
+                                            final long timestamp) {
         try {
-            if (processorRef.get() == null) {
-                return false;
-            }
-            final SessionStoreWithHeaders<String, String> store = 
processorRef.get().store();
+            final ReadOnlySessionStore<String, AggregationWithHeaders<String>> 
store =
+                IntegrationTestUtils.getStore(SESSION_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.sessionStoreWithHeaders());
+
             if (store == null) {
                 return false;
             }
@@ -1939,18 +1915,13 @@ public class HeadersStoreUpgradeIntegrationTest {
 
     private void setupAndPopulateSessionStoreWithHeaders(final Properties 
props) throws Exception {
         final StreamsBuilder headersBuilder = new StreamsBuilder();
-        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
         headersBuilder.addStateStore(
                 Stores.sessionStoreWithHeadersBuilder(
                     
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
                     Serdes.String(),
                     Serdes.String()))
             .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
-            .process(() -> {
-                final SessionWithHeadersProcessor p = new 
SessionWithHeadersProcessor();
-                processorRef.set(p);
-                return p;
-            }, SESSION_STORE_NAME);
+            .process(SessionWithHeadersProcessor::new, SESSION_STORE_NAME);
 
         kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -1968,7 +1939,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             false);
 
         TestUtils.waitForCondition(
-            () -> sessionStoreContainsKey("key1", baseTime + 100, 
processorRef),
+            () -> sessionStoreContainsKey("key1", baseTime + 100),
             30_000L,
             "Store was not populated with expected data"
         );
@@ -1981,10 +1952,6 @@ public class HeadersStoreUpgradeIntegrationTest {
     private static class SessionProcessor implements Processor<String, String, 
Void, Void> {
         private SessionStore<String, String> store;
 
-        public SessionStore<String, String> store() {
-            return store;
-        }
-
         @Override
         public void init(final ProcessorContext<Void, Void> context) {
             store = context.getStateStore(SESSION_STORE_NAME);
@@ -2001,10 +1968,6 @@ public class HeadersStoreUpgradeIntegrationTest {
     private static class SessionWithHeadersProcessor implements 
Processor<String, String, Void, Void> {
         private SessionStoreWithHeaders<String, String> store;
 
-        public SessionStoreWithHeaders<String, String> store() {
-            return store;
-        }
-
         @Override
         public void init(final ProcessorContext<Void, Void> context) {
             store = context.getStateStore(SESSION_STORE_NAME);
@@ -2018,51 +1981,4 @@ public class HeadersStoreUpgradeIntegrationTest {
         }
     }
 
-    // ==================== Custom QueryableStoreTypes ====================
-
-    /**
-     * Custom QueryableStoreType for querying 
TimestampedKeyValueStoreWithHeaders directly
-     * without facade wrapping. This returns the full ValueTimestampHeaders 
wrapper.
-     */
-    private static class TimestampedKeyValueStoreWithHeadersType<K, V>
-        implements QueryableStoreType<ReadOnlyKeyValueStore<K, 
ValueTimestampHeaders<V>>> {
-
-        @Override
-        public boolean accepts(final 
org.apache.kafka.streams.processor.StateStore stateStore) {
-            // Accept stores that implement both 
TimestampedKeyValueStoreWithHeaders and ReadOnlyKeyValueStore
-            return stateStore instanceof TimestampedKeyValueStoreWithHeaders
-                && stateStore instanceof ReadOnlyKeyValueStore;
-        }
-
-        @Override
-        public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
-            final StateStoreProvider storeProvider,
-            final String storeName) {
-            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, 
storeName);
-        }
-    }
-
-    /**
-     * Custom queryable store type for accessing 
TimestampedWindowStoreWithHeaders directly
-     * without facade wrapping. This returns the full ValueTimestampHeaders 
wrapper.
-     */
-    private static class TimestampedWindowStoreWithHeadersType<K, V>
-        implements QueryableStoreType<ReadOnlyWindowStore<K, 
ValueTimestampHeaders<V>>> {
-
-        @Override
-        public boolean accepts(final 
org.apache.kafka.streams.processor.StateStore stateStore) {
-            // Accept stores that implement both 
TimestampedWindowStoreWithHeaders and ReadOnlyWindowStore
-            return stateStore instanceof TimestampedWindowStoreWithHeaders
-                && stateStore instanceof ReadOnlyWindowStore;
-        }
-
-        @Override
-        public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
-            final StateStoreProvider storeProvider,
-            final String storeName) {
-            return new CompositeReadOnlyWindowStore<>(storeProvider, this, 
storeName);
-        }
-    }
-
-
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index 026df05c515..1f6f1b8890b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -57,6 +57,17 @@ public final class QueryableStoreTypes {
         return new TimestampedKeyValueStoreType<>();
     }
 
+    /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore 
ReadOnlyKeyValueStore&lt;K, ValueTimestampHeaders&lt;V&gt;&gt;}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link QueryableStoreTypes.TimestampedKeyValueStoreWithHeadersType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K, 
ValueTimestampHeaders<V>>> timestampedKeyValueStoreWithHeaders() {
+        return new TimestampedKeyValueStoreWithHeadersType<>();
+    }
+
     /**
      * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
      *
@@ -79,6 +90,17 @@ public final class QueryableStoreTypes {
         return new TimestampedWindowStoreType<>();
     }
 
+    /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore 
ReadOnlyWindowStore&lt;K, ValueTimestampHeaders&lt;V&gt;&gt;}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link 
QueryableStoreTypes.TimestampedWindowStoreWithHeadersType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K, 
ValueTimestampHeaders<V>>> timestampedWindowStoreWithHeaders() {
+        return new TimestampedWindowStoreWithHeadersType<>();
+    }
+
     /**
      * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
      *
@@ -90,6 +112,17 @@ public final class QueryableStoreTypes {
         return new SessionStoreType<>();
     }
 
+    /**
+     * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore 
ReadOnlySessionStore&lt;K, AggregationWithHeaders&lt;V&gt;&gt;}.
+     *
+     * @param <K> key type of the store
+     * @param <V> value type of the store
+     * @return {@link QueryableStoreTypes.SessionStoreWithHeadersType}
+     */
+    public static <K, V> QueryableStoreType<ReadOnlySessionStore<K, 
AggregationWithHeaders<V>>> sessionStoreWithHeaders() {
+        return new SessionStoreWithHeadersType<>();
+    }
+
     private abstract static class QueryableStoreTypeMatcher<T> implements 
QueryableStoreType<T> {
 
         private final Set<Class<?>> matchTo;
@@ -116,8 +149,10 @@ public final class QueryableStoreTypes {
         }
 
         @Override
-        public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider 
storeProvider,
-                                                  final String storeName) {
+        public ReadOnlyKeyValueStore<K, V> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
             return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, 
storeName);
         }
 
@@ -127,8 +162,7 @@ public final class QueryableStoreTypes {
         extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, 
ValueAndTimestamp<V>>> {
 
         TimestampedKeyValueStoreType() {
-            super(Set.of(
-                ReadOnlyKeyValueStore.class));
+            super(Set.of(ReadOnlyKeyValueStore.class));
         }
 
         @Override
@@ -138,8 +172,26 @@ public final class QueryableStoreTypes {
         }
 
         @Override
-        public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final 
StateStoreProvider storeProvider,
-                                                                     final 
String storeName) {
+        public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
+            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, 
storeName);
+        }
+    }
+
+    private static class TimestampedKeyValueStoreWithHeadersType<K, V>
+        extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, 
ValueTimestampHeaders<V>>> {
+
+        TimestampedKeyValueStoreWithHeadersType() {
+            super(Set.of(ReadOnlyKeyValueStore.class, 
TimestampedKeyValueStoreWithHeaders.class));
+        }
+
+        @Override
+        public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
             return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, 
storeName);
         }
     }
@@ -151,8 +203,10 @@ public final class QueryableStoreTypes {
         }
 
         @Override
-        public ReadOnlyWindowStore<K, V> create(final StateStoreProvider 
storeProvider,
-                                                final String storeName) {
+        public ReadOnlyWindowStore<K, V> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
             return new CompositeReadOnlyWindowStore<>(storeProvider, this, 
storeName);
         }
     }
@@ -161,8 +215,7 @@ public final class QueryableStoreTypes {
         extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, 
ValueAndTimestamp<V>>> {
 
         TimestampedWindowStoreType() {
-            super(Set.of(
-                ReadOnlyWindowStore.class));
+            super(Set.of(ReadOnlyWindowStore.class));
         }
 
         @Override
@@ -172,8 +225,26 @@ public final class QueryableStoreTypes {
         }
 
         @Override
-        public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final 
StateStoreProvider storeProvider,
-                                                                   final 
String storeName) {
+        public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
+            return new CompositeReadOnlyWindowStore<>(storeProvider, this, 
storeName);
+        }
+    }
+
+    private static class TimestampedWindowStoreWithHeadersType<K, V>
+        extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, 
ValueTimestampHeaders<V>>> {
+
+        TimestampedWindowStoreWithHeadersType() {
+            super(Set.of(ReadOnlyWindowStore.class, 
TimestampedWindowStoreWithHeaders.class));
+        }
+
+        @Override
+        public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
             return new CompositeReadOnlyWindowStore<>(storeProvider, this, 
storeName);
         }
     }
@@ -185,10 +256,26 @@ public final class QueryableStoreTypes {
         }
 
         @Override
-        public ReadOnlySessionStore<K, V> create(final StateStoreProvider 
storeProvider,
-                                                 final String storeName) {
+        public ReadOnlySessionStore<K, V> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
             return new CompositeReadOnlySessionStore<>(storeProvider, this, 
storeName);
         }
     }
 
+    private static class SessionStoreWithHeadersType<K, V> extends 
QueryableStoreTypeMatcher<ReadOnlySessionStore<K, AggregationWithHeaders<V>>> {
+
+        SessionStoreWithHeadersType() {
+            super(Set.of(ReadOnlySessionStore.class, 
SessionStoreWithHeaders.class));
+        }
+
+        @Override
+        public ReadOnlySessionStore<K, AggregationWithHeaders<V>> create(
+            final StateStoreProvider storeProvider,
+            final String storeName
+        ) {
+            return new CompositeReadOnlySessionStore<>(storeProvider, this, 
storeName);
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index a429ea0143a..f9dd0250563 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -111,9 +111,6 @@ public class StreamThreadStateStoreProvider {
                     return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueFromHeaders());
                 } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedKeyValueStoreType) {
                     return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
-                } else {
-                    // For custom query types, return the raw store so they 
can access headers directly
-                    return (T) store;
                 }
             } else if (store instanceof TimestampedKeyValueStore && 
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
                 return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store, ValueConverters.extractValue());
@@ -122,17 +119,14 @@ public class StreamThreadStateStoreProvider {
                     return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueFromHeaders());
                 } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedWindowStoreType) {
                     return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
-                } else {
-                    // For custom query types, return the raw store so they 
can access headers directly
-                    return (T) store;
                 }
             } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
                 return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) 
store, ValueConverters.extractValue());
             } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
                 return (T) new 
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
-            } else {
-                return (T) store;
             }
+
+            return (T) store;
         } else {
             throw new InvalidStateStoreException(
                 "Cannot get state store " + storeName +
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index f96f8e96d99..7d8ef6dacee 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -25,16 +25,19 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.test.NoOpReadOnlyStore;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -125,6 +128,14 @@ public class GlobalStateStoreProviderTest {
                     false),
                 Serdes.String(),
                 Serdes.String()).build());
+        stores.put(
+            "s-store-with-headers",
+            Stores.sessionStoreWithHeadersBuilder(
+                Stores.inMemorySessionStore(
+                    "s-store-with-headers",
+                    Duration.ofMillis(10L)),
+                Serdes.String(),
+                Serdes.String()).build());
 
         final InternalProcessorContext<?, ?> mockContext = 
mock(InternalProcessorContext.class);
         when(mockContext.applicationId()).thenReturn("appId");
@@ -175,6 +186,7 @@ public class GlobalStateStoreProviderTest {
         for (final ReadOnlyKeyValueStore<String, String> store : stores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
             assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
         }
     }
 
@@ -187,11 +199,12 @@ public class GlobalStateStoreProviderTest {
         for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store : stores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
             assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
         }
     }
 
     @Test
-    public void shouldNotReturnKeyValueStoreAsTimestampedStore() {
+    public void shouldNotReturnKeyValueStoreAsTimestampedKeyValueStore() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
         final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> 
stores =
             provider.stores("kv-store", 
QueryableStoreTypes.timestampedKeyValueStore());
@@ -201,12 +214,13 @@ public class GlobalStateStoreProviderTest {
     @Test
     public void shouldReturnTimestampedKeyValueStoreAsKeyValueStore() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
-        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> 
stores =
+        final List<ReadOnlyKeyValueStore<String, String>> stores =
             provider.stores("ts-kv-store", 
QueryableStoreTypes.keyValueStore());
         assertEquals(1, stores.size());
-        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store : stores) {
+        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
             assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
         }
     }
 
@@ -219,11 +233,25 @@ public class GlobalStateStoreProviderTest {
         for (final ReadOnlyWindowStore<String, String> store : stores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
             assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
         }
     }
 
     @Test
-    public void shouldNotReturnWindowStoreAsTimestampedStore() {
+    public void shouldReturnTimestampedWindowStore() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
stores =
+            provider.stores("ts-w-store", 
QueryableStoreTypes.timestampedWindowStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store : stores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, instanceOf(TimestampedWindowStore.class));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void shouldNotReturnWindowStoreAsTimestampedWindowStore() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
         final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
stores =
                 provider.stores("w-store", 
QueryableStoreTypes.timestampedWindowStore());
@@ -233,12 +261,13 @@ public class GlobalStateStoreProviderTest {
     @Test
     public void shouldReturnTimestampedWindowStoreAsWindowStore() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
-        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
stores =
+        final List<ReadOnlyWindowStore<String, String>> stores =
             provider.stores("ts-w-store", QueryableStoreTypes.windowStore());
         assertEquals(1, stores.size());
-        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store : stores) {
+        for (final ReadOnlyWindowStore<String, String> store : stores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
             assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
         }
     }
 
@@ -250,29 +279,41 @@ public class GlobalStateStoreProviderTest {
         assertEquals(1, stores.size());
         for (final ReadOnlySessionStore<String, String> store : stores) {
             assertThat(store, instanceOf(ReadOnlySessionStore.class));
+            assertThat(store, not(instanceOf(SessionStoreWithHeaders.class)));
         }
     }
 
     @Test
-    public void 
shouldReturnKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+    public void shouldReturnTimestampedKeyValueStoreWithHeaders() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
-        final List<ReadOnlyKeyValueStore<String, String>> stores =
-            provider.stores("ts-kv-store-with-headers", 
QueryableStoreTypes.keyValueStore());
+        final List<ReadOnlyKeyValueStore<String, 
ValueTimestampHeaders<String>>> stores =
+            provider.stores("ts-kv-store-with-headers", 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
         assertEquals(1, stores.size());
-        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
-            assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
-            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
-            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        for (final ReadOnlyKeyValueStore<String, 
ValueTimestampHeaders<String>> store : stores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, 
instanceOf(TimestampedKeyValueStoreWithHeaders.class));
         }
     }
 
     @Test
-    public void 
shouldReturnTimestampedKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+    public void 
shouldReturnTimestampedKeyValueStoreWithHeadersAsTimestampedKeyValueStores() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
         final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> 
stores =
             provider.stores("ts-kv-store-with-headers", 
QueryableStoreTypes.timestampedKeyValueStore());
         assertEquals(1, stores.size());
         for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void 
shouldReturnTimestampedKeyValueStoreWithHeadersAsKeyValueStores() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, String>> stores =
+            provider.stores("ts-kv-store-with-headers", 
QueryableStoreTypes.keyValueStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
             assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
             assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
             assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
@@ -280,20 +321,35 @@ public class GlobalStateStoreProviderTest {
     }
 
     @Test
-    public void 
shouldReturnWindowStoreWithHeadersFacadeForHeadersAwareWindowStore() {
+    public void 
shouldNotReturnTimestampedKeyValueStoreAsTimestampedKeyValueStoreWithHeaders() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
-        final List<ReadOnlyWindowStore<String, String>> stores =
-            provider.stores("ts-w-store-with-headers", 
QueryableStoreTypes.windowStore());
+        final List<ReadOnlyKeyValueStore<String, 
ValueTimestampHeaders<String>>> stores =
+            provider.stores("ts-kv-store", 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
+        assertEquals(0, stores.size());
+    }
+
+    @Test
+    public void 
shouldNotReturnKeyValueStoreAsTimestampedKeyValueStoreWithHeaders() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, 
ValueTimestampHeaders<String>>> stores =
+            provider.stores("kv-store", 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
+        assertEquals(0, stores.size());
+    }
+
+    @Test
+    public void shouldReturnTimestampedWindowStoreWithHeaders() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>> 
stores =
+            provider.stores("ts-w-store-with-headers", 
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
         assertEquals(1, stores.size());
-        for (final ReadOnlyWindowStore<String, String> store : stores) {
-            assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
-            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
-            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        for (final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> 
store : stores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, 
instanceOf(TimestampedWindowStoreWithHeaders.class));
         }
     }
 
     @Test
-    public void 
shouldReturnTimestampedWindowStoreWithHeadersFacadeForHeadersAwareWindowStore() 
{
+    public void 
shouldReturnTimestampedWindowStoreWithHeadersAsTimestampedWindowStore() {
         final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
         final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
stores =
             provider.stores("ts-w-store-with-headers", 
QueryableStoreTypes.timestampedWindowStore());
@@ -301,8 +357,69 @@ public class GlobalStateStoreProviderTest {
         for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store : stores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
             assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
-            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
             assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
         }
     }
+
+    @Test
+    public void shouldReturnTimestampedWindowStoreWithHeadersAsWindowStore() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, String>> stores =
+            provider.stores("ts-w-store-with-headers", 
QueryableStoreTypes.windowStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyWindowStore<String, String> store : stores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void 
shouldNotReturnTimestampedWindowStoreAsTimestampedWindowStoreWithHeaders() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>>> stores =
+            provider.stores("ts-w-store", 
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
+        assertEquals(0, stores.size()); // timestamped window store has no headers, so it must not match
+    }
+
+    @Test
+    public void 
shouldNotReturnWindowStoreAsTimestampedWindowStoreWithHeaders() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>>> stores =
+            provider.stores("w-store", 
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
+        assertEquals(0, stores.size()); // plain window store has no headers, so it must not match
+    }
+
+    @Test
+    public void shouldReturnSessionStoreWithHeaders() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlySessionStore<String, 
AggregationWithHeaders<String>>> stores =
+            provider.stores("s-store-with-headers", 
QueryableStoreTypes.sessionStoreWithHeaders());
+        assertEquals(1, stores.size());
+        for (final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store : stores) {
+            assertThat(store, instanceOf(ReadOnlySessionStore.class));
+            assertThat(store, instanceOf(SessionStoreWithHeaders.class));
+        }
+    }
+
+    @Test
+    public void shouldReturnSessionStoreWithHeadersAsSessionStore() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlySessionStore<String, String>> stores =
+            provider.stores("s-store-with-headers", 
QueryableStoreTypes.sessionStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlySessionStore<String, String> store : stores) {
+            assertThat(store, instanceOf(ReadOnlySessionStore.class));
+            assertThat(store, not(instanceOf(SessionStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void shouldNotReturnSessionStoreAsSessionStoreWithHeaders() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlySessionStore<String, 
AggregationWithHeaders<String>>> stores =
+            provider.stores("s-store", 
QueryableStoreTypes.sessionStoreWithHeaders());
+        assertEquals(0, stores.size());
+    }
+
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 0bd44815997..59cd8ff8ef1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -52,16 +52,19 @@ import 
org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsProducer;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
 
@@ -170,6 +173,14 @@ public class StreamThreadStateStoreProviderTest {
                 Serdes.String(),
                 Serdes.String()),
             "the-processor");
+        topology.addStateStore(
+            Stores.sessionStoreWithHeadersBuilder(
+                Stores.inMemorySessionStore(
+                    "session-store-with-headers",
+                    Duration.ofMillis(10L)),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor");
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";
@@ -228,6 +239,7 @@ public class StreamThreadStateStoreProviderTest {
         for (final ReadOnlyKeyValueStore<String, String> store: kvStores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
             assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
         }
     }
 
@@ -240,6 +252,7 @@ public class StreamThreadStateStoreProviderTest {
         for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store: tkvStores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
             assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
         }
     }
 
@@ -264,12 +277,13 @@ public class StreamThreadStateStoreProviderTest {
     @Test
     public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() {
         mockThread(true);
-        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> 
tkvStores =
+        final List<ReadOnlyKeyValueStore<String, String>> tkvStores =
                 
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", 
QueryableStoreTypes.keyValueStore()));
         assertEquals(2, tkvStores.size());
-        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store: tkvStores) {
+        for (final ReadOnlyKeyValueStore<String, String> store: tkvStores) {
             assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
             assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
         }
     }
 
@@ -282,6 +296,7 @@ public class StreamThreadStateStoreProviderTest {
         for (final ReadOnlyWindowStore<String, String> store: windowStores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
             assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
         }
     }
 
@@ -294,6 +309,7 @@ public class StreamThreadStateStoreProviderTest {
         for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store: windowStores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
             assertThat(store, instanceOf(TimestampedWindowStore.class));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
         }
     }
 
@@ -318,12 +334,13 @@ public class StreamThreadStateStoreProviderTest {
     @Test
     public void shouldFindTimestampedWindowStoresAsWindowStore() {
         mockThread(true);
-        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
windowStores =
+        final List<ReadOnlyWindowStore<String, String>> windowStores =
             
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store",
 QueryableStoreTypes.windowStore()));
         assertEquals(2, windowStores.size());
-        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store: windowStores) {
+        for (final ReadOnlyWindowStore<String, String> store: windowStores) {
             assertThat(store, instanceOf(ReadOnlyWindowStore.class));
             assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
         }
     }
 
@@ -335,6 +352,7 @@ public class StreamThreadStateStoreProviderTest {
         assertEquals(2, sessionStores.size());
         for (final ReadOnlySessionStore<String, String> store: sessionStores) {
             assertThat(store, instanceOf(ReadOnlySessionStore.class));
+            assertThat(store, not(instanceOf(SessionStoreWithHeaders.class)));
         }
     }
 
@@ -433,6 +451,19 @@ public class StreamThreadStateStoreProviderTest {
 
     @Test
     public void shouldFindTimestampedKeyValueStoresWithHeaders() {
+        mockThread(true);
+        final List<ReadOnlyKeyValueStore<String, ValueTimestampHeaders<String>>> 
stores =
+            
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store-with-headers",
+                QueryableStoreTypes.timestampedKeyValueStoreWithHeaders()));
+        assertEquals(2, stores.size());
+        for (final ReadOnlyKeyValueStore<String, ValueTimestampHeaders<String>> 
store : stores) {
+            assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+            assertThat(store, 
instanceOf(TimestampedKeyValueStoreWithHeaders.class));
+        }
+    }
+
+    @Test
+    public void 
shouldFindTimestampedKeyValueStoresWithHeadersAsTimestampedKeyValueStore() {
         mockThread(true);
         final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> 
stores =
             
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store-with-headers",
@@ -440,7 +471,6 @@ public class StreamThreadStateStoreProviderTest {
         assertEquals(2, stores.size());
         for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store : stores) {
             assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
-            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
             assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
         }
     }
@@ -459,8 +489,57 @@ public class StreamThreadStateStoreProviderTest {
         }
     }
 
+    @Test
+    public void shouldNotFindKeyValueStoresAsHeadersStore() {
+        mockThread(true);
+        final InvalidStateStoreException exception = assertThrows(
+            InvalidStateStoreException.class,
+            () -> 
provider.stores(StoreQueryParameters.fromNameAndType("kv-store", 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders()))
+        );
+        assertThat(
+            exception.getMessage(),
+            is(
+                "Cannot get state store kv-store because the queryable store 
type " +
+                    "[class 
org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreWithHeadersType]
 " +
+                    "does not accept the actual store type " +
+                    "[class 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore]."
+            )
+        );
+    }
+
+    @Test
+    public void shouldNotFindTimestampedKeyValueStoresAsHeadersStore() {
+        mockThread(true);
+        final InvalidStateStoreException exception = assertThrows(
+            InvalidStateStoreException.class,
+            () -> 
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store", 
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders()))
+        );
+        assertThat(
+            exception.getMessage(),
+            is(
+                "Cannot get state store timestamped-kv-store because the 
queryable store type " +
+                    "[class 
org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreWithHeadersType]
 " +
+                    "does not accept the actual store type " +
+                    "[class 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
+            )
+        );
+    }
+
     @Test
     public void shouldFindTimestampedWindowStoresWithHeaders() {
+        mockThread(true);
+        final List<ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>> 
stores =
+            
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store-with-headers",
+                QueryableStoreTypes.timestampedWindowStoreWithHeaders()));
+        assertEquals(2, stores.size());
+        for (final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> 
store : stores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, 
instanceOf(TimestampedWindowStoreWithHeaders.class));
+        }
+    }
+
+    @Test
+    public void 
shouldFindTimestampedWindowStoresWithHeadersAsTimestampedWindowStore() {
         mockThread(true);
         final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
stores =
             
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store-with-headers",
@@ -468,7 +547,6 @@ public class StreamThreadStateStoreProviderTest {
         assertEquals(2, stores.size());
         for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store : stores) {
             assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
-            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
             assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
         }
     }
@@ -487,6 +565,84 @@ public class StreamThreadStateStoreProviderTest {
         }
     }
 
+    @Test
+    public void shouldNotFindWindowStoresAsHeadersStore() { // querying the store named window-store with the headers-aware window type must fail
+        mockThread(true);
+        final InvalidStateStoreException exception = assertThrows(
+            InvalidStateStoreException.class,
+            () -> provider.stores(StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStoreWithHeaders()))
+        );
+        assertThat( // pin the full message: it names the store, the requested queryable type and the actual store class
+            exception.getMessage(),
+            is(
+                "Cannot get state store window-store because the queryable store type " +
+                    "[class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreWithHeadersType] " +
+                    "does not accept the actual store type " +
+                    "[class org.apache.kafka.streams.state.internals.MeteredWindowStore]."
+            )
+        );
+    }
+
+    @Test
+    public void shouldNotFindTimestampedWindowStoresAsHeadersStore() { // querying timestamped-window-store with the headers-aware window type must fail
+        mockThread(true);
+        final InvalidStateStoreException exception = assertThrows(
+            InvalidStateStoreException.class,
+            () -> provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStoreWithHeaders()))
+        );
+        assertThat( // pin the full message: it names the store, the requested queryable type and the actual store class
+            exception.getMessage(),
+            is(
+                "Cannot get state store timestamped-window-store because the queryable store type " +
+                    "[class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreWithHeadersType] " +
+                    "does not accept the actual store type " +
+                    "[class org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore]."
+            )
+        );
+    }
+
+    @Test
+    public void shouldFindSessionStoresWithHeaders() { // a headers session store is retrievable through its native queryable type
+        mockThread(true);
+        final List<ReadOnlySessionStore<String, AggregationWithHeaders<String>>> sessionStores =
+            provider.stores(StoreQueryParameters.fromNameAndType("session-store-with-headers", QueryableStoreTypes.sessionStoreWithHeaders()));
+        assertEquals(2, sessionStores.size()); // NOTE(review): presumably one store per task set up by mockThread - confirm
+        for (final ReadOnlySessionStore<String, AggregationWithHeaders<String>> store : sessionStores) {
+            assertThat(store, instanceOf(ReadOnlySessionStore.class));
+            assertThat(store, instanceOf(SessionStoreWithHeaders.class)); // the returned store is the headers-aware type itself
+        }
+    }
+
+    @Test
+    public void shouldFindSessionStoresWithHeadersAsSessionStore() { // a headers session store can also be queried with the plain session-store type
+        mockThread(true);
+        final List<ReadOnlySessionStore<String, String>> sessionStores =
+            provider.stores(StoreQueryParameters.fromNameAndType("session-store-with-headers", QueryableStoreTypes.sessionStore()));
+        assertEquals(2, sessionStores.size()); // NOTE(review): presumably one store per task set up by mockThread - confirm
+        for (final ReadOnlySessionStore<String, String> store : sessionStores) {
+            assertThat(store, instanceOf(ReadOnlySessionStoreFacade.class)); // plain-typed access goes through the read-only facade
+            assertThat(store, not(instanceOf(SessionStoreWithHeaders.class))); // the headers-aware interface is not exposed on this path
+        }
+    }
+
+    @Test
+    public void shouldNotFindSessionStoresAsSessionStoreWithHeaders() { // querying the store named session-store with the headers-aware session type must fail
+        mockThread(true);
+        final InvalidStateStoreException exception = assertThrows(
+            InvalidStateStoreException.class,
+            () -> provider.stores(StoreQueryParameters.fromNameAndType("session-store", QueryableStoreTypes.sessionStoreWithHeaders()))
+        );
+        assertThat( // pin the full message: it names the store, the requested queryable type and the actual store class
+            exception.getMessage(),
+            is(
+                "Cannot get state store session-store because the queryable store type " +
+                    "[class org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreWithHeadersType] " +
+                    "does not accept the actual store type " +
+                    "[class org.apache.kafka.streams.state.internals.MeteredSessionStore]."
+            )
+        );
+    }
+
     private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
                                          final Consumer<byte[], byte[]> 
consumer,
                                          final Producer<byte[], byte[]> 
producer,

Reply via email to