This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 94fc9adb5ea KAFKA-20194: Add sesssion-header-store support to TDD
(#21956)
94fc9adb5ea is described below
commit 94fc9adb5eaa41be23880db184ae13716c415db7
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Apr 8 11:47:29 2026 -0700
KAFKA-20194: Add sesssion-header-store support to TDD (#21956)
Adds missing method TopologyTestDriver.getSessionStoreWithHeader(), and
updates TopologyTestDriverTest for all newly added header store types.
Reviewers: TengYao Chi <[email protected]>, Alieh Saeedi
<[email protected]>
---
checkstyle/suppressions.xml | 10 +-
.../org/apache/kafka/streams/state/Stores.java | 7 +-
.../state/internals/GlobalStateStoreProvider.java | 21 +-
.../internals/StreamThreadStateStoreProvider.java | 14 +-
.../apache/kafka/streams/TopologyTestDriver.java | 91 ++++---
.../kafka/streams/TopologyTestDriverTest.java | 293 +++++++++++++++------
6 files changed, 286 insertions(+), 150 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index defa1e51e55..58e77235515 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -202,7 +202,10 @@
files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest|RestoreIntegrationTest).java"/>
<suppress checks="MethodLength"
-
files="(EosIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
+
files="(EosIntegrationTest|KStreamKStreamJoinTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KStreamSlidingWindowAggregateTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBWindowStoreTest|TopologyTestDriverTest).java"/>
+
+ <suppress checks="ParameterNumber"
+ files="(TopologyTestDriverTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files=".*[/\\]streams[/\\].*test[/\\].*.java"/>
@@ -211,7 +214,7 @@
files="(KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|IQv2StoreIntegrationTest|StreamsConfigTest).java"/>
<suppress checks="JavaNCSS"
-
files="(KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
+
files="(KStreamKStreamJoinTest|StreamTaskTest|StreamThreadTest|TaskManagerTest|TopologyTestDriverTest).java"/>
<suppress checks="NPathComplexity"
files="(KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
@@ -219,9 +222,6 @@
<suppress
checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl)"
files="Murmur3Test.java"/>
- <suppress checks="MethodLength"
-
files="(KStreamSlidingWindowAggregateTest|KStreamKStreamLeftJoinTest|KStreamKStreamOuterJoinTest|KTableKTableForeignKeyJoinIntegrationTest).java"/>
-
<!-- Streams test-utils -->
<suppress checks="ClassFanOutComplexity|ClassDataAbstractionCoupling"
files="TopologyTestDriver.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index d567d525dc3..dfa2a2e2bd0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -712,12 +712,11 @@ public final class Stores {
* @return an instance of {@link StoreBuilder} than can build a {@link
SessionStoreWithHeaders}
*/
public static <K, V> StoreBuilder<SessionStoreWithHeaders<K, V>>
sessionStoreWithHeadersBuilder(
- final SessionBytesStoreSupplier supplier,
- final Serde<K> keySerde,
- final Serde<V> valueSerde
+ final SessionBytesStoreSupplier supplier,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
) {
Objects.requireNonNull(supplier, "supplier cannot be null");
return new SessionStoreWithHeadersBuilder<>(supplier, keySerde,
valueSerde, Time.SYSTEM);
}
-
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index 5285b974759..b4ce0a01cd3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -49,29 +49,24 @@ public class GlobalStateStoreProvider implements
StateStoreProvider {
}
if (store instanceof TimestampedKeyValueStoreWithHeaders) {
if (queryableStoreType instanceof
QueryableStoreTypes.KeyValueStoreType) {
- return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders()));
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?,
?>) store, ValueConverters.extractValueFromHeaders()));
} else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedKeyValueStoreType) {
- return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
- } else {
- // For custom query types, return the raw store so they can
access headers directly
- return (List<T>) Collections.singletonList(store);
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?,
?>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
}
} else if (store instanceof TimestampedKeyValueStore &&
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
- return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store, ValueConverters.extractValue()));
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<?, ?>) store,
ValueConverters.extractValue()));
} else if (store instanceof TimestampedWindowStoreWithHeaders) {
if (queryableStoreType instanceof
QueryableStoreTypes.WindowStoreType) {
- return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders()));
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>)
store, ValueConverters.extractValueFromHeaders()));
} else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedWindowStoreType) {
- return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders()));
- } else {
- // For custom query types, return the raw store so they can
access headers directly
- return (List<T>) Collections.singletonList(store);
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>)
store, ValueConverters.extractValueAndTimestampFromHeaders()));
}
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
- return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>)
store, ValueConverters.extractValue()));
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<?, ?>) store,
ValueConverters.extractValue()));
} else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
- return (List<T>) Collections.singletonList(new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store));
+ return (List<T>) Collections.singletonList(new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<?, ?>) store));
}
+
return (List<T>) Collections.singletonList(store);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index f9dd0250563..321b9bf60a2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -108,22 +108,22 @@ public class StreamThreadStateStoreProvider {
}
if (store instanceof TimestampedKeyValueStoreWithHeaders) {
if (queryableStoreType instanceof
QueryableStoreTypes.KeyValueStoreType) {
- return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders());
+ return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?,
?>) store, ValueConverters.extractValueFromHeaders());
} else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedKeyValueStoreType) {
- return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+ return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStoreWithHeaders<?,
?>) store, ValueConverters.extractValueAndTimestampFromHeaders());
}
} else if (store instanceof TimestampedKeyValueStore &&
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
- return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store, ValueConverters.extractValue());
+ return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<?, ?>) store,
ValueConverters.extractValue());
} else if (store instanceof TimestampedWindowStoreWithHeaders) {
if (queryableStoreType instanceof
QueryableStoreTypes.WindowStoreType) {
- return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueFromHeaders());
+ return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>)
store, ValueConverters.extractValueFromHeaders());
} else if (queryableStoreType instanceof
QueryableStoreTypes.TimestampedWindowStoreType) {
- return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<Object,
Object>) store, ValueConverters.extractValueAndTimestampFromHeaders());
+ return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStoreWithHeaders<?, ?>)
store, ValueConverters.extractValueAndTimestampFromHeaders());
}
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
- return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>)
store, ValueConverters.extractValue());
+ return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<?, ?>) store,
ValueConverters.extractValue());
} else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
- return (T) new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
+ return (T) new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<?, ?>) store);
}
return (T) store;
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index ae56c798776..b7300f14454 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -49,13 +49,11 @@ import
org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StandbyUpdateListener.SuspendReason;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.ChangelogRegister;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
@@ -111,7 +109,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -891,12 +888,13 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
public Map<String, StateStore> getAllStateStores() {
final Map<String, StateStore> allStores = new HashMap<>();
@@ -924,12 +922,13 @@ public class TopologyTestDriver implements Closeable {
* @see #getAllStateStores()
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
public StateStore getStateStore(final String name) throws
IllegalArgumentException {
return getStateStore(name, true);
@@ -1006,12 +1005,13 @@ public class TopologyTestDriver implements Closeable {
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
@@ -1039,12 +1039,13 @@ public class TopologyTestDriver implements Closeable {
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> KeyValueStore<K, ValueAndTimestamp<V>>
getTimestampedKeyValueStore(final String name) {
@@ -1056,53 +1057,55 @@ public class TopologyTestDriver implements Closeable {
}
/**
- * Get the {@link VersionedKeyValueStore} with the given name.
+ * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name.
* The store can be a "regular" or global store.
* <p>
* This is often useful in test cases to pre-populate the store before the
test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message},
and/or to check the store afterward.
*
* @param name the name of the store
- * @return the key value store, or {@code null} if no {@link
VersionedKeyValueStore} has been registered with the given name
+ * @return the key value store, or {@code null} if no {@link
TimestampedKeyValueStoreWithHeaders} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
- public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final
String name) {
+ public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>>
getTimestampedKeyValueStoreWithHeaders(final String name) {
final StateStore store = getStateStore(name, false);
- return store instanceof VersionedKeyValueStore ?
(VersionedKeyValueStore<K, V>) store : null;
+ return store instanceof TimestampedKeyValueStoreWithHeaders ?
(TimestampedKeyValueStoreWithHeaders<K, V>) store : null;
}
/**
- * Get the {@link TimestampedKeyValueStoreWithHeaders} with the given name.
+ * Get the {@link VersionedKeyValueStore} with the given name.
* The store can be a "regular" or global store.
* <p>
* This is often useful in test cases to pre-populate the store before the
test case instructs the topology to
* {@link TestInputTopic#pipeInput(TestRecord) process an input message},
and/or to check the store afterward.
*
* @param name the name of the store
- * @return the key value store, or {@code null} if no {@link
TimestampedKeyValueStoreWithHeaders} has been registered with the given name
+ * @return the key value store, or {@code null} if no {@link
VersionedKeyValueStore} has been registered with the given name
* @see #getAllStateStores()
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
- public <K, V> KeyValueStore<K, ValueTimestampHeaders<V>>
getTimestampedKeyValueStoreWithHeaders(final String name) {
+ public <K, V> VersionedKeyValueStore<K, V> getVersionedKeyValueStore(final
String name) {
final StateStore store = getStateStore(name, false);
- return store instanceof TimestampedKeyValueStoreWithHeaders ?
(TimestampedKeyValueStoreWithHeaders<K, V>) store : null;
+ return store instanceof VersionedKeyValueStore ?
(VersionedKeyValueStore<K, V>) store : null;
}
/**
@@ -1123,11 +1126,12 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
@@ -1156,10 +1160,12 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> WindowStore<K, ValueAndTimestamp<V>>
getTimestampedWindowStore(final String name) {
@@ -1183,11 +1189,12 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
- * @see #getVersionedKeyValueStore(String)
* @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
* @see #getSessionStore(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> WindowStore<K, ValueTimestampHeaders<V>>
getTimestampedWindowStoreWithHeaders(final String name) {
@@ -1208,11 +1215,12 @@ public class TopologyTestDriver implements Closeable {
* @see #getStateStore(String)
* @see #getKeyValueStore(String)
* @see #getTimestampedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getVersionedKeyValueStore(String)
* @see #getWindowStore(String)
* @see #getTimestampedWindowStore(String)
- * @see #getTimestampedKeyValueStoreWithHeaders(String)
* @see #getTimestampedWindowStoreWithHeaders(String)
+ * @see #getSessionStoreWithHeaders(String)
*/
@SuppressWarnings("unchecked")
public <K, V> SessionStore<K, V> getSessionStore(final String name) {
@@ -1223,6 +1231,32 @@ public class TopologyTestDriver implements Closeable {
return store instanceof SessionStore ? (SessionStore<K, V>) store :
null;
}
+ /**
+ * Get the {@link SessionStore} with the given name.
+ * The store can be a "regular" or global store.
+ * <p>
+ * This is often useful in test cases to pre-populate the store before the
test case instructs the topology to
+ * {@link TestInputTopic#pipeInput(TestRecord) process an input message},
and/or to check the store afterward.
+ *
+ * @param name the name of the store
+ * @return the key value store, or {@code null} if no {@link SessionStore}
has been registered with the given name
+ * @see #getAllStateStores()
+ * @see #getStateStore(String)
+ * @see #getKeyValueStore(String)
+ * @see #getTimestampedKeyValueStore(String)
+ * @see #getTimestampedKeyValueStoreWithHeaders(String)
+ * @see #getVersionedKeyValueStore(String)
+ * @see #getWindowStore(String)
+ * @see #getTimestampedWindowStore(String)
+ * @see #getTimestampedWindowStoreWithHeaders(String)
+ * @see #getSessionStore(String)
+ */
+ @SuppressWarnings("unchecked")
+ public <K, V> SessionStoreWithHeaders<K, V>
getSessionStoreWithHeaders(final String name) {
+ final StateStore store = getStateStore(name, false);
+ return store instanceof SessionStoreWithHeaders ?
(SessionStoreWithHeaders<K, V>) store : null;
+ }
+
/**
* Close the driver, its topology, and all processors.
*/
@@ -1251,21 +1285,6 @@ public class TopologyTestDriver implements Closeable {
stateDirectory.clean();
}
- static class MockChangelogRegister implements ChangelogRegister {
- @Override
- public void register(final TopicPartition partition, final
ProcessorStateManager stateManager) { }
-
- @Override
- public void register(final Set<TopicPartition> changelogPartitions,
final ProcessorStateManager stateManager) { }
-
- @Override
- public void unregister(final Collection<TopicPartition> partitions) { }
-
- @Override
- public void unregister(final Collection<TopicPartition> partitions,
- final SuspendReason reason) { }
- }
-
static class MockTime implements Time {
private final AtomicLong timeMs;
private final AtomicLong highResTimeNs;
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 2dc0089990e..7555c078330 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -24,6 +24,10 @@ import
org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.DoubleDeserializer;
+import org.apache.kafka.common.serialization.DoubleSerializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
@@ -386,7 +390,6 @@ public abstract class TopologyTestDriverTest {
() -> new Processor<Object, Object, Void, Void>() {
KeyValueStore<Object, Object> store;
- @SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext<Void, Void>
context) {
store = context.getStateStore(sourceTopicName +
"-globalStore");
@@ -602,6 +605,7 @@ public abstract class TopologyTestDriverTest {
assertThat(record, equalTo(expectedResult));
}
+ @SuppressWarnings("resource")
@Test
public void shouldUseSourceSpecificDeserializers() {
final Topology topology = new Topology();
@@ -610,23 +614,23 @@ public abstract class TopologyTestDriverTest {
final String sourceName2 = "source-2";
final String processor = "processor";
- topology.addSource(sourceName1, Serdes.Long().deserializer(),
Serdes.String().deserializer(), SOURCE_TOPIC_1);
- topology.addSource(sourceName2, Serdes.Integer().deserializer(),
Serdes.Double().deserializer(), SOURCE_TOPIC_2);
+ topology.addSource(sourceName1, new LongDeserializer(), new
StringDeserializer(), SOURCE_TOPIC_1);
+ topology.addSource(sourceName2, new IntegerDeserializer(), new
DoubleDeserializer(), SOURCE_TOPIC_2);
topology.addProcessor(processor, new MockProcessorSupplier(),
sourceName1, sourceName2);
topology.addSink(
"sink",
SINK_TOPIC_1,
(topic, data) -> {
if (data instanceof Long) {
- return Serdes.Long().serializer().serialize(topic, (Long)
data);
+ return new LongSerializer().serialize(topic, (Long) data);
}
- return Serdes.Integer().serializer().serialize(topic,
(Integer) data);
+ return new IntegerSerializer().serialize(topic, (Integer)
data);
},
(topic, data) -> {
if (data instanceof String) {
- return Serdes.String().serializer().serialize(topic,
(String) data);
+ return new StringSerializer().serialize(topic, (String)
data);
}
- return Serdes.Double().serializer().serialize(topic, (Double)
data);
+ return new DoubleSerializer().serialize(topic, (Double) data);
},
processor);
@@ -642,21 +646,21 @@ public abstract class TopologyTestDriverTest {
testDriver.pipeRecord(SOURCE_TOPIC_1,
consumerRecord1,
- Serdes.Long().serializer(),
- Serdes.String().serializer(),
+ new LongSerializer(),
+ new StringSerializer(),
Instant.now());
final TestRecord<Long, String> result1 =
- testDriver.readRecord(SINK_TOPIC_1, Serdes.Long().deserializer(),
Serdes.String().deserializer());
+ testDriver.readRecord(SINK_TOPIC_1, new LongDeserializer(), new
StringDeserializer());
assertThat(result1.getKey(), equalTo(source1Key));
assertThat(result1.getValue(), equalTo(source1Value));
testDriver.pipeRecord(SOURCE_TOPIC_2,
consumerRecord2,
- Serdes.Integer().serializer(),
- Serdes.Double().serializer(),
+ new IntegerSerializer(),
+ new DoubleSerializer(),
Instant.now());
final TestRecord<Integer, Double> result2 =
- testDriver.readRecord(SINK_TOPIC_1,
Serdes.Integer().deserializer(), Serdes.Double().deserializer());
+ testDriver.readRecord(SINK_TOPIC_1, new IntegerDeserializer(), new
DoubleDeserializer());
assertThat(result2.getKey(), equalTo(source2Key));
assertThat(result2.getValue(), equalTo(source2Value));
}
@@ -718,10 +722,10 @@ public abstract class TopologyTestDriverTest {
final String sourceName1 = "source-1";
final String sourceName2 = "source-2";
- topology.addSource(sourceName1, Serdes.Long().deserializer(),
Serdes.String().deserializer(), SOURCE_TOPIC_1);
- topology.addSource(sourceName2, Serdes.Integer().deserializer(),
Serdes.Double().deserializer(), SOURCE_TOPIC_2);
- topology.addSink("sink-1", SINK_TOPIC_1, Serdes.Long().serializer(),
Serdes.String().serializer(), sourceName1);
- topology.addSink("sink-2", SINK_TOPIC_2,
Serdes.Integer().serializer(), Serdes.Double().serializer(), sourceName2);
+ topology.addSource(sourceName1, new LongDeserializer(), new
StringDeserializer(), SOURCE_TOPIC_1);
+ topology.addSource(sourceName2, new IntegerDeserializer(), new
DoubleDeserializer(), SOURCE_TOPIC_2);
+ topology.addSink("sink-1", SINK_TOPIC_1, new LongSerializer(), new
StringSerializer(), sourceName1);
+ topology.addSink("sink-2", SINK_TOPIC_2, new IntegerSerializer(), new
DoubleSerializer(), sourceName2);
testDriver = new TopologyTestDriver(topology);
@@ -735,21 +739,21 @@ public abstract class TopologyTestDriverTest {
testDriver.pipeRecord(SOURCE_TOPIC_1,
consumerRecord1,
- Serdes.Long().serializer(),
- Serdes.String().serializer(),
+ new LongSerializer(),
+ new StringSerializer(),
Instant.now());
final TestRecord<Long, String> result1 =
- testDriver.readRecord(SINK_TOPIC_1,
Serdes.Long().deserializer(), Serdes.String().deserializer());
+ testDriver.readRecord(SINK_TOPIC_1, new LongDeserializer(),
new StringDeserializer());
assertThat(result1.getKey(), equalTo(source1Key));
assertThat(result1.getValue(), equalTo(source1Value));
testDriver.pipeRecord(SOURCE_TOPIC_2,
consumerRecord2,
- Serdes.Integer().serializer(),
- Serdes.Double().serializer(),
+ new IntegerSerializer(),
+ new DoubleSerializer(),
Instant.now());
final TestRecord<Integer, Double> result2 =
- testDriver.readRecord(SINK_TOPIC_2,
Serdes.Integer().deserializer(), Serdes.Double().deserializer());
+ testDriver.readRecord(SINK_TOPIC_2, new IntegerDeserializer(),
new DoubleDeserializer());
assertThat(result2.getKey(), equalTo(source2Key));
assertThat(result2.getValue(), equalTo(source2Value));
}
@@ -880,8 +884,8 @@ public abstract class TopologyTestDriverTest {
Serdes.ByteArray(),
Time.SYSTEM).withLoggingDisabled(),
"sourceProcessorName",
- Serdes.ByteArray().deserializer(),
- Serdes.ByteArray().deserializer(),
+ new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(),
"globalTopicName",
"globalProcessorName",
voidProcessorSupplier);
@@ -911,12 +915,16 @@ public abstract class TopologyTestDriverTest {
private void shouldReturnCorrectStoreTypeOnly(final boolean persistent) {
final String keyValueStoreName = "keyValueStore";
final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+ final String timestampedKeyValueStoreWithHeaderName =
"keyValueTimestampStoreWithHeaders";
final String versionedKeyValueStoreName = "keyValueVersionedStore";
final String windowStoreName = "windowStore";
final String timestampedWindowStoreName = "windowTimestampStore";
+ final String timestampedWindowStoreWithHeadersName =
"windowTimestampStoreWithHeaders";
final String sessionStoreName = "sessionStore";
+ final String sessionStoreWithHeadersName = "sessionStoreWithHeaders";
final String globalKeyValueStoreName = "globalKeyValueStore";
final String globalTimestampedKeyValueStoreName =
"globalKeyValueTimestampStore";
+ final String globalTimestampedKeyValueStoreWithHeadersName =
"globalKeyValueTimestampStoreWithHeaders";
final String globalVersionedKeyValueStoreName =
"globalKeyValueVersionedStore";
final Topology topology = setupSingleProcessorTopology();
@@ -925,12 +933,16 @@ public abstract class TopologyTestDriverTest {
persistent,
keyValueStoreName,
timestampedKeyValueStoreName,
+ timestampedKeyValueStoreWithHeaderName,
versionedKeyValueStoreName,
windowStoreName,
timestampedWindowStoreName,
+ timestampedWindowStoreWithHeadersName,
sessionStoreName,
+ sessionStoreWithHeadersName,
globalKeyValueStoreName,
globalTimestampedKeyValueStoreName,
+ globalTimestampedKeyValueStoreWithHeadersName,
globalVersionedKeyValueStoreName);
@@ -939,70 +951,137 @@ public abstract class TopologyTestDriverTest {
// verify state stores
assertNotNull(testDriver.getKeyValueStore(keyValueStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(keyValueStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(keyValueStoreName));
assertNull(testDriver.getVersionedKeyValueStore(keyValueStoreName));
assertNull(testDriver.getWindowStore(keyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(keyValueStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(keyValueStoreName));
assertNull(testDriver.getSessionStore(keyValueStoreName));
+ assertNull(testDriver.getSessionStoreWithHeaders(keyValueStoreName));
assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreName));
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreName));
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreName));
assertNull(testDriver.getWindowStore(timestampedKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreName));
assertNull(testDriver.getSessionStore(timestampedKeyValueStoreName));
+
assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreName));
if (persistent) { // versioned stores do not offer an in-memory
version yet, so nothing to test/verify unless persistent
assertNull(testDriver.getKeyValueStore(versionedKeyValueStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(versionedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(versionedKeyValueStoreName));
assertNotNull(testDriver.getVersionedKeyValueStore(versionedKeyValueStoreName));
assertNull(testDriver.getWindowStore(versionedKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(versionedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(versionedKeyValueStoreName));
assertNull(testDriver.getSessionStore(versionedKeyValueStoreName));
+
assertNull(testDriver.getSessionStoreWithHeaders(versionedKeyValueStoreName));
}
+
assertNotNull(testDriver.getKeyValueStore(timestampedKeyValueStoreWithHeaderName));
+
assertNotNull(testDriver.getTimestampedKeyValueStore(timestampedKeyValueStoreWithHeaderName));
+
assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
+
assertNull(testDriver.getVersionedKeyValueStore(timestampedKeyValueStoreWithHeaderName));
+
assertNull(testDriver.getWindowStore(timestampedKeyValueStoreWithHeaderName));
+
assertNull(testDriver.getTimestampedWindowStore(timestampedKeyValueStoreWithHeaderName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
+
assertNull(testDriver.getSessionStore(timestampedKeyValueStoreWithHeaderName));
+
assertNull(testDriver.getSessionStoreWithHeaders(timestampedKeyValueStoreWithHeaderName));
+
assertNull(testDriver.getKeyValueStore(windowStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(windowStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(windowStoreName));
assertNull(testDriver.getVersionedKeyValueStore(windowStoreName));
assertNotNull(testDriver.getWindowStore(windowStoreName));
assertNull(testDriver.getTimestampedWindowStore(windowStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(windowStoreName));
assertNull(testDriver.getSessionStore(windowStoreName));
+ assertNull(testDriver.getSessionStoreWithHeaders(windowStoreName));
assertNull(testDriver.getKeyValueStore(timestampedWindowStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedWindowStoreName));
assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreName));
assertNotNull(testDriver.getWindowStore(timestampedWindowStoreName));
assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedWindowStoreName));
assertNull(testDriver.getSessionStore(timestampedWindowStoreName));
+
assertNull(testDriver.getSessionStoreWithHeaders(timestampedWindowStoreName));
+
+
assertNull(testDriver.getKeyValueStore(timestampedWindowStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedKeyValueStore(timestampedWindowStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(timestampedWindowStoreWithHeadersName));
+
assertNull(testDriver.getVersionedKeyValueStore(timestampedWindowStoreWithHeadersName));
+
assertNotNull(testDriver.getWindowStore(timestampedWindowStoreWithHeadersName));
+
assertNotNull(testDriver.getTimestampedWindowStore(timestampedWindowStoreWithHeadersName));
+
assertNotNull(testDriver.getTimestampedWindowStoreWithHeaders(timestampedWindowStoreWithHeadersName));
+
assertNull(testDriver.getSessionStore(timestampedWindowStoreWithHeadersName));
+
assertNull(testDriver.getSessionStoreWithHeaders(timestampedWindowStoreWithHeadersName));
assertNull(testDriver.getKeyValueStore(sessionStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(sessionStoreName));
assertNull(testDriver.getVersionedKeyValueStore(sessionStoreName));
assertNull(testDriver.getWindowStore(sessionStoreName));
assertNull(testDriver.getTimestampedWindowStore(sessionStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(sessionStoreName));
assertNotNull(testDriver.getSessionStore(sessionStoreName));
+ assertNull(testDriver.getSessionStoreWithHeaders(sessionStoreName));
+
+ assertNull(testDriver.getKeyValueStore(sessionStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedKeyValueStore(sessionStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(sessionStoreWithHeadersName));
+
assertNull(testDriver.getVersionedKeyValueStore(sessionStoreWithHeadersName));
+ assertNull(testDriver.getWindowStore(sessionStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedWindowStore(sessionStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(sessionStoreWithHeadersName));
+ assertNotNull(testDriver.getSessionStore(sessionStoreWithHeadersName));
+
assertNotNull(testDriver.getSessionStoreWithHeaders(sessionStoreWithHeadersName));
// verify global stores
assertNotNull(testDriver.getKeyValueStore(globalKeyValueStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(globalKeyValueStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalKeyValueStoreName));
assertNull(testDriver.getVersionedKeyValueStore(globalKeyValueStoreName));
assertNull(testDriver.getWindowStore(globalKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(globalKeyValueStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalKeyValueStoreName));
assertNull(testDriver.getSessionStore(globalKeyValueStoreName));
+
assertNull(testDriver.getSessionStoreWithHeaders(globalKeyValueStoreName));
assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreName));
assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreName));
assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreName));
assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalTimestampedKeyValueStoreName));
assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreName));
+
assertNull(testDriver.getSessionStoreWithHeaders(globalTimestampedKeyValueStoreName));
+
+
assertNotNull(testDriver.getKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNotNull(testDriver.getTimestampedKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNotNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNull(testDriver.getVersionedKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNull(testDriver.getWindowStore(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedWindowStore(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNull(testDriver.getSessionStore(globalTimestampedKeyValueStoreWithHeadersName));
+
assertNull(testDriver.getSessionStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName));
if (persistent) { // versioned stores do not offer an in-memory
version yet, so nothing to test/verify unless persistent
assertNull(testDriver.getKeyValueStore(globalVersionedKeyValueStoreName));
assertNull(testDriver.getTimestampedKeyValueStore(globalVersionedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedKeyValueStoreWithHeaders(globalVersionedKeyValueStoreName));
assertNotNull(testDriver.getVersionedKeyValueStore(globalVersionedKeyValueStoreName));
assertNull(testDriver.getWindowStore(globalVersionedKeyValueStoreName));
assertNull(testDriver.getTimestampedWindowStore(globalVersionedKeyValueStoreName));
+
assertNull(testDriver.getTimestampedWindowStoreWithHeaders(globalVersionedKeyValueStoreName));
assertNull(testDriver.getSessionStore(globalVersionedKeyValueStoreName));
+
assertNull(testDriver.getSessionStoreWithHeaders(globalVersionedKeyValueStoreName));
}
}
@@ -1019,12 +1098,16 @@ public abstract class TopologyTestDriverTest {
private void shouldThrowIfBuiltInStoreIsAccessedWithUntypedMethod(final
boolean persistent) {
final String keyValueStoreName = "keyValueStore";
final String timestampedKeyValueStoreName = "keyValueTimestampStore";
+ final String timestampedKeyValueStoreWithHeadersName =
"keyValueTimestampStoreWithHeaders";
final String versionedKeyValueStoreName = "keyValueVersionedStore";
final String windowStoreName = "windowStore";
final String timestampedWindowStoreName = "windowTimestampStore";
+ final String timestampedWindowStoreWithHeadersName =
"windowTimestampStoreWithHeaders";
final String sessionStoreName = "sessionStore";
+ final String sessionStoreWithHeadersName = "sessionStoreWithHeaders";
final String globalKeyValueStoreName = "globalKeyValueStore";
final String globalTimestampedKeyValueStoreName =
"globalKeyValueTimestampStore";
+ final String globalTimestampedKeyValueStoreWithHeadersName =
"globalKeyValueTimestampStoreWithHeaders";
final String globalVersionedKeyValueStoreName =
"globalKeyValueVersionedStore";
final Topology topology = setupSingleProcessorTopology();
@@ -1033,12 +1116,16 @@ public abstract class TopologyTestDriverTest {
persistent,
keyValueStoreName,
timestampedKeyValueStoreName,
+ timestampedKeyValueStoreWithHeadersName,
versionedKeyValueStoreName,
windowStoreName,
timestampedWindowStoreName,
+ timestampedWindowStoreWithHeadersName,
sessionStoreName,
+ sessionStoreWithHeadersName,
globalKeyValueStoreName,
globalTimestampedKeyValueStoreName,
+ globalTimestampedKeyValueStoreWithHeadersName,
globalVersionedKeyValueStoreName);
@@ -1127,6 +1214,7 @@ public abstract class TopologyTestDriverTest {
}
}
+ // CAUTION: Do not replace with Lambda; Needs to return a new Processor
instance each time
final ProcessorSupplier<byte[], byte[], Void, Void> voidProcessorSupplier
= () -> new Processor<byte[], byte[], Void, Void>() {
@Override
public void process(final Record<byte[], byte[]> record) {
@@ -1137,29 +1225,42 @@ public abstract class TopologyTestDriverTest {
final boolean persistent,
final String keyValueStoreName,
final String timestampedKeyValueStoreName,
+ final String
timestampedKeyValueStoreWithHeadersName,
final String versionedKeyValueStoreName,
final String windowStoreName,
final String timestampedWindowStoreName,
+ final String
timestampedWindowStoreWithHeadersName,
final String sessionStoreName,
+ final String sessionStoreWithHeadersName,
final String globalKeyValueStoreName,
final String
globalTimestampedKeyValueStoreName,
+ final String
globalTimestampedKeyValueStoreWithHeadersName,
final String
globalVersionedKeyValueStoreName) {
// add state stores
topology.addStateStore(
Stores.keyValueStoreBuilder(
- persistent ?
- Stores.persistentKeyValueStore(keyValueStoreName) :
- Stores.inMemoryKeyValueStore(keyValueStoreName),
+ persistent
+ ? Stores.persistentKeyValueStore(keyValueStoreName)
+ : Stores.inMemoryKeyValueStore(keyValueStoreName),
Serdes.ByteArray(),
Serdes.ByteArray()
),
"processor");
topology.addStateStore(
Stores.timestampedKeyValueStoreBuilder(
- persistent ?
-
Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName) :
- Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName),
+ persistent
+ ?
Stores.persistentTimestampedKeyValueStore(timestampedKeyValueStoreName)
+ :
Stores.inMemoryKeyValueStore(timestampedKeyValueStoreName),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ),
+ "processor");
+ topology.addStateStore(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
+ persistent
+ ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(timestampedKeyValueStoreWithHeadersName)
+ :
Stores.inMemoryKeyValueStore(timestampedKeyValueStoreWithHeadersName),
Serdes.ByteArray(),
Serdes.ByteArray()
),
@@ -1175,70 +1276,92 @@ public abstract class TopologyTestDriverTest {
}
topology.addStateStore(
Stores.windowStoreBuilder(
- persistent ?
- Stores.persistentWindowStore(windowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false) :
- Stores.inMemoryWindowStore(windowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+ persistent
+ ? Stores.persistentWindowStore(windowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false)
+ : Stores.inMemoryWindowStore(windowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
Serdes.ByteArray(),
Serdes.ByteArray()
),
"processor");
topology.addStateStore(
Stores.timestampedWindowStoreBuilder(
- persistent ?
-
Stores.persistentTimestampedWindowStore(timestampedWindowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false) :
- Stores.inMemoryWindowStore(timestampedWindowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+ persistent
+ ?
Stores.persistentTimestampedWindowStore(timestampedWindowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false)
+ : Stores.inMemoryWindowStore(timestampedWindowStoreName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
Serdes.ByteArray(),
Serdes.ByteArray()
),
"processor");
topology.addStateStore(
- persistent ?
- Stores.sessionStoreBuilder(
- Stores.persistentSessionStore(sessionStoreName,
Duration.ofMillis(1000L)),
- Serdes.ByteArray(),
- Serdes.ByteArray()) :
- Stores.sessionStoreBuilder(
- Stores.inMemorySessionStore(sessionStoreName,
Duration.ofMillis(1000L)),
- Serdes.ByteArray(),
- Serdes.ByteArray()),
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistent
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(timestampedWindowStoreWithHeadersName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false)
+ :
Stores.inMemoryWindowStore(timestampedWindowStoreWithHeadersName,
Duration.ofMillis(1000L), Duration.ofMillis(100L), false),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ),
+ "processor");
+ topology.addStateStore(
+ Stores.sessionStoreBuilder(
+ persistent
+ ? Stores.persistentSessionStore(sessionStoreName,
Duration.ofMillis(1000L))
+ : Stores.inMemorySessionStore(sessionStoreName,
Duration.ofMillis(1000L)),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ),
+ "processor");
+ topology.addStateStore(
+ Stores.sessionStoreWithHeadersBuilder(
+ persistent
+ ?
Stores.persistentSessionStoreWithHeaders(sessionStoreWithHeadersName,
Duration.ofMillis(1000L))
+ : Stores.inMemorySessionStore(sessionStoreWithHeadersName,
Duration.ofMillis(1000L)),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ),
"processor");
// add global stores
topology.addGlobalStore(
- persistent ?
- Stores.keyValueStoreBuilder(
- Stores.persistentKeyValueStore(globalKeyValueStoreName),
- Serdes.ByteArray(),
- Serdes.ByteArray()
- ).withLoggingDisabled() :
- Stores.keyValueStoreBuilder(
- Stores.inMemoryKeyValueStore(globalKeyValueStoreName),
- Serdes.ByteArray(),
- Serdes.ByteArray()
- ).withLoggingDisabled(),
+ Stores.keyValueStoreBuilder(
+ persistent
+ ? Stores.persistentKeyValueStore(globalKeyValueStoreName)
+ : Stores.inMemoryKeyValueStore(globalKeyValueStoreName),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ).withLoggingDisabled(),
"sourceDummy1",
- Serdes.ByteArray().deserializer(),
- Serdes.ByteArray().deserializer(),
+ new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(),
"topicDummy1",
"processorDummy1",
voidProcessorSupplier);
topology.addGlobalStore(
- persistent ?
- Stores.timestampedKeyValueStoreBuilder(
-
Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName),
- Serdes.ByteArray(),
- Serdes.ByteArray()
- ).withLoggingDisabled() :
- Stores.timestampedKeyValueStoreBuilder(
-
Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName),
- Serdes.ByteArray(),
- Serdes.ByteArray()
- ).withLoggingDisabled(),
+ Stores.timestampedKeyValueStoreBuilder(
+ persistent
+ ?
Stores.persistentTimestampedKeyValueStore(globalTimestampedKeyValueStoreName)
+ :
Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreName),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ).withLoggingDisabled(),
"sourceDummy2",
- Serdes.ByteArray().deserializer(),
- Serdes.ByteArray().deserializer(),
+ new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(),
"topicDummy2",
"processorDummy2",
voidProcessorSupplier);
+ topology.addGlobalStore(
+ Stores.timestampedKeyValueStoreWithHeadersBuilder(
+ persistent
+ ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(globalTimestampedKeyValueStoreWithHeadersName)
+ :
Stores.inMemoryKeyValueStore(globalTimestampedKeyValueStoreWithHeadersName),
+ Serdes.ByteArray(),
+ Serdes.ByteArray()
+ ).withLoggingDisabled(),
+ "sourceDummy3",
+ new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(),
+ "topicDummy3",
+ "processorDummy3",
+ voidProcessorSupplier);
if (persistent) { // versioned stores do not offer an in-memory
version yet
topology.addGlobalStore(
Stores.versionedKeyValueStoreBuilder(
@@ -1246,11 +1369,11 @@ public abstract class TopologyTestDriverTest {
Serdes.ByteArray(),
Serdes.ByteArray()
).withLoggingDisabled(),
- "sourceDummy3",
- Serdes.ByteArray().deserializer(),
- Serdes.ByteArray().deserializer(),
- "topicDummy3",
- "processorDummy3",
+ "sourceDummy4",
+ new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(),
+ "topicDummy4",
+ "processorDummy4",
voidProcessorSupplier);
}
}
@@ -1271,8 +1394,8 @@ public abstract class TopologyTestDriverTest {
Serdes.ByteArray(),
Time.SYSTEM).withLoggingDisabled(),
"sourceProcessorName",
- Serdes.ByteArray().deserializer(),
- Serdes.ByteArray().deserializer(),
+ new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(),
"globalTopicName",
"globalProcessorName",
voidProcessorSupplier);
@@ -1300,8 +1423,8 @@ public abstract class TopologyTestDriverTest {
"aggregator");
topology.addSink("sinkProcessor", "result-topic", "aggregator");
- config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
- config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Long().getClass().getName());
+ config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class.getName());
+ config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.LongSerde.class.getName());
testDriver = new TopologyTestDriver(topology, config);
store = testDriver.getKeyValueStore("aggStore");
@@ -1439,7 +1562,7 @@ public abstract class TopologyTestDriverTest {
new ProcessorSupplier<String, Long, Void, Void>() {
@Override
public Processor<String, Long, Void, Void> get() {
- return new Processor<String, Long, Void, Void>() {
+ return new Processor<>() {
private KeyValueStore<String, Long> store;
@Override
@@ -1463,8 +1586,8 @@ public abstract class TopologyTestDriverTest {
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG,
"test-TopologyTestDriver-cleanup");
config.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getAbsolutePath());
- config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
- config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.Long().getClass().getName());
+ config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class.getName());
+ config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.LongSerde.class.getName());
try (final TopologyTestDriver testDriver = new
TopologyTestDriver(topology, config)) {
assertNull(testDriver.getKeyValueStore("storeProcessorStore").get("a"));
@@ -1674,7 +1797,7 @@ public abstract class TopologyTestDriverTest {
new StringDeserializer(),
"global-topic",
"globalProcessor",
- () -> new Processor<String, String, Void, Void>() {
+ () -> new Processor<>() {
private KeyValueStore<String, String> stateStore;
@Override