This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a2efacd44ef KAFKA-20307: Add Interactive Queries (IQv1) support for
headers-aware stores (#21744)
a2efacd44ef is described below
commit a2efacd44ef298c5725bf012fcd10259b9d12518
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Mar 16 14:22:47 2026 +0100
KAFKA-20307: Add Interactive Queries (IQv1) support for headers-aware
stores (#21744)
This PR adds backward compatibility support for the new headers-aware
state stores (TimestampedKeyValueStoreWithHeaders and
TimestampedWindowStoreWithHeaders) when queried through the
Interactive Queries v1 (IQv1) API. Users can now query headers-aware
stores using the existing QueryableStoreTypes API without any code
changes.
Reviewers: Matthias J. Sax <[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 92 +++++---
.../internals/AbstractReadOnlyDecorator.java | 11 +
.../processor/internals/StateManagerUtil.java | 29 +++
.../kafka/streams/state/QueryableStoreTypes.java | 18 +-
.../state/internals/GlobalStateStoreProvider.java | 22 +-
.../internals/StreamThreadStateStoreProvider.java | 22 +-
.../streams/state/internals/ValueConverters.java | 2 +-
.../WindowToTimestampedWindowByteStoreAdapter.java | 2 +-
.../internals/StateManagerUtilConverterTest.java | 221 +++++++++++++++++++
.../state/QueryableStoreTypesWithHeadersTest.java | 233 +++++++++++++++++++++
.../GenericReadOnlyKeyValueStoreFacadeTest.java | 2 +-
.../GenericReadOnlyWindowStoreFacadeTest.java | 2 +-
.../internals/GlobalStateStoreProviderTest.java | 71 +++++++
.../StreamThreadStateStoreProviderTest.java | 74 +++++++
.../state/internals/ValueConvertersTest.java | 6 +-
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
16 files changed, 764 insertions(+), 47 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 6470f3942bb..f9614f5909a 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -51,6 +52,9 @@ import
org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
+import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -479,7 +483,7 @@ public class HeadersStoreUpgradeIntegrationTest {
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
+ .getStore(STORE_NAME, kafkaStreams, new
TimestampedKeyValueStoreWithHeadersType<>());
if (store == null)
return false;
@@ -520,7 +524,7 @@ public class HeadersStoreUpgradeIntegrationTest {
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
+ .getStore(STORE_NAME, kafkaStreams, new
TimestampedKeyValueStoreWithHeadersType<>());
if (store == null)
return false;
@@ -546,7 +550,7 @@ public class HeadersStoreUpgradeIntegrationTest {
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
+ .getStore(STORE_NAME, kafkaStreams, new
TimestampedKeyValueStoreWithHeadersType<>());
if (store == null)
return false;
@@ -565,30 +569,6 @@ public class HeadersStoreUpgradeIntegrationTest {
"Could not get expected result in time.");
}
- private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
- final V value)
throws Exception {
- TestUtils.waitForCondition(
- () -> {
- try {
- final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
-
- if (store == null)
- return false;
-
- final ValueTimestampHeaders<V> result = store.get(key);
- return result != null
- && result.value().equals(value)
- && result.headers().toArray().length == 0;
- } catch (final Exception swallow) {
- LOG.error("Error while verifying legacy value with empty
headers", swallow);
- return false;
- }
- },
- 60_000L,
- "Could not get expected result in time.");
- }
-
private static class KeyValueProcessor implements Processor<String,
String, Void, Void> {
private KeyValueStore<String, String> store;
@@ -906,7 +886,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
if (store == null) {
return false;
@@ -963,7 +943,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
if (store == null) {
return false;
@@ -1049,7 +1029,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
if (store == null) {
return false;
@@ -1089,7 +1069,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
if (store == null) {
return false;
@@ -1493,7 +1473,7 @@ public class HeadersStoreUpgradeIntegrationTest {
private boolean windowStoreContainsKey(final String key, final long
timestamp) {
try {
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>
store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
QueryableStoreTypes.windowStore());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
new TimestampedWindowStoreWithHeadersType<>());
if (store == null) {
return false;
@@ -2037,4 +2017,52 @@ public class HeadersStoreUpgradeIntegrationTest {
store.put(sessionKey, AggregationWithHeaders.make(record.value(),
record.headers()));
}
}
+
+ // ==================== Custom QueryableStoreTypes ====================
+
+ /**
+ * Custom QueryableStoreType for querying
TimestampedKeyValueStoreWithHeaders directly
+ * without facade wrapping. This returns the full ValueTimestampHeaders
wrapper.
+ */
+ private static class TimestampedKeyValueStoreWithHeadersType<K, V>
+ implements QueryableStoreType<ReadOnlyKeyValueStore<K,
ValueTimestampHeaders<V>>> {
+
+ @Override
+ public boolean accepts(final
org.apache.kafka.streams.processor.StateStore stateStore) {
+ // Accept stores that implement both
TimestampedKeyValueStoreWithHeaders and ReadOnlyKeyValueStore
+ return stateStore instanceof TimestampedKeyValueStoreWithHeaders
+ && stateStore instanceof ReadOnlyKeyValueStore;
+ }
+
+ @Override
+ public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
+ final StateStoreProvider storeProvider,
+ final String storeName) {
+ return new CompositeReadOnlyKeyValueStore<>(storeProvider, this,
storeName);
+ }
+ }
+
+ /**
+ * Custom queryable store type for accessing
TimestampedWindowStoreWithHeaders directly
+ * without facade wrapping. This returns the full ValueTimestampHeaders
wrapper.
+ */
+ private static class TimestampedWindowStoreWithHeadersType<K, V>
+ implements QueryableStoreType<ReadOnlyWindowStore<K,
ValueTimestampHeaders<V>>> {
+
+ @Override
+ public boolean accepts(final
org.apache.kafka.streams.processor.StateStore stateStore) {
+ // Accept stores that implement both
TimestampedWindowStoreWithHeaders and ReadOnlyWindowStore
+ return stateStore instanceof TimestampedWindowStoreWithHeaders
+ && stateStore instanceof ReadOnlyWindowStore;
+ }
+
+ @Override
+ public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
+ final StateStoreProvider storeProvider,
+ final String storeName) {
+ return new CompositeReadOnlyWindowStore<>(storeProvider, this,
storeName);
+ }
+ }
+
+
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 01fb9d7bf3c..56947f10bda 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -26,8 +26,10 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.WindowStore;
@@ -163,6 +165,15 @@ abstract class AbstractReadOnlyDecorator<T extends
StateStore, K, V> extends Wra
}
}
+ static class TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders<K, V>
+ extends KeyValueStoreReadOnlyDecorator<K, ValueTimestampHeaders<V>>
+ implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+ private TimestampedKeyValueStoreReadOnlyDecoratorWithHeaders(final
TimestampedKeyValueStoreWithHeaders<K, V> inner) {
+ super(inner);
+ }
+ }
+
static class VersionedKeyValueStoreReadOnlyDecorator<K, V>
extends AbstractReadOnlyDecorator<VersionedKeyValueStore<K, V>, K, V>
implements VersionedKeyValueStore<K, V> {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index b5e73567330..3d86b5ed15b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -27,7 +27,12 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.internals.PlainToHeadersStoreAdapter;
+import
org.apache.kafka.streams.state.internals.PlainToHeadersWindowStoreAdapter;
import org.apache.kafka.streams.state.internals.RecordConverter;
+import
org.apache.kafka.streams.state.internals.TimestampedToHeadersStoreAdapter;
+import
org.apache.kafka.streams.state.internals.TimestampedToHeadersWindowStoreAdapter;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;
@@ -54,6 +59,7 @@ final class StateManagerUtil {
private StateManagerUtil() {}
static RecordConverter converterForStore(final StateStore store) {
+ // First check if the top-level store implements HeadersBytesStore or
TimestampedBytesStore
if (isHeadersAware(store)) {
if (store instanceof SessionStore) {
return rawValueToSessionHeadersValue();
@@ -64,6 +70,29 @@ final class StateManagerUtil {
// timestamp is used separately during put() process for restore
of versioned stores
return rawValueToTimestampedValue();
}
+
+ // If top-level check didn't find the type, unwrap to find adapters
+ // This handles persistent stores that use adapters
+ StateStore current = store;
+ while (current != null) {
+ if (current instanceof TimestampedToHeadersStoreAdapter || current
instanceof TimestampedToHeadersWindowStoreAdapter) {
+ // Adapter wraps a timestamped store, so restore in
timestamped format
+ return rawValueToTimestampedValue();
+ } else if (current instanceof PlainToHeadersStoreAdapter ||
current instanceof PlainToHeadersWindowStoreAdapter) {
+ // Adapter wraps a plain store, so restore in plain format
+ return identity();
+ }
+
+ // If not a WrappedStateStore, we've reached the innermost store
+ if (!(current instanceof WrappedStateStore)) {
+ break;
+ }
+
+ // Unwrap one more level
+ current = ((WrappedStateStore<?, ?, ?>) current).wrapped();
+ }
+
+ // Default to identity if no special handling needed
return identity();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index e809d0a8b9e..026df05c515 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -123,15 +123,20 @@ public final class QueryableStoreTypes {
}
- private static class TimestampedKeyValueStoreType<K, V>
+ public static class TimestampedKeyValueStoreType<K, V>
extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K,
ValueAndTimestamp<V>>> {
TimestampedKeyValueStoreType() {
super(Set.of(
- TimestampedKeyValueStore.class,
ReadOnlyKeyValueStore.class));
}
+ @Override
+ public boolean accepts(final StateStore stateStore) {
+ return super.accepts(stateStore) &&
+ (stateStore instanceof TimestampedKeyValueStore || stateStore
instanceof TimestampedKeyValueStoreWithHeaders);
+ }
+
@Override
public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final
StateStoreProvider storeProvider,
final
String storeName) {
@@ -152,15 +157,20 @@ public final class QueryableStoreTypes {
}
}
- private static class TimestampedWindowStoreType<K, V>
+ public static class TimestampedWindowStoreType<K, V>
extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K,
ValueAndTimestamp<V>>> {
TimestampedWindowStoreType() {
super(Set.of(
- TimestampedWindowStore.class,
ReadOnlyWindowStore.class));
}
+ @Override
+ public boolean accepts(final StateStore stateStore) {
+ return super.accepts(stateStore) &&
+ (stateStore instanceof TimestampedWindowStore || stateStore
instanceof TimestampedWindowStoreWithHeaders);
+ }
+
@Override
public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final
StateStoreProvider storeProvider,
final
String storeName) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 6620401cbcf..5285b974759 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -22,7 +22,9 @@ import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import java.util.Collections;
import java.util.List;
@@ -45,8 +47,26 @@ public class GlobalStateStoreProvider implements
StateStoreProvider {
if (!store.isOpen()) {
throw new InvalidStateStoreException("the state store, " +
storeName + ", is not open.");
}
- if (store instanceof TimestampedKeyValueStore && queryableStoreType
instanceof QueryableStoreTypes.KeyValueStoreType) {
+ if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+ if (queryableStoreType instanceof
QueryableStoreTypes.KeyValueStoreType) {
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders()));
+ } else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedKeyValueStoreType) {
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
+ } else {
+ // For custom query types, return the raw store so they can
access headers directly
+ return (List<T>) Collections.singletonList(store);
+ }
+ } else if (store instanceof TimestampedKeyValueStore &&
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store, ValueConverters.extractValue()));
+ } else if (store instanceof TimestampedWindowStoreWithHeaders) {
+ if (queryableStoreType instanceof
QueryableStoreTypes.WindowStoreType) {
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders()));
+ } else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedWindowStoreType) {
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
+ } else {
+ // For custom query types, return the raw store so they can
access headers directly
+ return (List<T>) Collections.singletonList(store);
+ }
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>)
store, ValueConverters.extractValue()));
} else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index 8a7418fec8a..a429ea0143a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -27,7 +27,9 @@ import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import java.util.ArrayList;
import java.util.Collection;
@@ -104,8 +106,26 @@ public class StreamThreadStateStoreProvider {
" because the store is not open. " +
"The state store may have migrated to another
instance.");
}
- if (store instanceof TimestampedKeyValueStore &&
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
+ if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+ if (queryableStoreType instanceof
QueryableStoreTypes.KeyValueStoreType) {
+ return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders());
+ } else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedKeyValueStoreType) {
+ return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+ } else {
+ // For custom query types, return the raw store so they
can access headers directly
+ return (T) store;
+ }
+ } else if (store instanceof TimestampedKeyValueStore &&
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store, ValueConverters.extractValue());
+ } else if (store instanceof TimestampedWindowStoreWithHeaders) {
+ if (queryableStoreType instanceof
QueryableStoreTypes.WindowStoreType) {
+ return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders());
+ } else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedWindowStoreType) {
+ return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+ } else {
+ // For custom query types, return the raw store so they
can access headers directly
+ return (T) store;
+ }
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>)
store, ValueConverters.extractValue());
} else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
index 3a3ceec2d42..eb6a27f46a0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
@@ -58,7 +58,7 @@ public final class ValueConverters {
* @param <V> value type
* @return converter function that creates ValueAndTimestamp or returns
null
*/
- public static <V> Function<ValueTimestampHeaders<V>, ValueAndTimestamp<V>>
headersToValueAndTimestamp() {
+ public static <V> Function<ValueTimestampHeaders<V>, ValueAndTimestamp<V>>
extractValueAndTimestampFromHeaders() {
return vth -> vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 90cb126d5de..1261f17a697 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -36,7 +36,7 @@ import java.util.Map;
import static
org.apache.kafka.streams.state.TimestampedBytesStore.convertToTimestampedFormat;
import static
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.rawValue;
-class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes,
byte[]> {
+public class WindowToTimestampedWindowByteStoreAdapter implements
WindowStore<Bytes, byte[]> {
final WindowStore<Bytes, byte[]> store;
WindowToTimestampedWindowByteStoreAdapter(final WindowStore<Bytes, byte[]>
store) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilConverterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilConverterTest.java
new file mode 100644
index 00000000000..34bf9979cb6
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilConverterTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemorySessionStore;
+import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import
org.apache.kafka.streams.state.internals.KeyValueToTimestampedKeyValueByteStoreAdapter;
+import org.apache.kafka.streams.state.internals.MeteredSessionStoreWithHeaders;
+import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore;
+import
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.internals.PlainToHeadersStoreAdapter;
+import
org.apache.kafka.streams.state.internals.PlainToHeadersWindowStoreAdapter;
+import org.apache.kafka.streams.state.internals.RecordConverter;
+import org.apache.kafka.streams.state.internals.SessionToHeadersStoreAdapter;
+import
org.apache.kafka.streams.state.internals.TimestampedToHeadersStoreAdapter;
+import
org.apache.kafka.streams.state.internals.TimestampedToHeadersWindowStoreAdapter;
+import
org.apache.kafka.streams.state.internals.WindowToTimestampedWindowByteStoreAdapter;
+import org.apache.kafka.streams.state.internals.WrappedStateStore;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import static
org.apache.kafka.streams.state.internals.RecordConverters.identity;
+import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
+import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToSessionHeadersValue;
+import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class StateManagerUtilConverterTest {
+
+ @Test
+ public void
shouldReturnIdentityConverterForPlainToTimestampedPersistentKeyValueStore() {
+ // persistent plain kv -> ts kv
+ final WrappedStateStore<?, ?, ?> mockWrapper =
mock(WrappedStateStore.class);
+ final StateStore mockAdapter =
mock(KeyValueToTimestampedKeyValueByteStoreAdapter.class);
+
+ doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockWrapper);
+
+ assertEquals(identity(), converter);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldReturnIdentityConverterForPlainToTimestampedInMemoryKeyValueStore() {
+ // in memory kv -> ts kv (using InMemoryTimestampedKeyValueStoreMarker)
+ final StateStore mockInnerStore = mock(InMemoryKeyValueStore.class);
+ final WrappedStateStore<?, ?, ?> mockMarker =
mock(MeteredTimestampedKeyValueStore.class);
+
+ doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockMarker);
+
+ assertEquals(identity(), converter);
+ }
+
+ @Test
+ public void
shouldReturnIdentityConverterForPlainToHeadersPersistentKeyValueStore() {
+ // persistent plain kv -> headers kv
+ final WrappedStateStore<?, ?, ?> mockWrapper =
mock(WrappedStateStore.class);
+ final StateStore mockAdapter = mock(PlainToHeadersStoreAdapter.class);
+
+ doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockWrapper);
+
+ assertEquals(identity(), converter);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldReturnIdentityConverterForPlainToHeadersInMemoryKeyValueStore() {
+ // in memory kv -> headers kv: the mocked inner store also implements HeadersBytesStore, so the headers converter (not identity, despite the method name) is asserted below
+ final StateStore mockInnerStore = mock(InMemoryKeyValueStore.class,
withSettings().extraInterfaces(HeadersBytesStore.class));
+ final WrappedStateStore<?, ?, ?> mockMarker =
mock(MeteredTimestampedKeyValueStoreWithHeaders.class);
+
+ doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockMarker);
+
+ assertEquals(rawValueToHeadersValue(), converter);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldReturnTimestampedConverterForTimestampedToHeadersPersistentKeyValueStore()
{
+ // ts kv -> headers kv
+ final WrappedStateStore<?, ?, ?> mockWrapper =
mock(WrappedStateStore.class);
+ final StateStore mockAdapter =
mock(TimestampedToHeadersStoreAdapter.class);
+
+ doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockWrapper);
+
+ assertEquals(rawValueToTimestampedValue(), converter);
+ }
+
+ @Test
+ public void
shouldReturnIdentityConverterForPlainToTimestampedPersistentWindowStore() {
+ // persistent plain window -> ts window
+ final WrappedStateStore<?, ?, ?> mockWrapper =
mock(WrappedStateStore.class);
+ final StateStore mockAdapter =
mock(WindowToTimestampedWindowByteStoreAdapter.class);
+
+ doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockWrapper);
+
+ assertEquals(identity(), converter);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldReturnIdentityConverterForPlainToTimestampedInMemoryWindowStore() {
+ // in memory window -> ts window: the inner store is mocked as a plain in-memory *window* store (not a key-value store) so the scenario matches the test name; no conversion is expected on restore
+ final StateStore mockInnerStore = mock(InMemoryWindowStore.class);
+ final WrappedStateStore<?, ?, ?> mockMarker =
mock(MeteredTimestampedWindowStore.class);
+
+ doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockMarker);
+
+ assertEquals(identity(), converter);
+ }
+
+ @Test
+ public void
shouldReturnIdentityConverterForPlainToHeadersPersistentWindowStore() {
+ // persistent plain window -> headers window
+ final WrappedStateStore<?, ?, ?> mockWrapper =
mock(WrappedStateStore.class);
+ final StateStore mockAdapter =
mock(PlainToHeadersWindowStoreAdapter.class);
+
+ doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockWrapper);
+
+ assertEquals(identity(), converter);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldReturnIdentityConverterForPlainToHeadersInMemoryWindowStore() {
+ // in memory window -> headers window: the mocked inner store also implements HeadersBytesStore, so the headers converter (not identity, despite the method name) is asserted below
+ final StateStore mockInnerStore = mock(InMemoryWindowStore.class,
withSettings().extraInterfaces(HeadersBytesStore.class));
+ final WrappedStateStore<?, ?, ?> mockMarker =
mock(MeteredTimestampedWindowStoreWithHeaders.class);
+
+ doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockMarker);
+
+ assertEquals(rawValueToHeadersValue(), converter);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldReturnTimestampedConverterForTimestampedToHeadersPersistentWindowStore() {
+ // ts window -> headers window
+ final WrappedStateStore<?, ?, ?> mockWrapper =
mock(WrappedStateStore.class);
+ final StateStore mockAdapter =
mock(TimestampedToHeadersWindowStoreAdapter.class);
+
+ doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockWrapper);
+
+ assertEquals(rawValueToTimestampedValue(), converter);
+ }
+
+ @Test
+ public void
shouldReturnIdentityConverterForPlainToHeadersPersistentSessionStore() {
+ // persistent plain session -> headers session
+ final WrappedStateStore<?, ?, ?> mockWrapper =
mock(WrappedStateStore.class);
+ final StateStore mockAdapter =
mock(SessionToHeadersStoreAdapter.class);
+
+ doReturn(mockAdapter).when(mockWrapper).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockWrapper);
+
+ assertEquals(identity(), converter);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void
shouldReturnIdentityConverterForPlainToHeadersInMemorySessionStore() {
+ // in memory session -> headers session: the mocked inner store also implements HeadersBytesStore, so the session-headers converter (not identity, despite the method name) is asserted below
+ final StateStore mockInnerStore = mock(InMemorySessionStore.class,
withSettings().extraInterfaces(HeadersBytesStore.class));
+ final WrappedStateStore<?, ?, ?> mockMarker =
mock(MeteredSessionStoreWithHeaders.class);
+
+ doReturn(mockInnerStore).when(mockMarker).wrapped();
+
+ final RecordConverter converter =
StateManagerUtil.converterForStore(mockMarker);
+
+ assertEquals(rawValueToSessionHeadersValue(), converter);
+ }
+
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
new file mode 100644
index 00000000000..e361daae09d
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/QueryableStoreTypesWithHeadersTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.serialization.Serdes;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Checks which {@link QueryableStoreTypes} accept the stores produced by the {@link Stores}
+ * builders, covering headers-aware stores as well as their regular counterparts.
+ */
+public class QueryableStoreTypesWithHeadersTest {
+
+    /**
+     * Creates the in-memory window store supplier used by every window-store fixture in this class.
+     *
+     * @param name the store name to register the supplier under
+     * @return a supplier configured with the 100 ms / 10 ms durations and {@code false} flag
+     *         shared by all window fixtures
+     */
+    private static WindowBytesStoreSupplier inMemoryWindowSupplier(final String name) {
+        return Stores.inMemoryWindowStore(name, Duration.ofMillis(100), Duration.ofMillis(10), false);
+    }
+
+    /**
+     * Builds a fresh headers-aware timestamped key-value store named {@code "test-store"}.
+     *
+     * @return a newly built, uninitialized store backed by an in-memory key-value supplier
+     */
+    private static TimestampedKeyValueStoreWithHeaders<String, String> headersKeyValueStore() {
+        return Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.inMemoryKeyValueStore("test-store"),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+    }
+
+    /**
+     * Builds a fresh headers-aware timestamped window store named {@code "test-window-store"}.
+     *
+     * @return a newly built, uninitialized store backed by an in-memory window supplier
+     */
+    private static TimestampedWindowStoreWithHeaders<String, String> headersWindowStore() {
+        return Stores.timestampedWindowStoreWithHeadersBuilder(
+                inMemoryWindowSupplier("test-window-store"),
+                Serdes.String(),
+                Serdes.String())
+            .build();
+    }
+
+    // ---- headers-aware key-value store ----
+
+    @Test
+    public void shouldAcceptTimestampedKeyValueStoreWithHeadersForTimestampedKeyValueStoreType() {
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedKeyValueStore();
+
+        assertTrue(storeType.accepts(headersKeyValueStore()));
+    }
+
+    @Test
+    public void shouldAcceptTimestampedKeyValueStoreWithHeadersForKeyValueStoreType() {
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, String>> storeType =
+            QueryableStoreTypes.keyValueStore();
+
+        assertTrue(storeType.accepts(headersKeyValueStore()));
+    }
+
+    @Test
+    public void shouldNotAcceptTimestampedKeyValueStoreWithHeadersForWindowStoreType() {
+        final QueryableStoreType<ReadOnlyWindowStore<String, String>> storeType =
+            QueryableStoreTypes.windowStore();
+
+        assertFalse(storeType.accepts(headersKeyValueStore()));
+    }
+
+    // ---- headers-aware window store ----
+
+    @Test
+    public void shouldAcceptTimestampedWindowStoreWithHeadersForTimestampedWindowStoreType() {
+        final QueryableStoreType<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedWindowStore();
+
+        assertTrue(storeType.accepts(headersWindowStore()));
+    }
+
+    @Test
+    public void shouldAcceptTimestampedWindowStoreWithHeadersForWindowStoreType() {
+        final QueryableStoreType<ReadOnlyWindowStore<String, String>> storeType =
+            QueryableStoreTypes.windowStore();
+
+        assertTrue(storeType.accepts(headersWindowStore()));
+    }
+
+    @Test
+    public void shouldNotAcceptTimestampedWindowStoreWithHeadersForKeyValueStoreType() {
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, String>> storeType =
+            QueryableStoreTypes.keyValueStore();
+
+        assertFalse(storeType.accepts(headersWindowStore()));
+    }
+
+    // ---- regular (non-headers) stores: existing behaviour must be unchanged ----
+
+    @Test
+    public void shouldAcceptRegularTimestampedKeyValueStoreForTimestampedKeyValueStoreType() {
+        final TimestampedKeyValueStore<String, String> store =
+            Stores.timestampedKeyValueStoreBuilder(
+                    Stores.inMemoryKeyValueStore("test-ts-store"),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedKeyValueStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptRegularKeyValueStoreForKeyValueStoreType() {
+        final KeyValueStore<String, String> store =
+            Stores.keyValueStoreBuilder(
+                    Stores.inMemoryKeyValueStore("test-store"),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<String, String>> storeType =
+            QueryableStoreTypes.keyValueStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptRegularTimestampedWindowStoreForTimestampedWindowStoreType() {
+        final TimestampedWindowStore<String, String> store =
+            Stores.timestampedWindowStoreBuilder(
+                    inMemoryWindowSupplier("test-ts-window-store"),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        final QueryableStoreType<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> storeType =
+            QueryableStoreTypes.timestampedWindowStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptRegularWindowStoreForWindowStoreType() {
+        final WindowStore<String, String> store =
+            Stores.windowStoreBuilder(
+                    inMemoryWindowSupplier("test-window-store"),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        final QueryableStoreType<ReadOnlyWindowStore<String, String>> storeType =
+            QueryableStoreTypes.windowStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    // ---- session stores ----
+
+    @Test
+    public void shouldAcceptSessionStoreWithHeadersForSessionStoreType() {
+        final SessionStoreWithHeaders<String, String> store =
+            Stores.sessionStoreBuilderWithHeaders(
+                    Stores.inMemorySessionStore("test-session-store", Duration.ofMillis(100)),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        // Declared with the plain value type, mirroring the regular session store test below.
+        // The type argument is compile-time only here; the assertion exercises accepts(store).
+        final QueryableStoreType<ReadOnlySessionStore<String, String>> storeType =
+            QueryableStoreTypes.sessionStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+
+    @Test
+    public void shouldAcceptRegularSessionStoreForSessionStoreType() {
+        final SessionStore<String, String> store =
+            Stores.sessionStoreBuilder(
+                    Stores.inMemorySessionStore("test-session-store", Duration.ofMillis(100)),
+                    Serdes.String(),
+                    Serdes.String())
+                .build();
+
+        final QueryableStoreType<ReadOnlySessionStore<String, String>> storeType =
+            QueryableStoreTypes.sessionStore();
+
+        assertTrue(storeType.accepts(store));
+    }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
index c569ca20eaa..e6611c2e651 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
@@ -84,7 +84,7 @@ public class GenericReadOnlyKeyValueStoreFacadeTest {
@Test
public void shouldConvertValueToValueAndTimestamp() {
final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
- ValueConverters.headersToValueAndTimestamp();
+ ValueConverters.extractValueAndTimestampFromHeaders();
final GenericReadOnlyKeyValueStoreFacade<String,
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
new GenericReadOnlyKeyValueStoreFacade<>(mockedHeadersStore,
converter);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
index 86a4f3e5543..c5f1aaf9b37 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
@@ -91,7 +91,7 @@ public class GenericReadOnlyWindowStoreFacadeTest {
@Test
public void shouldConvertToValueAndTimestamp() {
final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
- ValueConverters.headersToValueAndTimestamp();
+ ValueConverters.extractValueAndTimestampFromHeaders();
final GenericReadOnlyWindowStoreFacade<String,
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
new GenericReadOnlyWindowStoreFacade<>(mockedHeadersStore,
converter);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index 0480fbc0721..cd8692137dd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -31,7 +31,9 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.NoOpReadOnlyStore;
@@ -107,6 +109,22 @@ public class GlobalStateStoreProviderTest {
Duration.ofMillis(10L)),
Serdes.String(),
Serdes.String()).build());
+ stores.put(
+ "ts-kv-store-with-headers",
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.inMemoryKeyValueStore("ts-kv-store-with-headers"),
+ Serdes.String(),
+ Serdes.String()).build());
+ stores.put(
+ "ts-w-store-with-headers",
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.inMemoryWindowStore(
+ "ts-w-store-with-headers",
+ Duration.ofMillis(10L),
+ Duration.ofMillis(2L),
+ false),
+ Serdes.String(),
+ Serdes.String()).build());
final InternalProcessorContext<?, ?> mockContext =
mock(InternalProcessorContext.class);
when(mockContext.applicationId()).thenReturn("appId");
@@ -234,4 +252,57 @@ public class GlobalStateStoreProviderTest {
assertThat(store, instanceOf(ReadOnlySessionStore.class));
}
}
+
+    @Test
+    public void shouldReturnKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+        // Query the headers-aware key-value store through the plain key-value store type.
+        final List<ReadOnlyKeyValueStore<String, String>> facades =
+            new GlobalStateStoreProvider(stores)
+                .stores("ts-kv-store-with-headers", QueryableStoreTypes.keyValueStore());
+
+        assertEquals(1, facades.size());
+        // The returned store must be the generic read-only facade, not a timestamped
+        // or headers-aware store type.
+        facades.forEach(facade -> {
+            assertThat(facade, instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        });
+    }
+
+    @Test
+    public void shouldReturnTimestampedKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+        // Query the headers-aware key-value store through the timestamped key-value store type.
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> facades =
+            new GlobalStateStoreProvider(stores)
+                .stores("ts-kv-store-with-headers", QueryableStoreTypes.timestampedKeyValueStore());
+
+        assertEquals(1, facades.size());
+        // The returned store must be the generic read-only facade, not a timestamped
+        // or headers-aware store type.
+        facades.forEach(facade -> {
+            assertThat(facade, instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        });
+    }
+
+    @Test
+    public void shouldReturnWindowStoreWithHeadersFacadeForHeadersAwareWindowStore() {
+        // Query the headers-aware window store through the plain window store type.
+        final List<ReadOnlyWindowStore<String, String>> facades =
+            new GlobalStateStoreProvider(stores)
+                .stores("ts-w-store-with-headers", QueryableStoreTypes.windowStore());
+
+        assertEquals(1, facades.size());
+        // The returned store must be the generic read-only window facade, not a timestamped
+        // or headers-aware window store type.
+        facades.forEach(facade -> {
+            assertThat(facade, instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        });
+    }
+
+    @Test
+    public void shouldReturnTimestampedWindowStoreWithHeadersFacadeForHeadersAwareWindowStore() {
+        // Query the headers-aware window store through the timestamped window store type.
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> facades =
+            new GlobalStateStoreProvider(stores)
+                .stores("ts-w-store-with-headers", QueryableStoreTypes.timestampedWindowStore());
+
+        assertEquals(1, facades.size());
+        // The returned store must be a read-only view served by the generic window facade,
+        // not a timestamped or headers-aware window store type.
+        facades.forEach(facade -> {
+            assertThat(facade, instanceOf(ReadOnlyWindowStore.class));
+            assertThat(facade, instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        });
+    }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 8a8e892df00..180cd121283 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -58,7 +58,9 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@@ -152,6 +154,22 @@ public class StreamThreadStateStoreProviderTest {
Serdes.String(),
Serdes.String()),
"the-processor");
+ topology.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+
Stores.inMemoryKeyValueStore("timestamped-kv-store-with-headers"),
+ Serdes.String(),
+ Serdes.String()),
+ "the-processor");
+ topology.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.inMemoryWindowStore(
+ "timestamped-window-store-with-headers",
+ Duration.ofMillis(10L),
+ Duration.ofMillis(2L),
+ false),
+ Serdes.String(),
+ Serdes.String()),
+ "the-processor");
final Properties properties = new Properties();
final String applicationId = "applicationId";
@@ -413,6 +431,62 @@ public class StreamThreadStateStoreProviderTest {
QueryableStoreTypes.keyValueStore())));
}
+    @Test
+    public void shouldFindTimestampedKeyValueStoresWithHeaders() {
+        mockThread(true);
+
+        // Look up the headers-aware key-value store via the timestamped key-value store type.
+        final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>> found =
+            provider.stores(
+                StoreQueryParameters.fromNameAndType(
+                    "timestamped-kv-store-with-headers",
+                    QueryableStoreTypes.timestampedKeyValueStore()));
+
+        assertEquals(2, found.size());
+        // Every hit must be the generic read-only facade rather than the underlying store type.
+        found.forEach(facade -> {
+            assertThat(facade, instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        });
+    }
+
+    @Test
+    public void shouldFindTimestampedKeyValueStoresWithHeadersAsKeyValueStores() {
+        mockThread(true);
+
+        // Look up the headers-aware key-value store via the plain key-value store type.
+        final List<ReadOnlyKeyValueStore<String, String>> found =
+            provider.stores(
+                StoreQueryParameters.fromNameAndType(
+                    "timestamped-kv-store-with-headers",
+                    QueryableStoreTypes.keyValueStore()));
+
+        assertEquals(2, found.size());
+        // Every hit must be the generic read-only facade rather than the underlying store type.
+        found.forEach(facade -> {
+            assertThat(facade, instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+        });
+    }
+
+    @Test
+    public void shouldFindTimestampedWindowStoresWithHeaders() {
+        mockThread(true);
+
+        // Look up the headers-aware window store via the timestamped window store type.
+        final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>> found =
+            provider.stores(
+                StoreQueryParameters.fromNameAndType(
+                    "timestamped-window-store-with-headers",
+                    QueryableStoreTypes.timestampedWindowStore()));
+
+        assertEquals(2, found.size());
+        // Every hit must be the generic read-only window facade rather than the underlying store type.
+        found.forEach(facade -> {
+            assertThat(facade, instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        });
+    }
+
+    @Test
+    public void shouldFindTimestampedWindowStoresWithHeadersAsWindowStores() {
+        mockThread(true);
+
+        // Look up the headers-aware window store via the plain window store type.
+        final List<ReadOnlyWindowStore<String, String>> found =
+            provider.stores(
+                StoreQueryParameters.fromNameAndType(
+                    "timestamped-window-store-with-headers",
+                    QueryableStoreTypes.windowStore()));
+
+        assertEquals(2, found.size());
+        // Every hit must be the generic read-only window facade rather than the underlying store type.
+        found.forEach(facade -> {
+            assertThat(facade, instanceOf(GenericReadOnlyWindowStoreFacade.class));
+            assertThat(facade, not(instanceOf(TimestampedWindowStore.class)));
+            assertThat(facade, not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+        });
+    }
+
private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
final Consumer<byte[], byte[]>
consumer,
final Producer<byte[], byte[]>
producer,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
index fd8093f2933..5c6fd58a56c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
@@ -63,7 +63,7 @@ public class ValueConvertersTest {
@Test
public void headersToValueAndTimestampShouldConvertCorrectly() {
final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
- ValueConverters.headersToValueAndTimestamp();
+ ValueConverters.extractValueAndTimestampFromHeaders();
final ValueTimestampHeaders<String> vth =
ValueTimestampHeaders.make("value", 42L, new RecordHeaders());
final ValueAndTimestamp<String> result = converter.apply(vth);
@@ -75,7 +75,7 @@ public class ValueConvertersTest {
@Test
public void headersToValueAndTimestampShouldReturnNullWhenInputIsNull() {
final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
- ValueConverters.headersToValueAndTimestamp();
+ ValueConverters.extractValueAndTimestampFromHeaders();
assertNull(converter.apply(null));
}
@@ -83,7 +83,7 @@ public class ValueConvertersTest {
@Test
public void headersToValueAndTimestampShouldDiscardHeaders() {
final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
- ValueConverters.headersToValueAndTimestamp();
+ ValueConverters.extractValueAndTimestampFromHeaders();
final RecordHeaders headers = new RecordHeaders();
headers.add("key1", "value1".getBytes());
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 3db553a6c6f..9f927f5893a 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1639,7 +1639,7 @@ public class TopologyTestDriver implements Closeable {
private final TimestampedWindowStoreWithHeaders<K, V> inner;
public TimestampedWindowStoreFacadeForHeaders(final
TimestampedWindowStoreWithHeaders<K, V> store) {
- super(store, ValueConverters.headersToValueAndTimestamp());
+ super(store,
ValueConverters.extractValueAndTimestampFromHeaders());
this.inner = store;
}
@@ -1943,7 +1943,7 @@ public class TopologyTestDriver implements Closeable {
private final TimestampedKeyValueStoreWithHeaders<K, V> inner;
public TimestampedKeyValueStoreFacadeForHeaders(final
TimestampedKeyValueStoreWithHeaders<K, V> store) {
- super(store, ValueConverters.headersToValueAndTimestamp());
+ super(store,
ValueConverters.extractValueAndTimestampFromHeaders());
this.inner = store;
}