This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 98617c7d6a2 KAFKA-20307: expose header-stores natively in IQv1 (#21992)
98617c7d6a2 is described below
commit 98617c7d6a2a2cab36037cc7dec72d4ae595b7a8
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 8 09:29:00 2026 -0700
KAFKA-20307: expose header-stores natively in IQv1 (#21992)
We previously only added support to query headers-stores as plain- or
ts-kv-stores. This PR exposes headers-stores with their native types.
Reviewers: Bill Bejeck <[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 158 +++++--------------
.../kafka/streams/state/QueryableStoreTypes.java | 115 ++++++++++++--
.../internals/StreamThreadStateStoreProvider.java | 10 +-
.../internals/GlobalStateStoreProviderTest.java | 163 +++++++++++++++++---
.../StreamThreadStateStoreProviderTest.java | 168 ++++++++++++++++++++-
5 files changed, 442 insertions(+), 172 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 5ba3f95f47d..bd5d5b87a56 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -37,7 +37,6 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
@@ -52,9 +51,6 @@ import
org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.WindowStore;
-import org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore;
-import org.apache.kafka.streams.state.internals.StateStoreProvider;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
@@ -72,7 +68,6 @@ import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
@@ -119,8 +114,8 @@ public class HeadersStoreUpgradeIntegrationTest {
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
- streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
return streamsConfiguration;
@@ -483,7 +478,7 @@ public class HeadersStoreUpgradeIntegrationTest {
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams, new
TimestampedKeyValueStoreWithHeadersType<>());
+ .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
if (store == null)
return false;
@@ -524,7 +519,7 @@ public class HeadersStoreUpgradeIntegrationTest {
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams, new
TimestampedKeyValueStoreWithHeadersType<>());
+ .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
if (store == null)
return false;
@@ -550,7 +545,7 @@ public class HeadersStoreUpgradeIntegrationTest {
() -> {
try {
final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
- .getStore(STORE_NAME, kafkaStreams, new
TimestampedKeyValueStoreWithHeadersType<>());
+ .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
if (store == null)
return false;
@@ -886,7 +881,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
if (store == null) {
return false;
@@ -943,7 +938,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
if (store == null) {
return false;
@@ -1029,7 +1024,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
if (store == null) {
return false;
@@ -1069,7 +1064,7 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, new TimestampedWindowStoreWithHeadersType<>());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStoreWithHeaders());
if (store == null) {
return false;
@@ -1473,7 +1468,7 @@ public class HeadersStoreUpgradeIntegrationTest {
private boolean windowStoreContainsKey(final String key, final long
timestamp) {
try {
final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>
store =
- IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
new TimestampedWindowStoreWithHeadersType<>());
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
if (store == null) {
return false;
@@ -1498,9 +1493,8 @@ public class HeadersStoreUpgradeIntegrationTest {
* Setup and populate a window store with headers.
* @param props Streams properties
* @param records List of (key, timestampOffset) tuples. Values will be
generated as "value{N}"
- * @return base time used for record timestamps
*/
- private long setupAndPopulateWindowStoreWithHeaders(final Properties props,
+ private void setupAndPopulateWindowStoreWithHeaders(final Properties props,
final
List<KeyValue<String, Long>> records) throws Exception {
final long baseTime = setupWindowStoreWithHeaders(props);
@@ -1525,7 +1519,6 @@ public class HeadersStoreUpgradeIntegrationTest {
);
kafkaStreams.close();
- return baseTime;
}
private long setupWindowStoreWithHeaders(final Properties props) throws
Exception {
@@ -1547,7 +1540,7 @@ public class HeadersStoreUpgradeIntegrationTest {
return CLUSTER.time.milliseconds();
}
- private void produceRecordWithHeaders(final String key, final String
value, final long timestamp) throws Exception {
+ private void produceRecordWithHeaders(final String key, final String
value, final long timestamp) {
final Headers headers = new RecordHeaders();
headers.add("source", "test".getBytes());
@@ -1620,7 +1613,6 @@ public class HeadersStoreUpgradeIntegrationTest {
// Phase 2: Restart with SessionStoreWithHeaders (headers-aware
supplier)
final StreamsBuilder newBuilder = new StreamsBuilder();
- final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
newBuilder.addStateStore(
Stores.sessionStoreWithHeadersBuilder(
isPersistent ?
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)) :
@@ -1628,26 +1620,22 @@ public class HeadersStoreUpgradeIntegrationTest {
Serdes.String(),
Serdes.String()))
.stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .process(() -> {
- final SessionWithHeadersProcessor sessionStore = new
SessionWithHeadersProcessor();
- processorRef.set(sessionStore);
- return sessionStore;
- }, SESSION_STORE_NAME);
+ .process(SessionWithHeadersProcessor::new, SESSION_STORE_NAME);
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
// Verify legacy data can be read with empty headers
- verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100,
processorRef);
- verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200,
processorRef);
- verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300,
processorRef);
+ verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300);
// Process new records with headers
final Headers headers = new RecordHeaders();
headers.add("source", "migration-test".getBytes());
- processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers, processorRef);
- processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, headers, processorRef);
+ processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+ processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, headers);
kafkaStreams.close();
}
@@ -1678,26 +1666,21 @@ public class HeadersStoreUpgradeIntegrationTest {
// Phase 2: Restart with headers-aware builder but non-headers
supplier (proxy/adapter mode)
final StreamsBuilder newBuilder = new StreamsBuilder();
- final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
newBuilder.addStateStore(
Stores.sessionStoreWithHeadersBuilder(
Stores.persistentSessionStore(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)), // non-headers supplier!
Serdes.String(),
Serdes.String()))
.stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .process(() -> {
- final SessionWithHeadersProcessor p = new
SessionWithHeadersProcessor();
- processorRef.set(p);
- return p;
- }, SESSION_STORE_NAME);
+ .process(SessionWithHeadersProcessor::new, SESSION_STORE_NAME);
kafkaStreams = new KafkaStreams(newBuilder.build(), props);
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
// Verify legacy data can be read with empty headers
- verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100,
processorRef);
- verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200,
processorRef);
- verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300,
processorRef);
+ verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300);
// In proxy mode, headers are stripped when writing to non-headers
store
// So we expect empty headers when reading back
@@ -1705,8 +1688,8 @@ public class HeadersStoreUpgradeIntegrationTest {
headers.add("source", "proxy-test".getBytes());
final Headers expectedHeaders = new RecordHeaders();
- processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders, processorRef);
- processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, expectedHeaders, processorRef);
+ processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+ processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime
+ 500, headers, expectedHeaders);
kafkaStreams.close();
}
@@ -1823,14 +1806,12 @@ public class HeadersStoreUpgradeIntegrationTest {
private void verifySessionValueWithEmptyHeaders(final String key,
final String value,
- final long timestamp,
- final
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+ final long timestamp)
throws Exception {
TestUtils.waitForCondition(() -> {
try {
- if (processorRef.get() == null) {
- return false;
- }
- final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store = processorRef.get().store();
+ final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store =
+ IntegrationTestUtils.getStore(SESSION_STORE_NAME,
kafkaStreams, QueryableStoreTypes.sessionStoreWithHeaders());
+
if (store == null) {
return false;
}
@@ -1866,8 +1847,7 @@ public class HeadersStoreUpgradeIntegrationTest {
final String value,
final long
timestamp,
final Headers
headers,
- final Headers
expectedHeaders,
- final
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+ final Headers
expectedHeaders) throws Exception {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputStream,
singletonList(KeyValue.pair(key, value)),
@@ -1880,10 +1860,8 @@ public class HeadersStoreUpgradeIntegrationTest {
TestUtils.waitForCondition(() -> {
try {
- if (processorRef.get() == null) {
- return false;
- }
- final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store = processorRef.get().store();
+ final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store =
+ IntegrationTestUtils.getStore(SESSION_STORE_NAME,
kafkaStreams, QueryableStoreTypes.sessionStoreWithHeaders());
if (store == null) {
return false;
@@ -1912,13 +1890,11 @@ public class HeadersStoreUpgradeIntegrationTest {
}
private boolean sessionStoreContainsKey(final String key,
- final long timestamp,
- final
AtomicReference<SessionWithHeadersProcessor> processorRef) {
+ final long timestamp) {
try {
- if (processorRef.get() == null) {
- return false;
- }
- final SessionStoreWithHeaders<String, String> store =
processorRef.get().store();
+ final ReadOnlySessionStore<String, AggregationWithHeaders<String>>
store =
+ IntegrationTestUtils.getStore(SESSION_STORE_NAME,
kafkaStreams, QueryableStoreTypes.sessionStoreWithHeaders());
+
if (store == null) {
return false;
}
@@ -1939,18 +1915,13 @@ public class HeadersStoreUpgradeIntegrationTest {
private void setupAndPopulateSessionStoreWithHeaders(final Properties
props) throws Exception {
final StreamsBuilder headersBuilder = new StreamsBuilder();
- final AtomicReference<SessionWithHeadersProcessor> processorRef = new
AtomicReference<>();
headersBuilder.addStateStore(
Stores.sessionStoreWithHeadersBuilder(
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME,
Duration.ofMillis(RETENTION_MS)),
Serdes.String(),
Serdes.String()))
.stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .process(() -> {
- final SessionWithHeadersProcessor p = new
SessionWithHeadersProcessor();
- processorRef.set(p);
- return p;
- }, SESSION_STORE_NAME);
+ .process(SessionWithHeadersProcessor::new, SESSION_STORE_NAME);
kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -1968,7 +1939,7 @@ public class HeadersStoreUpgradeIntegrationTest {
false);
TestUtils.waitForCondition(
- () -> sessionStoreContainsKey("key1", baseTime + 100,
processorRef),
+ () -> sessionStoreContainsKey("key1", baseTime + 100),
30_000L,
"Store was not populated with expected data"
);
@@ -1981,10 +1952,6 @@ public class HeadersStoreUpgradeIntegrationTest {
private static class SessionProcessor implements Processor<String, String,
Void, Void> {
private SessionStore<String, String> store;
- public SessionStore<String, String> store() {
- return store;
- }
-
@Override
public void init(final ProcessorContext<Void, Void> context) {
store = context.getStateStore(SESSION_STORE_NAME);
@@ -2001,10 +1968,6 @@ public class HeadersStoreUpgradeIntegrationTest {
private static class SessionWithHeadersProcessor implements
Processor<String, String, Void, Void> {
private SessionStoreWithHeaders<String, String> store;
- public SessionStoreWithHeaders<String, String> store() {
- return store;
- }
-
@Override
public void init(final ProcessorContext<Void, Void> context) {
store = context.getStateStore(SESSION_STORE_NAME);
@@ -2018,51 +1981,4 @@ public class HeadersStoreUpgradeIntegrationTest {
}
}
- // ==================== Custom QueryableStoreTypes ====================
-
- /**
- * Custom QueryableStoreType for querying
TimestampedKeyValueStoreWithHeaders directly
- * without facade wrapping. This returns the full ValueTimestampHeaders
wrapper.
- */
- private static class TimestampedKeyValueStoreWithHeadersType<K, V>
- implements QueryableStoreType<ReadOnlyKeyValueStore<K,
ValueTimestampHeaders<V>>> {
-
- @Override
- public boolean accepts(final
org.apache.kafka.streams.processor.StateStore stateStore) {
- // Accept stores that implement both
TimestampedKeyValueStoreWithHeaders and ReadOnlyKeyValueStore
- return stateStore instanceof TimestampedKeyValueStoreWithHeaders
- && stateStore instanceof ReadOnlyKeyValueStore;
- }
-
- @Override
- public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
- final StateStoreProvider storeProvider,
- final String storeName) {
- return new CompositeReadOnlyKeyValueStore<>(storeProvider, this,
storeName);
- }
- }
-
- /**
- * Custom queryable store type for accessing
TimestampedWindowStoreWithHeaders directly
- * without facade wrapping. This returns the full ValueTimestampHeaders
wrapper.
- */
- private static class TimestampedWindowStoreWithHeadersType<K, V>
- implements QueryableStoreType<ReadOnlyWindowStore<K,
ValueTimestampHeaders<V>>> {
-
- @Override
- public boolean accepts(final
org.apache.kafka.streams.processor.StateStore stateStore) {
- // Accept stores that implement both
TimestampedWindowStoreWithHeaders and ReadOnlyWindowStore
- return stateStore instanceof TimestampedWindowStoreWithHeaders
- && stateStore instanceof ReadOnlyWindowStore;
- }
-
- @Override
- public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
- final StateStoreProvider storeProvider,
- final String storeName) {
- return new CompositeReadOnlyWindowStore<>(storeProvider, this,
storeName);
- }
- }
-
-
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
index 026df05c515..1f6f1b8890b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java
@@ -57,6 +57,17 @@ public final class QueryableStoreTypes {
return new TimestampedKeyValueStoreType<>();
}
+ /**
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlyKeyValueStore
ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link QueryableStoreTypes.TimestampedKeyValueStoreWithHeadersType}
+ */
+ public static <K, V> QueryableStoreType<ReadOnlyKeyValueStore<K,
ValueTimestampHeaders<V>>> timestampedKeyValueStoreWithHeaders() {
+ return new TimestampedKeyValueStoreWithHeadersType<>();
+ }
+
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore}.
*
@@ -79,6 +90,17 @@ public final class QueryableStoreTypes {
return new TimestampedWindowStoreType<>();
}
+ /**
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlyWindowStore
ReadOnlyWindowStore&lt;K, ValueTimestampHeaders&lt;V&gt;&gt;}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link
QueryableStoreTypes.TimestampedWindowStoreWithHeadersType}
+ */
+ public static <K, V> QueryableStoreType<ReadOnlyWindowStore<K,
ValueTimestampHeaders<V>>> timestampedWindowStoreWithHeaders() {
+ return new TimestampedWindowStoreWithHeadersType<>();
+ }
+
/**
* A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore}.
*
@@ -90,6 +112,17 @@ public final class QueryableStoreTypes {
return new SessionStoreType<>();
}
+ /**
+ * A {@link QueryableStoreType} that accepts {@link ReadOnlySessionStore
ReadOnlySessionStore&lt;K, AggregationWithHeaders&lt;V&gt;&gt;}.
+ *
+ * @param <K> key type of the store
+ * @param <V> value type of the store
+ * @return {@link QueryableStoreTypes.SessionStoreWithHeadersType}
+ */
+ public static <K, V> QueryableStoreType<ReadOnlySessionStore<K,
AggregationWithHeaders<V>>> sessionStoreWithHeaders() {
+ return new SessionStoreWithHeadersType<>();
+ }
+
private abstract static class QueryableStoreTypeMatcher<T> implements
QueryableStoreType<T> {
private final Set<Class<?>> matchTo;
@@ -116,8 +149,10 @@ public final class QueryableStoreTypes {
}
@Override
- public ReadOnlyKeyValueStore<K, V> create(final StateStoreProvider
storeProvider,
- final String storeName) {
+ public ReadOnlyKeyValueStore<K, V> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
return new CompositeReadOnlyKeyValueStore<>(storeProvider, this,
storeName);
}
@@ -127,8 +162,7 @@ public final class QueryableStoreTypes {
extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K,
ValueAndTimestamp<V>>> {
TimestampedKeyValueStoreType() {
- super(Set.of(
- ReadOnlyKeyValueStore.class));
+ super(Set.of(ReadOnlyKeyValueStore.class));
}
@Override
@@ -138,8 +172,26 @@ public final class QueryableStoreTypes {
}
@Override
- public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(final
StateStoreProvider storeProvider,
- final
String storeName) {
+ public ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
+ return new CompositeReadOnlyKeyValueStore<>(storeProvider, this,
storeName);
+ }
+ }
+
+ private static class TimestampedKeyValueStoreWithHeadersType<K, V>
+ extends QueryableStoreTypeMatcher<ReadOnlyKeyValueStore<K,
ValueTimestampHeaders<V>>> {
+
+ TimestampedKeyValueStoreWithHeadersType() {
+ super(Set.of(ReadOnlyKeyValueStore.class,
TimestampedKeyValueStoreWithHeaders.class));
+ }
+
+ @Override
+ public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
return new CompositeReadOnlyKeyValueStore<>(storeProvider, this,
storeName);
}
}
@@ -151,8 +203,10 @@ public final class QueryableStoreTypes {
}
@Override
- public ReadOnlyWindowStore<K, V> create(final StateStoreProvider
storeProvider,
- final String storeName) {
+ public ReadOnlyWindowStore<K, V> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
return new CompositeReadOnlyWindowStore<>(storeProvider, this,
storeName);
}
}
@@ -161,8 +215,7 @@ public final class QueryableStoreTypes {
extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K,
ValueAndTimestamp<V>>> {
TimestampedWindowStoreType() {
- super(Set.of(
- ReadOnlyWindowStore.class));
+ super(Set.of(ReadOnlyWindowStore.class));
}
@Override
@@ -172,8 +225,26 @@ public final class QueryableStoreTypes {
}
@Override
- public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(final
StateStoreProvider storeProvider,
- final
String storeName) {
+ public ReadOnlyWindowStore<K, ValueAndTimestamp<V>> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
+ return new CompositeReadOnlyWindowStore<>(storeProvider, this,
storeName);
+ }
+ }
+
+ private static class TimestampedWindowStoreWithHeadersType<K, V>
+ extends QueryableStoreTypeMatcher<ReadOnlyWindowStore<K,
ValueTimestampHeaders<V>>> {
+
+ TimestampedWindowStoreWithHeadersType() {
+ super(Set.of(ReadOnlyWindowStore.class,
TimestampedWindowStoreWithHeaders.class));
+ }
+
+ @Override
+ public ReadOnlyWindowStore<K, ValueTimestampHeaders<V>> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
return new CompositeReadOnlyWindowStore<>(storeProvider, this,
storeName);
}
}
@@ -185,10 +256,26 @@ public final class QueryableStoreTypes {
}
@Override
- public ReadOnlySessionStore<K, V> create(final StateStoreProvider
storeProvider,
- final String storeName) {
+ public ReadOnlySessionStore<K, V> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
return new CompositeReadOnlySessionStore<>(storeProvider, this,
storeName);
}
}
+ private static class SessionStoreWithHeadersType<K, V> extends
QueryableStoreTypeMatcher<ReadOnlySessionStore<K, AggregationWithHeaders<V>>> {
+
+ SessionStoreWithHeadersType() {
+ super(Set.of(ReadOnlySessionStore.class,
SessionStoreWithHeaders.class));
+ }
+
+ @Override
+ public ReadOnlySessionStore<K, AggregationWithHeaders<V>> create(
+ final StateStoreProvider storeProvider,
+ final String storeName
+ ) {
+ return new CompositeReadOnlySessionStore<>(storeProvider, this,
storeName);
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index a429ea0143a..f9dd0250563 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -111,9 +111,6 @@ public class StreamThreadStateStoreProvider {
return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders());
} else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedKeyValueStoreType) {
return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
- } else {
- // For custom query types, return the raw store so they
can access headers directly
- return (T) store;
}
} else if (store instanceof TimestampedKeyValueStore &&
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store, ValueConverters.extractValue());
@@ -122,17 +119,14 @@ public class StreamThreadStateStoreProvider {
return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders());
} else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedWindowStoreType) {
return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
- } else {
- // For custom query types, return the raw store so they
can access headers directly
- return (T) store;
}
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>)
store, ValueConverters.extractValue());
} else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
return (T) new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
- } else {
- return (T) store;
}
+
+ return (T) store;
} else {
throw new InvalidStateStoreException(
"Cannot get state store " + storeName +
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
index f96f8e96d99..7d8ef6dacee 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
@@ -25,16 +25,19 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.test.NoOpReadOnlyStore;
import org.junit.jupiter.api.BeforeEach;
@@ -125,6 +128,14 @@ public class GlobalStateStoreProviderTest {
false),
Serdes.String(),
Serdes.String()).build());
+ stores.put(
+ "s-store-with-headers",
+ Stores.sessionStoreWithHeadersBuilder(
+ Stores.inMemorySessionStore(
+ "s-store-with-headers",
+ Duration.ofMillis(10L)),
+ Serdes.String(),
+ Serdes.String()).build());
final InternalProcessorContext<?, ?> mockContext =
mock(InternalProcessorContext.class);
when(mockContext.applicationId()).thenReturn("appId");
@@ -175,6 +186,7 @@ public class GlobalStateStoreProviderTest {
for (final ReadOnlyKeyValueStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
}
}
@@ -187,11 +199,12 @@ public class GlobalStateStoreProviderTest {
for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>
store : stores) {
assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
}
}
@Test
- public void shouldNotReturnKeyValueStoreAsTimestampedStore() {
+ public void shouldNotReturnKeyValueStoreAsTimestampedKeyValueStore() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>>
stores =
provider.stores("kv-store",
QueryableStoreTypes.timestampedKeyValueStore());
@@ -201,12 +214,13 @@ public class GlobalStateStoreProviderTest {
@Test
public void shouldReturnTimestampedKeyValueStoreAsKeyValueStore() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
- final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>>
stores =
+ final List<ReadOnlyKeyValueStore<String, String>> stores =
provider.stores("ts-kv-store",
QueryableStoreTypes.keyValueStore());
assertEquals(1, stores.size());
- for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>
store : stores) {
+ for (final ReadOnlyKeyValueStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
}
}
@@ -219,11 +233,25 @@ public class GlobalStateStoreProviderTest {
for (final ReadOnlyWindowStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
}
}
@Test
- public void shouldNotReturnWindowStoreAsTimestampedStore() {
+ public void shouldReturnTimestampedWindowStore() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>>
stores =
+ provider.stores("ts-w-store",
QueryableStoreTypes.timestampedWindowStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store : stores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store, instanceOf(TimestampedWindowStore.class));
+ assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+ }
+ }
+
+ @Test
+ public void shouldNotReturnWindowStoreAsTimestampedWindowStore() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>>
stores =
provider.stores("w-store",
QueryableStoreTypes.timestampedWindowStore());
@@ -233,12 +261,13 @@ public class GlobalStateStoreProviderTest {
@Test
public void shouldReturnTimestampedWindowStoreAsWindowStore() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
- final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>>
stores =
+ final List<ReadOnlyWindowStore<String, String>> stores =
provider.stores("ts-w-store", QueryableStoreTypes.windowStore());
assertEquals(1, stores.size());
- for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store : stores) {
+ for (final ReadOnlyWindowStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
}
}
@@ -250,29 +279,41 @@ public class GlobalStateStoreProviderTest {
assertEquals(1, stores.size());
for (final ReadOnlySessionStore<String, String> store : stores) {
assertThat(store, instanceOf(ReadOnlySessionStore.class));
+ assertThat(store, not(instanceOf(SessionStoreWithHeaders.class)));
}
}
@Test
- public void
shouldReturnKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+ public void shouldReturnTimestampedKeyValueStoreWithHeaders() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
- final List<ReadOnlyKeyValueStore<String, String>> stores =
- provider.stores("ts-kv-store-with-headers",
QueryableStoreTypes.keyValueStore());
+ final List<ReadOnlyKeyValueStore<String,
ValueTimestampHeaders<String>>> stores =
+ provider.stores("ts-kv-store-with-headers",
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
assertEquals(1, stores.size());
- for (final ReadOnlyKeyValueStore<String, String> store : stores) {
- assertThat(store,
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
- assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
- assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+ for (final ReadOnlyKeyValueStore<String,
ValueTimestampHeaders<String>> store : stores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store,
instanceOf(TimestampedKeyValueStoreWithHeaders.class));
}
}
@Test
- public void
shouldReturnTimestampedKeyValueStoreWithHeadersFacadeForHeadersAwareStore() {
+ public void
shouldReturnTimestampedKeyValueStoreWithHeadersAsTimestampedKeyValueStores() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>>
stores =
provider.stores("ts-kv-store-with-headers",
QueryableStoreTypes.timestampedKeyValueStore());
assertEquals(1, stores.size());
for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>
store : stores) {
+ assertThat(store,
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
+ }
+ }
+
+ @Test
+ public void
shouldReturnTimestampedKeyValueStoreWithHeadersAsKeyValueStores() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlyKeyValueStore<String, String>> stores =
+ provider.stores("ts-kv-store-with-headers",
QueryableStoreTypes.keyValueStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlyKeyValueStore<String, String> store : stores) {
assertThat(store,
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
@@ -280,20 +321,35 @@ public class GlobalStateStoreProviderTest {
}
@Test
- public void
shouldReturnWindowStoreWithHeadersFacadeForHeadersAwareWindowStore() {
+ public void
shouldNotReturnTimestampedKeyValueStoreAsTimestampedKeyValueStoreWithHeaders() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
- final List<ReadOnlyWindowStore<String, String>> stores =
- provider.stores("ts-w-store-with-headers",
QueryableStoreTypes.windowStore());
+ final List<ReadOnlyKeyValueStore<String,
ValueTimestampHeaders<String>>> stores =
+ provider.stores("ts-kv-store",
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
+ assertEquals(0, stores.size());
+ }
+
+ @Test
+ public void
shouldNotReturnKeyValueStoreAsTimestampedKeyValueStoreWithHeaders() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlyKeyValueStore<String,
ValueTimestampHeaders<String>>> stores =
+ provider.stores("kv-store",
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders());
+ assertEquals(0, stores.size());
+ }
+
+ @Test
+ public void shouldReturnTimestampedWindowStoreWithHeaders() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>>
stores =
+ provider.stores("ts-w-store-with-headers",
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
assertEquals(1, stores.size());
- for (final ReadOnlyWindowStore<String, String> store : stores) {
- assertThat(store,
instanceOf(GenericReadOnlyWindowStoreFacade.class));
- assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
- assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+ for (final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>
store : stores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store,
instanceOf(TimestampedWindowStoreWithHeaders.class));
}
}
@Test
- public void
shouldReturnTimestampedWindowStoreWithHeadersFacadeForHeadersAwareWindowStore()
{
+ public void
shouldReturnTimestampedWindowStoreWithHeadersAsTimestampedWindowStore() {
final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>>
stores =
provider.stores("ts-w-store-with-headers",
QueryableStoreTypes.timestampedWindowStore());
@@ -301,8 +357,69 @@ public class GlobalStateStoreProviderTest {
for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store : stores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store,
instanceOf(GenericReadOnlyWindowStoreFacade.class));
- assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
}
}
+
+ @Test
+ public void shouldReturnTimestampedWindowStoreWithHeadersAsWindowStore() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlyWindowStore<String, String>> stores =
+ provider.stores("ts-w-store-with-headers",
QueryableStoreTypes.windowStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlyWindowStore<String, String> store : stores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store,
instanceOf(GenericReadOnlyWindowStoreFacade.class));
+ assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
+ }
+ }
+
+ @Test
+ public void
shouldNotReturnTimestampedWindowStoreAsTimestampedWindowStoreWithHeaders() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>>> stores =
+ provider.stores("ts-w-store",
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
+ assertEquals(0, stores.size()); // headers-less store must not match
+ }
+
+ @Test
+ public void
shouldNotReturnWindowStoreAsTimestampedWindowStoreWithHeaders() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>>> stores =
+ provider.stores("w-store",
QueryableStoreTypes.timestampedWindowStoreWithHeaders());
+ assertEquals(0, stores.size()); // headers-less store must not match
+ }
+
+ @Test
+ public void shouldReturnSessionStoreWithHeaders() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlySessionStore<String,
AggregationWithHeaders<String>>> stores =
+ provider.stores("s-store-with-headers",
QueryableStoreTypes.sessionStoreWithHeaders());
+ assertEquals(1, stores.size());
+ for (final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store : stores) {
+ assertThat(store, instanceOf(ReadOnlySessionStore.class));
+ assertThat(store, instanceOf(SessionStoreWithHeaders.class));
+ }
+ }
+
+ @Test
+ public void shouldReturnSessionStoreWithHeadersAsSessionStore() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlySessionStore<String, String>> stores =
+ provider.stores("s-store-with-headers",
QueryableStoreTypes.sessionStore());
+ assertEquals(1, stores.size());
+ for (final ReadOnlySessionStore<String, String> store : stores) {
+ assertThat(store, instanceOf(ReadOnlySessionStore.class));
+ assertThat(store, not(instanceOf(SessionStoreWithHeaders.class)));
+ }
+ }
+
+ @Test
+ public void shouldNotReturnSessionStoreAsSessionStoreWithHeaders() {
+ final GlobalStateStoreProvider provider = new
GlobalStateStoreProvider(stores);
+ final List<ReadOnlySessionStore<String,
AggregationWithHeaders<String>>> stores =
+ provider.stores("s-store",
QueryableStoreTypes.sessionStoreWithHeaders());
+ assertEquals(0, stores.size());
+ }
+
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 0bd44815997..59cd8ff8ef1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -52,16 +52,19 @@ import
org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.SessionStoreWithHeaders;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
@@ -170,6 +173,14 @@ public class StreamThreadStateStoreProviderTest {
Serdes.String(),
Serdes.String()),
"the-processor");
+ topology.addStateStore(
+ Stores.sessionStoreWithHeadersBuilder(
+ Stores.inMemorySessionStore(
+ "session-store-with-headers",
+ Duration.ofMillis(10L)),
+ Serdes.String(),
+ Serdes.String()),
+ "the-processor");
final Properties properties = new Properties();
final String applicationId = "applicationId";
@@ -228,6 +239,7 @@ public class StreamThreadStateStoreProviderTest {
for (final ReadOnlyKeyValueStore<String, String> store: kvStores) {
assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
}
}
@@ -240,6 +252,7 @@ public class StreamThreadStateStoreProviderTest {
for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>
store: tkvStores) {
assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
assertThat(store, instanceOf(TimestampedKeyValueStore.class));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
}
}
@@ -264,12 +277,13 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() {
mockThread(true);
- final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>>
tkvStores =
+ final List<ReadOnlyKeyValueStore<String, String>> tkvStores =
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store",
QueryableStoreTypes.keyValueStore()));
assertEquals(2, tkvStores.size());
- for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>
store: tkvStores) {
+ for (final ReadOnlyKeyValueStore<String, String> store: tkvStores) {
assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
}
}
@@ -282,6 +296,7 @@ public class StreamThreadStateStoreProviderTest {
for (final ReadOnlyWindowStore<String, String> store: windowStores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
}
}
@@ -294,6 +309,7 @@ public class StreamThreadStateStoreProviderTest {
for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store: windowStores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store, instanceOf(TimestampedWindowStore.class));
+ assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
}
}
@@ -318,12 +334,13 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldFindTimestampedWindowStoresAsWindowStore() {
mockThread(true);
- final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>>
windowStores =
+ final List<ReadOnlyWindowStore<String, String>> windowStores =
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store",
QueryableStoreTypes.windowStore()));
assertEquals(2, windowStores.size());
- for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store: windowStores) {
+ for (final ReadOnlyWindowStore<String, String> store: windowStores) {
assertThat(store, instanceOf(ReadOnlyWindowStore.class));
assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
+ assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
}
}
@@ -335,6 +352,7 @@ public class StreamThreadStateStoreProviderTest {
assertEquals(2, sessionStores.size());
for (final ReadOnlySessionStore<String, String> store: sessionStores) {
assertThat(store, instanceOf(ReadOnlySessionStore.class));
+ assertThat(store, not(instanceOf(SessionStoreWithHeaders.class)));
}
}
@@ -433,6 +451,19 @@ public class StreamThreadStateStoreProviderTest {
@Test
public void shouldFindTimestampedKeyValueStoresWithHeaders() {
+ mockThread(true);
+ final List<ReadOnlyKeyValueStore<String, ValueTimestampHeaders<String>>>
stores =
+
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store-with-headers",
+ QueryableStoreTypes.timestampedKeyValueStoreWithHeaders()));
+ assertEquals(2, stores.size());
+ for (final ReadOnlyKeyValueStore<String, ValueTimestampHeaders<String>>
store : stores) {
+ assertThat(store, instanceOf(ReadOnlyKeyValueStore.class));
+ assertThat(store,
instanceOf(TimestampedKeyValueStoreWithHeaders.class)); // native type
+ }
+ }
+
+ @Test
+ public void
shouldFindTimestampedKeyValueStoresWithHeadersAsTimestampedKeyValueStore() {
mockThread(true);
final List<ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>>
stores =
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store-with-headers",
@@ -440,7 +471,6 @@ public class StreamThreadStateStoreProviderTest {
assertEquals(2, stores.size());
for (final ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>
store : stores) {
assertThat(store,
instanceOf(GenericReadOnlyKeyValueStoreFacade.class));
- assertThat(store, not(instanceOf(TimestampedKeyValueStore.class)));
assertThat(store,
not(instanceOf(TimestampedKeyValueStoreWithHeaders.class)));
}
}
@@ -459,8 +489,57 @@ public class StreamThreadStateStoreProviderTest {
}
}
+ @Test
+ public void shouldNotFindKeyValueStoresAsHeadersStore() {
+ mockThread(true);
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () ->
provider.stores(StoreQueryParameters.fromNameAndType("kv-store",
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store kv-store because the queryable store
type " +
+ "[class
org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreWithHeadersType]
" +
+ "does not accept the actual store type " +
+ "[class
org.apache.kafka.streams.state.internals.MeteredKeyValueStore]."
+ )
+ );
+ }
+
+ @Test
+ public void shouldNotFindTimestampedKeyValueStoresAsHeadersStore() {
+ mockThread(true);
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () ->
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-kv-store",
QueryableStoreTypes.timestampedKeyValueStoreWithHeaders()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store timestamped-kv-store because the
queryable store type " +
+ "[class
org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreWithHeadersType]
" +
+ "does not accept the actual store type " +
+ "[class
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore]."
+ )
+ );
+ }
+
@Test
public void shouldFindTimestampedWindowStoresWithHeaders() {
+ mockThread(true);
+ final List<ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>>
stores =
+
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store-with-headers",
+ QueryableStoreTypes.timestampedWindowStoreWithHeaders()));
+ assertEquals(2, stores.size());
+ for (final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>
store : stores) {
+ assertThat(store, instanceOf(ReadOnlyWindowStore.class));
+ assertThat(store,
instanceOf(TimestampedWindowStoreWithHeaders.class));
+ }
+ }
+
+ @Test
+ public void
shouldFindTimestampedWindowStoresWithHeadersAsTimestampedWindowStore() {
mockThread(true);
final List<ReadOnlyWindowStore<String, ValueAndTimestamp<String>>>
stores =
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store-with-headers",
@@ -468,7 +547,6 @@ public class StreamThreadStateStoreProviderTest {
assertEquals(2, stores.size());
for (final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store : stores) {
assertThat(store,
instanceOf(GenericReadOnlyWindowStoreFacade.class));
- assertThat(store, not(instanceOf(TimestampedWindowStore.class)));
assertThat(store,
not(instanceOf(TimestampedWindowStoreWithHeaders.class)));
}
}
@@ -487,6 +565,84 @@ public class StreamThreadStateStoreProviderTest {
}
}
+ @Test
+ public void shouldNotFindWindowStoresAsHeadersStore() {
+ mockThread(true);
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () ->
provider.stores(StoreQueryParameters.fromNameAndType("window-store",
QueryableStoreTypes.timestampedWindowStoreWithHeaders()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store window-store because the queryable
store type " +
+ "[class
org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreWithHeadersType]
" +
+ "does not accept the actual store type " +
+ "[class
org.apache.kafka.streams.state.internals.MeteredWindowStore]."
+ )
+ );
+ }
+
+ @Test
+ public void shouldNotFindTimestampedWindowStoresAsHeadersStore() {
+ mockThread(true);
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () ->
provider.stores(StoreQueryParameters.fromNameAndType("timestamped-window-store",
QueryableStoreTypes.timestampedWindowStoreWithHeaders()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store timestamped-window-store because the
queryable store type " +
+ "[class
org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreWithHeadersType]
" +
+ "does not accept the actual store type " +
+ "[class
org.apache.kafka.streams.state.internals.MeteredTimestampedWindowStore]."
+ )
+ );
+ }
+
+ @Test
+ public void shouldFindSessionStoresWithHeaders() {
+ mockThread(true);
+ final List<ReadOnlySessionStore<String,
AggregationWithHeaders<String>>> sessionStores =
+
provider.stores(StoreQueryParameters.fromNameAndType("session-store-with-headers",
QueryableStoreTypes.sessionStoreWithHeaders()));
+ assertEquals(2, sessionStores.size());
+ for (final ReadOnlySessionStore<String,
AggregationWithHeaders<String>> store: sessionStores) {
+ assertThat(store, instanceOf(ReadOnlySessionStore.class));
+ assertThat(store, instanceOf(SessionStoreWithHeaders.class));
+ }
+ }
+
+ @Test
+ public void shouldFindSessionStoresWithHeadersAsSessionStore() {
+ mockThread(true);
+ final List<ReadOnlySessionStore<String, String>> sessionStores =
+
provider.stores(StoreQueryParameters.fromNameAndType("session-store-with-headers",
QueryableStoreTypes.sessionStore()));
+ assertEquals(2, sessionStores.size());
+ for (final ReadOnlySessionStore<String, String> store: sessionStores) {
+ assertThat(store, instanceOf(ReadOnlySessionStoreFacade.class));
+ assertThat(store, not(instanceOf(SessionStoreWithHeaders.class)));
+ }
+ }
+
+ @Test
+ public void shouldNotFindSessionStoresAsSessionStoreWithHeaders() {
+ mockThread(true);
+ final InvalidStateStoreException exception = assertThrows(
+ InvalidStateStoreException.class,
+ () ->
provider.stores(StoreQueryParameters.fromNameAndType("session-store",
QueryableStoreTypes.sessionStoreWithHeaders()))
+ );
+ assertThat(
+ exception.getMessage(),
+ is(
+ "Cannot get state store session-store because the queryable
store type " +
+ "[class
org.apache.kafka.streams.state.QueryableStoreTypes$SessionStoreWithHeadersType]
" +
+ "does not accept the actual store type " +
+ "[class
org.apache.kafka.streams.state.internals.MeteredSessionStore]."
+ )
+ );
+ }
+
private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
final Consumer<byte[], byte[]>
consumer,
final Producer<byte[], byte[]>
producer,