This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a2efacd44ef KAFKA-20307: Add Interactive Queries (IQv1) support for headers-aware stores (#21744)
a2efacd44ef is described below

commit a2efacd44ef298c5725bf012fcd10259b9d12518
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Mar 16 14:22:47 2026 +0100

    KAFKA-20307: Add Interactive Queries (IQv1) support for headers-aware stores (#21744)
    
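    This PR adds backward-compatibility support for the new headers-aware
    state stores (TimestampedKeyValueStoreWithHeaders and
    TimestampedWindowStoreWithHeaders) when they are queried through the
    Interactive Queries v1 (IQv1) API. Users can now query headers-aware
    stores using the existing QueryableStoreTypes API without any code
    changes: for the built-in key-value, timestamped key-value, window and
    timestamped window store types, the state store providers wrap the
    headers-aware store in a read-only facade that converts each
    ValueTimestampHeaders result into the value type the query expects.
    Custom QueryableStoreType implementations still receive the underlying
    store and can therefore access the headers directly.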
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        |  92 +++++---
 .../internals/AbstractReadOnlyDecorator.java       |  11 +
 .../processor/internals/StateManagerUtil.java      |  29 +++
 .../kafka/streams/state/QueryableStoreTypes.java   |  18 +-
 .../state/internals/GlobalStateStoreProvider.java  |  22 +-
 .../internals/StreamThreadStateStoreProvider.java  |  22 +-
 .../streams/state/internals/ValueConverters.java   |   2 +-
 .../WindowToTimestampedWindowByteStoreAdapter.java |   2 +-
 .../internals/StateManagerUtilConverterTest.java   | 221 +++++++++++++++++++
 .../state/QueryableStoreTypesWithHeadersTest.java  | 233 +++++++++++++++++++++
 .../GenericReadOnlyKeyValueStoreFacadeTest.java    |   2 +-
 .../GenericReadOnlyWindowStoreFacadeTest.java      |   2 +-
 .../internals/GlobalStateStoreProviderTest.java    |  71 +++++++
 .../StreamThreadStateStoreProviderTest.java        |  74 +++++++
 .../state/internals/ValueConvertersTest.java       |   6 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 16 files changed, 764 insertions(+), 47 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 6470f3942bb..f9614f5909a 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -51,6 +52,9 @@ import 
org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
+import org.apache.kafka.streams.state.internals.StateStoreProvider;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -479,7 +483,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
-                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+                        .getStore(STORE_NAME, kafkaStreams, new 
TimestampedKeyValueStoreWithHeadersType<>());
 
                     if (store == null)
                         return false;
@@ -520,7 +524,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
-                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+                        .getStore(STORE_NAME, kafkaStreams, new 
TimestampedKeyValueStoreWithHeadersType<>());
 
                     if (store == null)
                         return false;
@@ -546,7 +550,7 @@ public class HeadersStoreUpgradeIntegrationTest {
             () -> {
                 try {
                     final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
-                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+                        .getStore(STORE_NAME, kafkaStreams, new 
TimestampedKeyValueStoreWithHeadersType<>());
 
                     if (store == null)
                         return false;
@@ -565,30 +569,6 @@ public class HeadersStoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
-    private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
-                                                           final V value) 
throws Exception {
-        TestUtils.waitForCondition(
-            () -> {
-                try {
-                    final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
-                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
-
-                    if (store == null)
-                        return false;
-
-                    final ValueTimestampHeaders<V> result = store.get(key);
-                    return result != null
-                        && result.value().equals(value)
-                        && result.headers().toArray().length == 0;
-                } catch (final Exception swallow) {
-                    LOG.error("Error while verifying legacy value with empty 
headers", swallow);
-                    return false;
-                }
-            },
-            60_000L,
-            "Could not get expected result in time.");
-    }
-
     private static class KeyValueProcessor implements Processor<String, 
String, Void, Void> {
         private KeyValueStore<String, String> store;
 
@@ -906,7 +886,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
 
                 if (store == null) {
                     return false;
@@ -963,7 +943,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
 
                 if (store == null) {
                     return false;
@@ -1049,7 +1029,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
 
                 if (store == null) {
                     return false;
@@ -1089,7 +1069,7 @@ public class HeadersStoreUpgradeIntegrationTest {
         TestUtils.waitForCondition(() -> {
             try {
                 final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
-                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
 
                 if (store == null) {
                     return false;
@@ -1493,7 +1473,7 @@ public class HeadersStoreUpgradeIntegrationTest {
     private boolean windowStoreContainsKey(final String key, final long 
timestamp) {
         try {
             final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>> 
store =
-                IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, 
QueryableStoreTypes.windowStore());
+                IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, 
new TimestampedWindowStoreWithHeadersType<>());
 
             if (store == null) {
                 return false;
@@ -2037,4 +2017,52 @@ public class HeadersStoreUpgradeIntegrationTest {
             store.put(sessionKey, AggregationWithHeaders.make(record.value(), 
record.headers()));
         }
     }
+
+    // ==================== Custom QueryableStoreTypes ====================
+
+    /**
+     * Custom QueryableStoreType for querying 
TimestampedKeyValueStoreWithHeaders directly
+     * without facade wrapping. This returns the full ValueTimestampHeaders 
wrapper.
+     */
+    private static class TimestampedKeyValueStoreWithHeadersType<K, V>
+        implements QueryableStoreType<ReadOnlyKeyValueStore<K, 
ValueTimestampHeaders<V>>> {
+
+        @Override
+        public boolean accepts(final 
org.apache.kafka.streams.processor.StateStore stateStore) {
+            // Accept stores that implement both 
TimestampedKeyValueStoreWithHeaders and ReadOnlyKeyValueStore
+            return stateStore instanceof TimestampedKeyValueStoreWithHeaders
+                && stateStore instanceof ReadOnlyKeyValueStore;
+        }
+
+        @Override
+        public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
+            final StateStoreProvider storeProvider,
+            final String storeName) {
+            return new CompositeReadOnlyKeyValueStore<>(storeProvider, this, 
storeName);
+        }
+    }
+
+    /**
+     * Custom queryable store type for accessing 
TimestampedWindowStoreWithHeaders directly
+     * without facade wrapping. This returns the full ValueTimestampHeaders 
wrapper.
+     */
+    private static class TimestampedWindowStoreWithHeadersType<K, V>
+        implements QueryableStoreType<ReadOnlyWindowStore<K, 
ValueTimestampHeaders<V>>> {
+
+        @Override
+        public boolean accepts(final 
org.apache.kafka.streams.processor.StateStore stateStore) {
+            // Accept stores that implement both 
TimestampedWindowStoreWithHeaders and ReadOnlyWindowStore
+            return stateStore instanceof TimestampedWindowStoreWithHeaders
+                && stateStore instanceof ReadOnlyWindowStore;
+        }
+
+        @Override
+        public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
+            final StateStoreProvider storeProvider,
+            final String storeName) {
+            return new CompositeReadOnlyWindowStore<>(storeProvider, this, 
storeName);
+        }
+    }
+
+
 }
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 01fb9d7bf3c..56947f10bda 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -26,8 +26,10 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.WindowStore;
@@ -163,6 +165,15 @@ abstract class AbstractReadOnlyDecorator<T extends 
StateStore, K, V> extends Wra
         }
     }
 
+    static class TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders<K, V>
+        extends KeyValueStoreReadOnlyDecorator<K, ValueTimestampHeaders<V>>
+        implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+        private TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders(final 
TimestampedKeyValueStoreWithHeaders<K, V> inner) {
+            super(inner);
+        }
+    }
+
     static class VersionedKeyValueStoreReadOnlyDecorator<K, V>
         extends AbstractReadOnlyDecorator<VersionedKeyValueStore<K, V>, K, V>
         implements VersionedKeyValueStore<K, V> {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index b5e73567330..3d86b5ed15b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -27,7 +27,12 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.internals.PlainToHeadersStoreAdapter;
+import 
org.apache.kafka.streams.state.internals.PlainToHeadersWindowStoreAdapter;
 import org.apache.kafka.streams.state.internals.RecordConverter;
+import 
org.apache.kafka.streams.state.internals.TimestampedToHeadersStoreAdapter;
+import 
org.apache.kafka.streams.state.internals.TimestampedToHeadersWindowStoreAdapter;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
 import org.slf4j.Logger;
 
@@ -54,6 +59,7 @@ final class StateManagerUtil {
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
+        // First check if the top-level store implements HeadersBytesStore or 
TimestampedBytesStore
         if (isHeadersAware(store)) {
             if (store instanceof SessionStore) {
                 return rawValueToSessionHeadersValue();
@@ -64,6 +70,29 @@ final class StateManagerUtil {
             // timestamp is used separately during put() process for restore 
of versioned stores
             return rawValueToTimestampedValue();
         }
+
+        // If top-level check didn't find the type, unwrap to find adapters
+        // This handles persistent stores that use adapters
+        StateStore current = store;
+        while (current != null) {
+            if (current instanceof TimestampedToHeadersStoreAdapter || current 
instanceof TimestampedToHeadersWindowStoreAdapter) {
+                // Adapter wraps a timestamped store, so restore in 
timestamped format
+                return rawValueToTimestampedValue();
+            } else if (current instanceof PlainToHeadersStoreAdapter || 
current instanceof PlainToHeadersWindowStoreAdapter) {
+                // Adapter wraps a plain store, so restore in plain format
+                return identity();
+            }
+
+            // If not a WrappedStateStore, we've reached the innermost store
+            if (!(current instanceof WrappedStateStore)) {
+                break;
+            }
+
+            // Unwrap one more level
+            current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
+        }
+
+        // Default to identity if no special handling needed
         return identity();
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index e809d0a8b9e..026df05c515 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -123,15 +123,20 @@ public final class QueryableStoreTypes {
 
     }
 
-    private static class TimestampedKeyValueStoreType<K, V>
+    public static class TimestampedKeyValueStoreType<K, V>
         extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K, 
ValueAndTimestamp<V>>> {
 
         TimestampedKeyValueStoreType() {
             super(Set.of(
-                TimestampedKeyValueStore.class,
                 ReadOnlyKeyValueStore.class));
         }
 
+        @Override
+        public boolean accepts(final StateStore stateStore) {
+            return super.accepts(stateStore) &&
+                (stateStore instanceof TimestampedKeyValueStore || stateStore 
instanceof TimestampedKeyValueStoreWithHeaders);
+        }
+
         @Override
         public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final 
StateStoreProvider storeProvider,
                                                                      final 
String storeName) {
@@ -152,15 +157,20 @@ public final class QueryableStoreTypes {
         }
     }
 
-    private static class TimestampedWindowStoreType<K, V>
+    public static class TimestampedWindowStoreType<K, V>
         extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K, 
ValueAndTimestamp<V>>> {
 
         TimestampedWindowStoreType() {
             super(Set.of(
-                TimestampedWindowStore.class,
                 ReadOnlyWindowStore.class));
         }
 
+        @Override
+        public boolean accepts(final StateStore stateStore) {
+            return super.accepts(stateStore) &&
+                (stateStore instanceof TimestampedWindowStore || stateStore 
instanceof TimestampedWindowStoreWithHeaders);
+        }
+
         @Override
         public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final 
StateStoreProvider storeProvider,
                                                                    final 
String storeName) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 6620401cbcf..5285b974759 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -22,7 +22,9 @@ import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 
 import java.util.Collections;
 import java.util.List;
@@ -45,8 +47,26 @@ public class GlobalStateStoreProvider implements 
StateStoreProvider {
         if (!store.isOpen()) {
             throw new InvalidStateStoreException("the state store, " + 
storeName + ", is not open.");
         }
-        if (store instanceof TimestampedKeyValueStore && queryableStoreType 
instanceof QueryableStoreTypes.KeyValueStoreType) {
+        if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+            if (queryableStoreType instanceof 
QueryableStoreTypes.KeyValueStoreType) {
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueFromHeaders()));
+            } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedKeyValueStoreType) {
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
+            } else {
+                // For custom query types, return the raw store so they can 
access headers directly
+                return (List<T>) Collections.singletonList(store);
+            }
+        } else if (store instanceof TimestampedKeyValueStore && 
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
             return (List<T>) Collections.singletonList(new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store, ValueConverters.extractValue()));
+        } else if (store instanceof TimestampedWindowStoreWithHeaders) {
+            if (queryableStoreType instanceof 
QueryableStoreTypes.WindowStoreType) {
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueFromHeaders()));
+            } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedWindowStoreType) {
+                return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
+            } else {
+                // For custom query types, return the raw store so they can 
access headers directly
+                return (List<T>) Collections.singletonList(store);
+            }
         } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
             return (List<T>) Collections.singletonList(new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) 
store, ValueConverters.extractValue()));
         } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 8a7418fec8a..a429ea0143a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -27,7 +27,9 @@ import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.SessionStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -104,8 +106,26 @@ public class StreamThreadStateStoreProvider {
                             " because the store is not open. " +
                             "The state store may have migrated to another 
instance.");
             }
-            if (store instanceof TimestampedKeyValueStore && 
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
+            if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+                if (queryableStoreType instanceof 
QueryableStoreTypes.KeyValueStoreType) {
+                    return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueFromHeaders());
+                } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedKeyValueStoreType) {
+                    return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
 Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+                } else {
+                    // For custom query types, return the raw store so they 
can access headers directly
+                    return (T) store;
+                }
+            } else if (store instanceof TimestampedKeyValueStore && 
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
                 return (T) new 
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) 
store, ValueConverters.extractValue());
+            } else if (store instanceof TimestampedWindowStoreWithHeaders) {
+                if (queryableStoreType instanceof 
QueryableStoreTypes.WindowStoreType) {
+                    return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueFromHeaders());
+                } else if (queryableStoreType instanceof 
QueryableStoreTypes.TimestampedWindowStoreType) {
+                    return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object, 
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+                } else {
+                    // For custom query types, return the raw store so they 
can access headers directly
+                    return (T) store;
+                }
             } else if (store instanceof TimestampedWindowStore && 
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
                 return (T) new 
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) 
store, ValueConverters.extractValue());
             } else if (store instanceof SessionStoreWithHeaders && 
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
index 3a3ceec2d42..eb6a27f46a0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
@@ -58,7 +58,7 @@ public final class ValueConverters {
      * @param <V> value type
      * @return converter function that creates ValueAndTimestamp or returns 
null
      */
-    public static <V> Function<ValueTimestampHeaders<V>, ValueAndTimestamp<V>> 
headersToValueAndTimestamp() {
+    public static <V> Function<ValueTimestampHeaders<V>, ValueAndTimestamp<V>> 
extractValueAndTimestampFromHeaders() {
         return vth -> vth == null ? null :
             ValueAndTimestamp.make(vth.value(), vth.timestamp());
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 90cb126d5de..1261f17a697 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -36,7 +36,7 @@ import java.util.Map;
 import static 
org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
 import static 
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
 
-class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, 
byte[]> {
+public class WindowToTimestampedWindowByteStoreAdapter implements 
WindowStore<Bytes, byte[]> {
     final WindowStore<Bytes, byte[]> store;
 
     WindowToTimestampedWindowByteStoreAdapter(final WindowStore<Bytes, byte[]> 
store) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilConverterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilConverterTest.java
new file mode 100644
index 00000000000..34bf9979cb6
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilConverterTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemorySessionStore;
+import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import 
org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter;
+import org.apache.kafka.streams.state.internals.MeteredSessionStoreWithHeaders;
+import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
+import 
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.internals.PlainToHeadersStoreAdapter;
+import 
org.apache.kafka.streams.state.internals.PlainToHeadersWindowStoreAdapter;
+import org.apache.kafka.streams.state.internals.RecordConverter;
+import org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter;
+import 
org.apache.kafka.streams.state.internals.TimestampedToHeadersStoreAdapter;
+import 
org.apache.kafka.streams.state.internals.TimestampedToHeadersWindowStoreAdapter;
+import 
org.apache.kafka.streams.state.internals.WindowToTimestampedWindowByteStoreAdapter;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.identity;
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToSessionHeadersValue;
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class StateManagerUtilConverterTest {
+
+    @Test
+    public void 
shouldReturnIdentityConverterForPlainToTimestampedPersistentKeyValueStore() {
+        // persistent plain kv -> ts kv
+        final WrappedStateStore<?, ?, ?> mockWrapper = 
mock(WrappedStateStore.class);
+        final StateStore mockAdapter = 
mock(KeyValueToTimestampedKeyValueByteStoreAdapter.class);
+
+        doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockWrapper);
+
+        assertEquals(identity(), converter);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldReturnIdentityConverterForPlainToTimestampedInMemoryKeyValueStore() {
+        // in memory kv -> ts kv (using InMemoryTimestampedKeyValueStoreMarker)
+        final StateStore mockInnerStore = mock(InMemoryKeyValueStore.class);
+        final WrappedStateStore<?, ?, ?> mockMarker = 
mock(MeteredTimestampedKeyValueStore.class);
+
+        doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockMarker);
+
+        assertEquals(identity(), converter);
+    }
+
+    @Test
+    public void 
shouldReturnIdentityConverterForPlainToHeadersPersistentKeyValueStore() {
+        // persistent plain kv -> headers kv
+        final WrappedStateStore<?, ?, ?> mockWrapper = 
mock(WrappedStateStore.class);
+        final StateStore mockAdapter = mock(PlainToHeadersStoreAdapter.class);
+
+        doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockWrapper);
+
+        assertEquals(identity(), converter);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldReturnIdentityConverterForPlainToHeadersInMemoryKeyValueStore() {
+        // in memory kv -> headers kv (using 
InMemoryTimestampedKeyValueStoreWithHeadersMarker); despite "Identity" in the test name, rawValueToHeadersValue() is the expected converter
+        final StateStore mockInnerStore = mock(InMemoryKeyValueStore.class, 
withSettings().extraInterfaces(HeadersBytesStore.class));
+        final WrappedStateStore<?, ?, ?> mockMarker = 
mock(MeteredTimestampedKeyValueStoreWithHeaders.class);
+
+        doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockMarker);
+
+        assertEquals(rawValueToHeadersValue(), converter);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldReturnTimestampedConverterForTimestampedToHeadersPersistentKeyValueStore()
 {
+        // ts kv -> headers kv
+        final WrappedStateStore<?, ?, ?> mockWrapper = 
mock(WrappedStateStore.class);
+        final StateStore mockAdapter = 
mock(TimestampedToHeadersStoreAdapter.class);
+
+        doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockWrapper);
+
+        assertEquals(rawValueToTimestampedValue(), converter);
+    }
+
+    @Test
+    public void 
shouldReturnIdentityConverterForPlainToTimestampedPersistentWindowStore() {
+        // persistent plain window -> ts window
+        final WrappedStateStore<?, ?, ?> mockWrapper = 
mock(WrappedStateStore.class);
+        final StateStore mockAdapter = 
mock(WindowToTimestampedWindowByteStoreAdapter.class);
+
+        doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockWrapper);
+
+        assertEquals(identity(), converter);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldReturnIdentityConverterForPlainToTimestampedInMemoryWindowStore() {
+        // in memory window -> ts window (using 
InMemoryTimestampedWindowStoreMarker)
+        final StateStore mockInnerStore = mock(InMemoryWindowStore.class);
+        final WrappedStateStore<?, ?, ?> mockMarker = 
mock(MeteredTimestampedWindowStore.class);
+
+        doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockMarker);
+
+        assertEquals(identity(), converter);
+    }
+
+    @Test
+    public void 
shouldReturnIdentityConverterForPlainToHeadersPersistentWindowStore() {
+        // persistent plain window -> headers window
+        final WrappedStateStore<?, ?, ?> mockWrapper = 
mock(WrappedStateStore.class);
+        final StateStore mockAdapter = 
mock(PlainToHeadersWindowStoreAdapter.class);
+
+        doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockWrapper);
+
+        assertEquals(identity(), converter);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldReturnIdentityConverterForPlainToHeadersInMemoryWindowStore() {
+        // in memory window -> headers window (using 
InMemoryTimestampedWindowStoreWithHeadersMarker); despite "Identity" in the test name, rawValueToHeadersValue() is the expected converter
+        final StateStore mockInnerStore = mock(InMemoryWindowStore.class, 
withSettings().extraInterfaces(HeadersBytesStore.class));
+        final WrappedStateStore<?, ?, ?> mockMarker = 
mock(MeteredTimestampedWindowStoreWithHeaders.class);
+
+        doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockMarker);
+
+        assertEquals(rawValueToHeadersValue(), converter);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldReturnTimestampedConverterForTimestampedToHeadersPersistentWindowStore() {
+        // ts window -> headers window
+        final WrappedStateStore<?, ?, ?> mockWrapper = 
mock(WrappedStateStore.class);
+        final StateStore mockAdapter = 
mock(TimestampedToHeadersWindowStoreAdapter.class);
+
+        doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockWrapper);
+
+        assertEquals(rawValueToTimestampedValue(), converter);
+    }
+
+    @Test
+    public void 
shouldReturnIdentityConverterForPlainToHeadersPersistentSessionStore() {
+        // persistent plain session -> headers session
+        final WrappedStateStore<?, ?, ?> mockWrapper = 
mock(WrappedStateStore.class);
+        final StateStore mockAdapter = 
mock(SessionToHeadersStoreAdapter.class);
+
+        doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockWrapper);
+
+        assertEquals(identity(), converter);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldReturnIdentityConverterForPlainToHeadersInMemorySessionStore() {
+        // in memory session -> headers session (using 
InMemorySessionStoreWithHeadersMarker); despite "Identity" in the test name, rawValueToSessionHeadersValue() is the expected converter
+        final StateStore mockInnerStore = mock(InMemorySessionStore.class, 
withSettings().extraInterfaces(HeadersBytesStore.class));
+        final WrappedStateStore<?, ?, ?> mockMarker = 
mock(MeteredSessionStoreWithHeaders.class);
+
+        doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+        final RecordConverter converter = 
StateManagerUtil.converterForStore(mockMarker);
+
+        assertEquals(rawValueToSessionHeadersValue(), converter);
+    }
+
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
new file mode 100644
index 00000000000..e361daae09d
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serdes;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class QueryableStoreTypesWithHeadersTest {
+
+    @Test
+    public void 
shouldAcceptTimestampedKeyValueStoreWithHeadersForTimestampedKeyValueStoreType()
 {
+        final TimestampedKeyValueStoreWithHeaders<String, String> store =
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.inMemoryKeyValueStore("test-store"),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, 
ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedKeyValueStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void 
shouldAcceptTimestampedKeyValueStoreWithHeadersForKeyValueStoreType() {
+        final TimestampedKeyValueStoreWithHeaders<String, String> store =
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.inMemoryKeyValueStore("test-store"),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, String>> 
storeType =
+            QueryableStoreTypes.keyValueStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void 
shouldNotAcceptTimestampedKeyValueStoreWithHeadersForWindowStoreType() {
+        final TimestampedKeyValueStoreWithHeaders<String, String> store =
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.inMemoryKeyValueStore("test-store"),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyWindowStore<String, String>> 
storeType =
+            QueryableStoreTypes.windowStore();
+
+        assertFalse(storeType.accepts(store));
+    }
+
+    @Test
+    public void 
shouldAcceptTimestampedWindowStoreWithHeadersForTimestampedWindowStoreType() {
+        final TimestampedWindowStoreWithHeaders<String, String> store =
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.inMemoryWindowStore(
+                    "test-window-store",
+                    Duration.ofMillis(100),
+                    Duration.ofMillis(10),
+                    false),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyWindowStore<String, 
ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedWindowStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void 
shouldAcceptTimestampedWindowStoreWithHeadersForWindowStoreType() {
+        final TimestampedWindowStoreWithHeaders<String, String> store =
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.inMemoryWindowStore(
+                    "test-window-store",
+                    Duration.ofMillis(100),
+                    Duration.ofMillis(10),
+                    false),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyWindowStore<String, String>> 
storeType =
+            QueryableStoreTypes.windowStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void 
shouldNotAcceptTimestampedWindowStoreWithHeadersForKeyValueStoreType() {
+        final TimestampedWindowStoreWithHeaders<String, String> store =
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.inMemoryWindowStore(
+                    "test-window-store",
+                    Duration.ofMillis(100),
+                    Duration.ofMillis(10),
+                    false),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, String>> 
storeType =
+            QueryableStoreTypes.keyValueStore();
+
+        assertFalse(storeType.accepts(store));
+    }
+
+    @Test
+    public void 
shouldAcceptRegularTimestampedKeyValueStoreForTimestampedKeyValueStoreType() {
+        final TimestampedKeyValueStore<String, String> store =
+            Stores.timestampedKeyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("test-ts-store"),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, 
ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedKeyValueStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptRegularKeyValueStoreForKeyValueStoreType() {
+        final KeyValueStore<String, String> store =
+            Stores.keyValueStoreBuilder(
+                    Stores.inMemoryKeyValueStore("test-store"),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, String>> 
storeType =
+            QueryableStoreTypes.keyValueStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void 
shouldAcceptRegularTimestampedWindowStoreForTimestampedWindowStoreType() {
+        final TimestampedWindowStore<String, String> store =
+            Stores.timestampedWindowStoreBuilder(
+                Stores.inMemoryWindowStore(
+                    "test-ts-window-store",
+                    Duration.ofMillis(100),
+                    Duration.ofMillis(10),
+                    false),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlyWindowStore<String, 
ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedWindowStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptRegularWindowStoreForWindowStoreType() {
+        final WindowStore<String, String> store =
+            Stores.windowStoreBuilder(
+                    Stores.inMemoryWindowStore(
+                        "test-window-store",
+                        Duration.ofMillis(100),
+                        Duration.ofMillis(10),
+                        false),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        final QueryableStoreType<ReadOnlyWindowStore<String, String>> 
storeType =
+            QueryableStoreTypes.windowStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptSessionStoreWithHeadersForSessionStoreType() {
+        final SessionStoreWithHeaders<String, String> store =
+            Stores.sessionStoreBuilderWithHeaders(
+                Stores.inMemorySessionStore(
+                    "test-session-store",
+                    Duration.ofMillis(100)),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlySessionStore<String, 
ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.sessionStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptRegularSessionStoreForSessionStoreType() {
+        final SessionStore<String, String> store =
+            Stores.sessionStoreBuilder(
+                Stores.inMemorySessionStore(
+                    "test-session-store",
+                    Duration.ofMillis(100)),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+
+        final QueryableStoreType<ReadOnlySessionStore<String, String>> 
storeType =
+            QueryableStoreTypes.sessionStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
index c569ca20eaa..e6611c2e651 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
@@ -84,7 +84,7 @@ public class GenericReadOnlyKeyValueStoreFacadeTest {
     @Test
     public void shouldConvertValueToValueAndTimestamp() {
         final Function<ValueTimestampHeaders<String>, 
ValueAndTimestamp<String>> converter =
-            ValueConverters.headersToValueAndTimestamp();
+            ValueConverters.extractValueAndTimestampFromHeaders();
         final GenericReadOnlyKeyValueStoreFacade<String, 
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
             new GenericReadOnlyKeyValueStoreFacade<>(mockedHeadersStore, 
converter);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
index 86a4f3e5543..c5f1aaf9b37 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
@@ -91,7 +91,7 @@ public class GenericReadOnlyWindowStoreFacadeTest {
     @Test
     public void shouldConvertToValueAndTimestamp() {
         final Function<ValueTimestampHeaders<String>, 
ValueAndTimestamp<String>> converter =
-            ValueConverters.headersToValueAndTimestamp();
+            ValueConverters.extractValueAndTimestampFromHeaders();
         final GenericReadOnlyWindowStoreFacade<String, 
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
             new GenericReadOnlyWindowStoreFacade<>(mockedHeadersStore, 
converter);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 0480fbc0721..cd8692137dd 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -31,7 +31,9 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.NoOpReadOnlyStore;
 
@@ -107,6 +109,22 @@ public class GlobalStateStoreProviderTest {
                     Duration.ofMillis(10L)),
                 Serdes.String(),
                 Serdes.String()).build());
+        stores.put(
+            "ts-kv-store-with-headers",
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.inMemoryKeyValueStore("ts-kv-store-with-headers"),
+                Serdes.String(),
+                Serdes.String()).build());
+        stores.put(
+            "ts-w-store-with-headers",
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.inMemoryWindowStore(
+                    "ts-w-store-with-headers",
+                    Duration.ofMillis(10L),
+                    Duration.ofMillis(2L),
+                    false),
+                Serdes.String(),
+                Serdes.String()).build());
 
         final InternalProcessorContext<?, ?> mockContext = 
mock(InternalProcessorContext.class);
         when(mockContext.applicationId()).thenReturn("appId");
@@ -234,4 +252,57 @@ public class GlobalStateStoreProviderTest {
             assertThat(store, instanceOf(ReadOnlySessionStore.class));
         }
     }
+
+    @Test
+    public void 
shouldReturnKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, String>> stores =
+            provider.stores("ts-kv-store-with-headers", 
QueryableStoreTypes.keyValueStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void 
shouldReturnTimestampedKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> 
stores =
+            provider.stores("ts-kv-store-with-headers", 
QueryableStoreTypes.timestampedKeyValueStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void 
shouldReturnWindowStoreWithHeadersFacadeForHeadersAwareWindowStore() {
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, String>> stores =
+            provider.stores("ts-w-store-with-headers", 
QueryableStoreTypes.windowStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyWindowStore<String, String> store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void 
shouldReturnTimestampedWindowStoreWithHeadersFacadeForHeadersAwareWindowStore() 
{
+        final GlobalStateStoreProvider provider = new 
GlobalStateStoreProvider(stores);
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
stores =
+            provider.stores("ts-w-store-with-headers", 
QueryableStoreTypes.timestampedWindowStore());
+        assertEquals(1, stores.size());
+        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store : stores) {
+            assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        }
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 8a8e892df00..180cd121283 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -58,7 +58,9 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
@@ -152,6 +154,22 @@ public class StreamThreadStateStoreProviderTest {
                 Serdes.String(),
                 Serdes.String()),
             "the-processor");
+        topology.addStateStore(
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                
Stores.inMemoryKeyValueStore("timestamped-kv-store-with-headers"),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor");
+        topology.addStateStore(
+            Stores.timestampedWindowStoreWithHeadersBuilder(
+                Stores.inMemoryWindowStore(
+                    "timestamped-window-store-with-headers",
+                    Duration.ofMillis(10L),
+                    Duration.ofMillis(2L),
+                    false),
+                Serdes.String(),
+                Serdes.String()),
+            "the-processor");
 
         final Properties properties = new Properties();
         final String applicationId = "applicationId";
@@ -413,6 +431,62 @@ public class StreamThreadStateStoreProviderTest {
                 QueryableStoreTypes.keyValueStore())));
     }
 
+    @Test
+    public void shouldFindTimestampedKeyValueStoresWithHeaders() {
+        mockThread(true);
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> 
stores =
+            
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store-with-headers",
+                QueryableStoreTypes.timestampedKeyValueStore()));
+        assertEquals(2, stores.size());
+        for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>> 
store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void 
shouldFindTimestampedKeyValueStoresWithHeadersAsKeyValueStores() {
+        mockThread(true);
+        final List<ReadOnlyKeyValueStore<String, String>> stores =
+            
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store-with-headers",
+                QueryableStoreTypes.keyValueStore()));
+        assertEquals(2, stores.size());
+        for (final ReadOnlyKeyValueStore<String, String> store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void shouldFindTimestampedWindowStoresWithHeaders() {
+        mockThread(true);
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> 
stores =
+            
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store-with-headers",
+                QueryableStoreTypes.timestampedWindowStore()));
+        assertEquals(2, stores.size());
+        for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        }
+    }
+
+    @Test
+    public void shouldFindTimestampedWindowStoresWithHeadersAsWindowStores() {
+        mockThread(true);
+        final List<ReadOnlyWindowStore<String, String>> stores =
+            
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store-with-headers",
+                QueryableStoreTypes.windowStore()));
+        assertEquals(2, stores.size());
+        for (final ReadOnlyWindowStore<String, String> store : stores) {
+            assertThat(store, 
instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(store, 
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        }
+    }
+
     private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
                                          final Consumer<byte[], byte[]> 
consumer,
                                          final Producer<byte[], byte[]> 
producer,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
index fd8093f2933..5c6fd58a56c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
@@ -63,7 +63,7 @@ public class ValueConvertersTest {
     @Test
     public void headersToValueAndTimestampShouldConvertCorrectly() {
         final Function<ValueTimestampHeaders<String>, ValueAndTimestamp<String>> converter =
-            ValueConverters.headersToValueAndTimestamp();
+            ValueConverters.extractValueAndTimestampFromHeaders();
 
         final ValueTimestampHeaders<String> vth = ValueTimestampHeaders.make("value", 42L, new RecordHeaders());
         final ValueAndTimestamp<String> result = converter.apply(vth);
@@ -75,7 +75,7 @@ public class ValueConvertersTest {
     @Test
     public void headersToValueAndTimestampShouldReturnNullWhenInputIsNull() {
         final Function<ValueTimestampHeaders<String>, ValueAndTimestamp<String>> converter =
-            ValueConverters.headersToValueAndTimestamp();
+            ValueConverters.extractValueAndTimestampFromHeaders();
 
         assertNull(converter.apply(null));
     }
@@ -83,7 +83,7 @@ public class ValueConvertersTest {
     @Test
     public void headersToValueAndTimestampShouldDiscardHeaders() {
         final Function<ValueTimestampHeaders<String>, ValueAndTimestamp<String>> converter =
-            ValueConverters.headersToValueAndTimestamp();
+            ValueConverters.extractValueAndTimestampFromHeaders();
 
         final RecordHeaders headers = new RecordHeaders();
         headers.add("key1", "value1".getBytes());
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 3db553a6c6f..9f927f5893a 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1639,7 +1639,7 @@ public class TopologyTestDriver implements Closeable {
         private final TimestampedWindowStoreWithHeaders<K, V> inner;
 
         public TimestampedWindowStoreFacadeForHeaders(final TimestampedWindowStoreWithHeaders<K, V> store) {
-            super(store, ValueConverters.headersToValueAndTimestamp());
+            super(store, ValueConverters.extractValueAndTimestampFromHeaders());
             this.inner = store;
         }
 
@@ -1943,7 +1943,7 @@ public class TopologyTestDriver implements Closeable {
         private final TimestampedKeyValueStoreWithHeaders<K, V> inner;
 
         public TimestampedKeyValueStoreFacadeForHeaders(final TimestampedKeyValueStoreWithHeaders<K, V> store) {
-            super(store, ValueConverters.headersToValueAndTimestamp());
+            super(store, ValueConverters.extractValueAndTimestampFromHeaders());
             this.inner = store;
         }
 

Reply via email to