This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fb36f97d960 KAFKA-20304: Implement all required ReadOnly*Facade
classes for headers stores (#21743)
fb36f97d960 is described below
commit fb36f97d960fca677778fbb4258e71cc35c8a030
Author: Alieh Saeedi <[email protected]>
AuthorDate: Sun Mar 15 01:28:42 2026 +0100
KAFKA-20304: Implement all required ReadOnly*Facade classes for headers
stores (#21743)
This PR
- adds all `ReadOnly*Facade` classes required to wrap headers store for
backward compatibility
- refactors `ReadOnly*Facade` classes to use generic infrastructure with
converter functions
Reviewers: Matthias J. Sax <[email protected]>
---
...ade.java => GenericKeyValueIteratorFacade.java} | 38 ++-
.../GenericReadOnlyKeyValueStoreFacade.java | 78 ++++++
.../GenericReadOnlyWindowStoreFacade.java | 101 +++++++
....java => GenericWindowStoreIteratorFacade.java} | 40 +--
.../state/internals/GlobalStateStoreProvider.java | 4 +-
.../internals/ReadOnlyKeyValueStoreFacade.java | 70 -----
.../state/internals/ReadOnlyWindowStoreFacade.java | 124 ---------
.../internals/StreamThreadStateStoreProvider.java | 4 +-
.../streams/state/internals/ValueConverters.java | 65 +++++
...java => GenericKeyValueIteratorFacadeTest.java} | 57 ++--
.../GenericReadOnlyKeyValueStoreFacadeTest.java | 209 +++++++++++++++
.../GenericReadOnlyWindowStoreFacadeTest.java | 296 +++++++++++++++++++++
...a => GenericWindowStoreIteratorFacadeTest.java} | 67 ++---
.../internals/ReadOnlyKeyValueStoreFacadeTest.java | 107 --------
.../internals/ReadOnlyWindowStoreFacadeTest.java | 192 -------------
.../state/internals/ValueConvertersTest.java | 98 +++++++
.../apache/kafka/streams/TopologyTestDriver.java | 84 +++---
.../kafka/streams/KeyValueStoreFacadeTest.java | 10 +
.../kafka/streams/WindowStoreFacadeTest.java | 70 +++++
19 files changed, 1098 insertions(+), 616 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacade.java
similarity index 56%
copy from
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
copy to
streams/src/main/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacade.java
index f79b6f352a2..2ab15313b61 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacade.java
@@ -18,20 +18,30 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+import java.util.function.Function;
-public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
- private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
-
- public KeyValueIteratorFacade(final KeyValueIterator<K,
ValueAndTimestamp<V>> iterator) {
- innerIterator = iterator;
+/**
+ * Generic iterator facade that wraps a {@link KeyValueIterator} and converts
values
+ * using a provided converter function.
+ *
+ * @param <K> key type
+ * @param <InV> input value type (from inner iterator)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+class GenericKeyValueIteratorFacade<K, InV, OutV> implements
KeyValueIterator<K, OutV> {
+ private final KeyValueIterator<K, InV> innerIterator;
+ private final Function<InV, OutV> valueConverter;
+
+ GenericKeyValueIteratorFacade(final KeyValueIterator<K, InV> innerIterator,
+ final Function<InV, OutV> valueConverter) {
+ this.innerIterator = innerIterator;
+ this.valueConverter = valueConverter;
}
@Override
- public boolean hasNext() {
- return innerIterator.hasNext();
+ public void close() {
+ innerIterator.close();
}
@Override
@@ -40,13 +50,13 @@ public class KeyValueIteratorFacade<K, V> implements
KeyValueIterator<K, V> {
}
@Override
- public KeyValue<K, V> next() {
- final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue =
innerIterator.next();
- return KeyValue.pair(innerKeyValue.key,
getValueOrNull(innerKeyValue.value));
+ public boolean hasNext() {
+ return innerIterator.hasNext();
}
@Override
- public void close() {
- innerIterator.close();
+ public KeyValue<K, OutV> next() {
+ final KeyValue<K, InV> innerKeyValue = innerIterator.next();
+ return KeyValue.pair(innerKeyValue.key,
valueConverter.apply(innerKeyValue.value));
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacade.java
new file mode 100644
index 00000000000..a35db0ebbd1
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacade.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.function.Function;
+
+/**
+ * Generic facade that wraps a {@link ReadOnlyKeyValueStore} and converts
values
+ * using a provided converter function.
+ *
+ * @param <K> key type
+ * @param <InV> input value type (from inner store)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+public class GenericReadOnlyKeyValueStoreFacade<K, InV, OutV> implements
ReadOnlyKeyValueStore<K, OutV> {
+ private final ReadOnlyKeyValueStore<K, InV> inner;
+ private final Function<InV, OutV> valueConverter;
+
+ public GenericReadOnlyKeyValueStoreFacade(final ReadOnlyKeyValueStore<K,
InV> inner,
+ final Function<InV, OutV>
valueConverter) {
+ this.inner = inner;
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public OutV get(final K key) {
+ return valueConverter.apply(inner.get(key));
+ }
+
+ @Override
+ public KeyValueIterator<K, OutV> range(final K from, final K to) {
+ return new GenericKeyValueIteratorFacade<>(inner.range(from, to),
valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<K, OutV> reverseRange(final K from, final K to) {
+ return new GenericKeyValueIteratorFacade<>(inner.reverseRange(from,
to), valueConverter);
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K, OutV>
prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ return new GenericKeyValueIteratorFacade<>(inner.prefixScan(prefix,
prefixKeySerializer), valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<K, OutV> all() {
+ return new GenericKeyValueIteratorFacade<>(inner.all(),
valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<K, OutV> reverseAll() {
+ return new GenericKeyValueIteratorFacade<>(inner.reverseAll(),
valueConverter);
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return inner.approximateNumEntries();
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacade.java
new file mode 100644
index 00000000000..93f4d0b5245
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacade.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.time.Instant;
+import java.util.function.Function;
+
+/**
+ * Generic facade that wraps a {@link ReadOnlyWindowStore} and converts values
+ * using a provided converter function.
+ *
+ * @param <K> key type
+ * @param <InV> input value type (from inner store)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+public class GenericReadOnlyWindowStoreFacade<K, InV, OutV> implements
ReadOnlyWindowStore<K, OutV> {
+ private final ReadOnlyWindowStore<K, InV> inner;
+ private final Function<InV, OutV> valueConverter;
+
+ public GenericReadOnlyWindowStoreFacade(final ReadOnlyWindowStore<K, InV>
inner,
+ final Function<InV, OutV>
valueConverter) {
+ this.inner = inner;
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public OutV fetch(final K key, final long time) {
+ return valueConverter.apply(inner.fetch(key, time));
+ }
+
+ @Override
+ public WindowStoreIterator<OutV> fetch(final K key,
+ final Instant timeFrom,
+ final Instant timeTo) throws
IllegalArgumentException {
+ return new GenericWindowStoreIteratorFacade<>(inner.fetch(key,
timeFrom, timeTo), valueConverter);
+ }
+
+ @Override
+ public WindowStoreIterator<OutV> backwardFetch(final K key,
+ final Instant timeFrom,
+ final Instant timeTo)
throws IllegalArgumentException {
+ return new GenericWindowStoreIteratorFacade<>(inner.backwardFetch(key,
timeFrom, timeTo), valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, OutV> fetch(final K keyFrom,
+ final K keyTo,
+ final Instant timeFrom,
+ final Instant timeTo)
throws IllegalArgumentException {
+ return new GenericKeyValueIteratorFacade<>(inner.fetch(keyFrom, keyTo,
timeFrom, timeTo), valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, OutV> backwardFetch(final K keyFrom,
+ final K keyTo,
+ final Instant
timeFrom,
+ final Instant
timeTo) throws IllegalArgumentException {
+ return new
GenericKeyValueIteratorFacade<>(inner.backwardFetch(keyFrom, keyTo, timeFrom,
timeTo), valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, OutV> fetchAll(final Instant timeFrom,
+ final Instant timeTo)
throws IllegalArgumentException {
+ return new GenericKeyValueIteratorFacade<>(inner.fetchAll(timeFrom,
timeTo), valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, OutV> backwardFetchAll(final Instant
timeFrom,
+ final Instant
timeTo) throws IllegalArgumentException {
+ return new
GenericKeyValueIteratorFacade<>(inner.backwardFetchAll(timeFrom, timeTo),
valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, OutV> all() {
+ return new GenericKeyValueIteratorFacade<>(inner.all(),
valueConverter);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, OutV> backwardAll() {
+ return new GenericKeyValueIteratorFacade<>(inner.backwardAll(),
valueConverter);
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacade.java
similarity index 52%
rename from
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
rename to
streams/src/main/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacade.java
index f79b6f352a2..a2e858b05dc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacade.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacade.java
@@ -18,35 +18,45 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+import java.util.function.Function;
-public class KeyValueIteratorFacade<K, V> implements KeyValueIterator<K, V> {
- private final KeyValueIterator<K, ValueAndTimestamp<V>> innerIterator;
-
- public KeyValueIteratorFacade(final KeyValueIterator<K,
ValueAndTimestamp<V>> iterator) {
- innerIterator = iterator;
+/**
+ * Generic iterator facade that wraps a {@link KeyValueIterator} with Long
keys and converts values
+ * using a provided converter function to implement {@link
WindowStoreIterator}.
+ *
+ * @param <InV> input value type (from inner iterator)
+ * @param <OutV> output value type (exposed by this facade)
+ */
+class GenericWindowStoreIteratorFacade<InV, OutV> implements
WindowStoreIterator<OutV> {
+ private final KeyValueIterator<Long, InV> innerIterator;
+ private final Function<InV, OutV> valueConverter;
+
+ GenericWindowStoreIteratorFacade(final KeyValueIterator<Long, InV>
innerIterator,
+ final Function<InV, OutV> valueConverter)
{
+ this.innerIterator = innerIterator;
+ this.valueConverter = valueConverter;
}
@Override
- public boolean hasNext() {
- return innerIterator.hasNext();
+ public void close() {
+ innerIterator.close();
}
@Override
- public K peekNextKey() {
+ public Long peekNextKey() {
return innerIterator.peekNextKey();
}
@Override
- public KeyValue<K, V> next() {
- final KeyValue<K, ValueAndTimestamp<V>> innerKeyValue =
innerIterator.next();
- return KeyValue.pair(innerKeyValue.key,
getValueOrNull(innerKeyValue.value));
+ public boolean hasNext() {
+ return innerIterator.hasNext();
}
@Override
- public void close() {
- innerIterator.close();
+ public KeyValue<Long, OutV> next() {
+ final KeyValue<Long, InV> innerKeyValue = innerIterator.next();
+ return KeyValue.pair(innerKeyValue.key,
valueConverter.apply(innerKeyValue.value));
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
index f15c21be5c7..6620401cbcf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProvider.java
@@ -46,9 +46,9 @@ public class GlobalStateStoreProvider implements
StateStoreProvider {
throw new InvalidStateStoreException("the state store, " +
storeName + ", is not open.");
}
if (store instanceof TimestampedKeyValueStore && queryableStoreType
instanceof QueryableStoreTypes.KeyValueStoreType) {
- return (List<T>) Collections.singletonList(new
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store));
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store, ValueConverters.extractValue()));
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
- return (List<T>) Collections.singletonList(new
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store));
+ return (List<T>) Collections.singletonList(new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>)
store, ValueConverters.extractValue()));
} else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
return (List<T>) Collections.singletonList(new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store));
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
deleted file mode 100644
index 7a03f7270e8..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacade.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-
-public class ReadOnlyKeyValueStoreFacade<K, V> implements
ReadOnlyKeyValueStore<K, V> {
- protected final TimestampedKeyValueStore<K, V> inner;
-
- protected ReadOnlyKeyValueStoreFacade(final TimestampedKeyValueStore<K, V>
store) {
- inner = store;
- }
-
- @Override
- public V get(final K key) {
- return getValueOrNull(inner.get(key));
- }
-
- @Override
- public KeyValueIterator<K, V> range(final K from,
- final K to) {
- return new KeyValueIteratorFacade<>(inner.range(from, to));
- }
-
- @Override
- public KeyValueIterator<K, V> reverseRange(final K from,
- final K to) {
- return new KeyValueIteratorFacade<>(inner.reverseRange(from, to));
- }
-
- @Override
- public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix,
-
final PS prefixKeySerializer) {
- return new KeyValueIteratorFacade<>(inner.prefixScan(prefix,
prefixKeySerializer));
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new KeyValueIteratorFacade<>(inner.all());
- }
-
- @Override
- public KeyValueIterator<K, V> reverseAll() {
- return new KeyValueIteratorFacade<>(inner.reverseAll());
- }
-
- @Override
- public long approximateNumEntries() {
- return inner.approximateNumEntries();
- }
-}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
deleted file mode 100644
index 281a1c26757..00000000000
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacade.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.ReadOnlyWindowStore;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import java.time.Instant;
-
-import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
-
-public class ReadOnlyWindowStoreFacade<K, V> implements ReadOnlyWindowStore<K,
V> {
- protected final TimestampedWindowStore<K, V> inner;
-
- protected ReadOnlyWindowStoreFacade(final TimestampedWindowStore<K, V>
store) {
- inner = store;
- }
-
- @Override
- public V fetch(final K key,
- final long time) {
- return getValueOrNull(inner.fetch(key, time));
- }
-
- @Override
- public WindowStoreIterator<V> fetch(final K key,
- final Instant timeFrom,
- final Instant timeTo) throws
IllegalArgumentException {
- return new WindowStoreIteratorFacade<>(inner.fetch(key, timeFrom,
timeTo));
- }
-
- @Override
- public WindowStoreIterator<V> backwardFetch(final K key,
- final Instant timeFrom,
- final Instant timeTo) throws
IllegalArgumentException {
- return new WindowStoreIteratorFacade<>(inner.backwardFetch(key,
timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
- final K keyTo,
- final Instant timeFrom,
- final Instant timeTo) throws
IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.fetch(keyFrom, keyTo,
timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
- final K keyTo,
- final Instant
timeFrom,
- final Instant
timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.backwardFetch(keyFrom,
keyTo, timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final Instant timeFrom,
- final Instant timeTo)
throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.fetchAll(timeFrom, timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final Instant
timeFrom,
- final Instant
timeTo) throws IllegalArgumentException {
- return new KeyValueIteratorFacade<>(inner.backwardFetchAll(timeFrom,
timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> all() {
- return new KeyValueIteratorFacade<>(inner.all());
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardAll() {
- return new KeyValueIteratorFacade<>(inner.backwardAll());
- }
-
- private static class WindowStoreIteratorFacade<V> implements
WindowStoreIterator<V> {
- final KeyValueIterator<Long, ValueAndTimestamp<V>> innerIterator;
-
- WindowStoreIteratorFacade(final KeyValueIterator<Long,
ValueAndTimestamp<V>> iterator) {
- innerIterator = iterator;
- }
-
- @Override
- public void close() {
- innerIterator.close();
- }
-
- @Override
- public Long peekNextKey() {
- return innerIterator.peekNextKey();
- }
-
- @Override
- public boolean hasNext() {
- return innerIterator.hasNext();
- }
-
- @Override
- public KeyValue<Long, V> next() {
- final KeyValue<Long, ValueAndTimestamp<V>> innerKeyValue =
innerIterator.next();
- return KeyValue.pair(innerKeyValue.key,
getValueOrNull(innerKeyValue.value));
- }
- }
-}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
index f797ff0aadb..8a7418fec8a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
@@ -105,9 +105,9 @@ public class StreamThreadStateStoreProvider {
"The state store may have migrated to another
instance.");
}
if (store instanceof TimestampedKeyValueStore &&
queryableStoreType instanceof QueryableStoreTypes.KeyValueStoreType) {
- return (T) new
ReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>) store);
+ return (T) new
GenericReadOnlyKeyValueStoreFacade<>((TimestampedKeyValueStore<Object, Object>)
store, ValueConverters.extractValue());
} else if (store instanceof TimestampedWindowStore &&
queryableStoreType instanceof QueryableStoreTypes.WindowStoreType) {
- return (T) new
ReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>) store);
+ return (T) new
GenericReadOnlyWindowStoreFacade<>((TimestampedWindowStore<Object, Object>)
store, ValueConverters.extractValue());
} else if (store instanceof SessionStoreWithHeaders &&
queryableStoreType instanceof QueryableStoreTypes.SessionStoreType) {
return (T) new
ReadOnlySessionStoreFacade<>((SessionStoreWithHeaders<Object, Object>) store);
} else {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
new file mode 100644
index 00000000000..3a3ceec2d42
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueConverters.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.util.function.Function;
+
+/**
+ * Utility class providing common value converter functions for facade
patterns.
+ * These converters are used to transform between different value
representations
+ * (e.g., ValueAndTimestamp, ValueTimestampHeaders, plain values).
+ */
+public final class ValueConverters {
+
+ private ValueConverters() {
+ // Utility class - prevent instantiation
+ }
+
+ /**
+ * Converts {@link ValueAndTimestamp} to plain value, discarding timestamp.
+ *
+ * @param <V> value type
+ * @return converter function that extracts the value or returns null
+ */
+ public static <V> Function<ValueAndTimestamp<V>, V> extractValue() {
+ return ValueAndTimestamp::getValueOrNull;
+ }
+
+ /**
+ * Converts {@link ValueTimestampHeaders} to plain value, discarding
timestamp and headers.
+ *
+ * @param <V> value type
+ * @return converter function that extracts the value or returns null
+ */
+ public static <V> Function<ValueTimestampHeaders<V>, V>
extractValueFromHeaders() {
+ return vth -> vth == null ? null : vth.value();
+ }
+
+ /**
+ * Converts {@link ValueTimestampHeaders} to {@link ValueAndTimestamp},
discarding headers.
+ *
+ * @param <V> value type
+ * @return converter function that creates ValueAndTimestamp or returns
null
+ */
+ public static <V> Function<ValueTimestampHeaders<V>, ValueAndTimestamp<V>>
headersToValueAndTimestamp() {
+ return vth -> vth == null ? null :
+ ValueAndTimestamp.make(vth.value(), vth.timestamp());
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
similarity index 50%
rename from
streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
rename to
streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
index 2f2458a3e39..3983ed6057d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueIteratorFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
@@ -28,53 +28,70 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.util.function.Function;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class KeyValueIteratorFacadeTest {
+public class GenericKeyValueIteratorFacadeTest {
+
@Mock
- private KeyValueIterator<String, ValueAndTimestamp<String>>
mockedKeyValueIterator;
+ private KeyValueIterator<String, ValueAndTimestamp<String>>
mockedInnerIterator;
- private KeyValueIteratorFacade<String, String> keyValueIteratorFacade;
+ private GenericKeyValueIteratorFacade<String, ValueAndTimestamp<String>,
String> facade;
@BeforeEach
public void setup() {
- keyValueIteratorFacade = new
KeyValueIteratorFacade<>(mockedKeyValueIterator);
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ facade = new GenericKeyValueIteratorFacade<>(mockedInnerIterator,
converter);
}
@Test
- public void shouldForwardHasNext() {
-
when(mockedKeyValueIterator.hasNext()).thenReturn(true).thenReturn(false);
+ public void shouldConvertValues() {
+ when(mockedInnerIterator.next())
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
42L)))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
84L)));
- assertTrue(keyValueIteratorFacade.hasNext());
- assertFalse(keyValueIteratorFacade.hasNext());
+ assertThat(facade.next(), is(KeyValue.pair("key1", "value1")));
+ assertThat(facade.next(), is(KeyValue.pair("key2", "value2")));
}
@Test
- public void shouldForwardPeekNextKey() {
- when(mockedKeyValueIterator.peekNextKey()).thenReturn("key");
+ public void shouldHandleNullValues() {
+ when(mockedInnerIterator.next())
+ .thenReturn(KeyValue.pair("key1", null))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
- assertThat(keyValueIteratorFacade.peekNextKey(), is("key"));
+ assertThat(facade.next(), is(KeyValue.pair("key1", null)));
+ assertThat(facade.next(), is(KeyValue.pair("key2", "value2")));
}
@Test
- public void shouldReturnPlainKeyValuePairOnGet() {
- when(mockedKeyValueIterator.next()).thenReturn(
- new KeyValue<>("key", ValueAndTimestamp.make("value", 42L)));
+ public void shouldDelegateHasNext() {
+ when(mockedInnerIterator.hasNext()).thenReturn(true, false);
- assertThat(keyValueIteratorFacade.next(), is(KeyValue.pair("key",
"value")));
+ assertTrue(facade.hasNext());
+ assertFalse(facade.hasNext());
}
@Test
- public void shouldCloseInnerIterator() {
- doNothing().when(mockedKeyValueIterator).close();
+ public void shouldDelegatePeekNextKey() {
+ when(mockedInnerIterator.peekNextKey()).thenReturn("peekedKey", null);
- keyValueIteratorFacade.close();
+ assertThat(facade.peekNextKey(), is("peekedKey"));
+ assertNull(facade.peekNextKey());
+ }
+
+ @Test
+ public void shouldDelegateClose() {
+ facade.close();
+ verify(mockedInnerIterator).close();
}
-}
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
new file mode 100644
index 00000000000..c569ca20eaa
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyKeyValueStoreFacadeTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.function.Function;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class GenericReadOnlyKeyValueStoreFacadeTest {
+
+ @Mock
+ private ReadOnlyKeyValueStore<String, ValueAndTimestamp<String>>
mockedTimestampedStore;
+ @Mock
+ private ReadOnlyKeyValueStore<String, ValueTimestampHeaders<String>>
mockedHeadersStore;
+ @Mock
+ private KeyValueIterator<String, ValueAndTimestamp<String>>
mockedTimestampedIterator;
+ @Mock
+ private KeyValueIterator<String, ValueTimestampHeaders<String>>
mockedHeadersIterator;
+
+ @Test
+ public void shouldConvertValueWithExtractValueConverter() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedStore.get("key"))
+ .thenReturn(ValueAndTimestamp.make("value", 42L));
+ when(mockedTimestampedStore.get("unknownKey"))
+ .thenReturn(null);
+
+ assertThat(facade.get("key"), is("value"));
+ assertNull(facade.get("unknownKey"));
+ }
+
+ @Test
+ public void shouldConvertValueWithHeadersConverter() {
+ final Function<ValueTimestampHeaders<String>, String> converter =
ValueConverters.extractValueFromHeaders();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueTimestampHeaders<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedHeadersStore,
converter);
+
+ when(mockedHeadersStore.get("key"))
+ .thenReturn(ValueTimestampHeaders.make("value", 42L, new
RecordHeaders()));
+ when(mockedHeadersStore.get("unknownKey"))
+ .thenReturn(null);
+
+ assertThat(facade.get("key"), is("value"));
+ assertNull(facade.get("unknownKey"));
+ }
+
+ @Test
+ public void shouldConvertValueToValueAndTimestamp() {
+ final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
+ ValueConverters.headersToValueAndTimestamp();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedHeadersStore,
converter);
+
+ when(mockedHeadersStore.get("key"))
+ .thenReturn(ValueTimestampHeaders.make("value", 42L, new
RecordHeaders()));
+ when(mockedHeadersStore.get("unknownKey"))
+ .thenReturn(null);
+
+ final ValueAndTimestamp<String> result = facade.get("key");
+ assertThat(result.value(), is("value"));
+ assertThat(result.timestamp(), is(42L));
+ assertNull(facade.get("unknownKey"));
+ }
+
+ @Test
+ public void shouldConvertIteratorValues() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedIterator.next())
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+ when(mockedTimestampedStore.range("key1",
"key2")).thenReturn(mockedTimestampedIterator);
+
+ final KeyValueIterator<String, String> iterator = facade.range("key1",
"key2");
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ }
+
+ @Test
+ public void shouldConvertReverseRangeIteratorValues() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedIterator.next())
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)))
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)));
+ when(mockedTimestampedStore.reverseRange("key1",
"key2")).thenReturn(mockedTimestampedIterator);
+
+ final KeyValueIterator<String, String> iterator =
facade.reverseRange("key1", "key2");
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+ }
+
+ @Test
+ public void shouldConvertPrefixScanIteratorValues() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ final StringSerializer stringSerializer = new StringSerializer();
+ when(mockedTimestampedIterator.next())
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+ when(mockedTimestampedStore.prefixScan("key",
stringSerializer)).thenReturn(mockedTimestampedIterator);
+
+ final KeyValueIterator<String, String> iterator =
facade.prefixScan("key", stringSerializer);
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ }
+
+ @Test
+ public void shouldConvertAllIteratorValues() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedIterator.next())
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+
when(mockedTimestampedStore.all()).thenReturn(mockedTimestampedIterator);
+
+ final KeyValueIterator<String, String> iterator = facade.all();
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ }
+
+ @Test
+ public void shouldConvertReverseAllIteratorValues() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedIterator.next())
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)))
+ .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)));
+
when(mockedTimestampedStore.reverseAll()).thenReturn(mockedTimestampedIterator);
+
+ final KeyValueIterator<String, String> iterator = facade.reverseAll();
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
+ }
+
+ @Test
+ public void shouldForwardApproximateNumEntries() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedStore.approximateNumEntries()).thenReturn(42L);
+
+ assertThat(facade.approximateNumEntries(), is(42L));
+ }
+
+ @Test
+ public void shouldHandleNullValuesInIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyKeyValueStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyKeyValueStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedIterator.next())
+ .thenReturn(KeyValue.pair("key1", null))
+ .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
+
when(mockedTimestampedStore.all()).thenReturn(mockedTimestampedIterator);
+
+ final KeyValueIterator<String, String> iterator = facade.all();
+ assertThat(iterator.next(), is(KeyValue.pair("key1", null)));
+ assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
new file mode 100644
index 00000000000..86a4f3e5543
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericReadOnlyWindowStoreFacadeTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.time.Instant;
+import java.util.function.Function;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class GenericReadOnlyWindowStoreFacadeTest {
+
+ @Mock
+ private ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
mockedTimestampedStore;
+ @Mock
+ private ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>
mockedHeadersStore;
+ @Mock
+ private WindowStoreIterator<ValueAndTimestamp<String>>
mockedTimestampedWindowIterator;
+ @Mock
+ private WindowStoreIterator<ValueTimestampHeaders<String>>
mockedHeadersWindowIterator;
+ @Mock
+ private KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>>
mockedTimestampedKeyValueIterator;
+ @Mock
+ private KeyValueIterator<Windowed<String>, ValueTimestampHeaders<String>>
mockedHeadersKeyValueIterator;
+
+ @Test
+ public void shouldConvertSingleValueFetch() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedStore.fetch("key1", 21L))
+ .thenReturn(ValueAndTimestamp.make("value1", 42L));
+ when(mockedTimestampedStore.fetch("unknownKey", 21L))
+ .thenReturn(null);
+
+ assertThat(facade.fetch("key1", 21L), is("value1"));
+ assertNull(facade.fetch("unknownKey", 21L));
+ }
+
+ @Test
+ public void shouldConvertSingleValueFetchWithHeadersConverter() {
+ final Function<ValueTimestampHeaders<String>, String> converter =
ValueConverters.extractValueFromHeaders();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueTimestampHeaders<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedHeadersStore,
converter);
+
+ when(mockedHeadersStore.fetch("key1", 21L))
+ .thenReturn(ValueTimestampHeaders.make("value1", 42L, new
RecordHeaders()));
+ when(mockedHeadersStore.fetch("unknownKey", 21L))
+ .thenReturn(null);
+
+ assertThat(facade.fetch("key1", 21L), is("value1"));
+ assertNull(facade.fetch("unknownKey", 21L));
+ }
+
+ @Test
+ public void shouldConvertToValueAndTimestamp() {
+ final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
+ ValueConverters.headersToValueAndTimestamp();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueTimestampHeaders<String>, ValueAndTimestamp<String>> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedHeadersStore,
converter);
+
+ when(mockedHeadersStore.fetch("key1", 21L))
+ .thenReturn(ValueTimestampHeaders.make("value1", 42L, new
RecordHeaders()));
+
+ final ValueAndTimestamp<String> result = facade.fetch("key1", 21L);
+ assertThat(result.value(), is("value1"));
+ assertThat(result.timestamp(), is(42L));
+ }
+
+ @Test
+ public void shouldConvertWindowStoreIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedWindowIterator.next())
+ .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1",
22L)))
+ .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2",
23L)));
+ when(mockedTimestampedStore.fetch("key1", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L)))
+ .thenReturn(mockedTimestampedWindowIterator);
+
+ final WindowStoreIterator<String> iterator =
+ facade.fetch("key1", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+ }
+
+ @Test
+ public void shouldConvertBackwardFetchWindowStoreIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedWindowIterator.next())
+ .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2",
23L)))
+ .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1",
22L)));
+ when(mockedTimestampedStore.backwardFetch("key1",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+ .thenReturn(mockedTimestampedWindowIterator);
+
+ final WindowStoreIterator<String> iterator =
+ facade.backwardFetch("key1", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+ assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
+ }
+
+ @Test
+ public void shouldConvertKeyRangeFetchIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedKeyValueIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+ when(mockedTimestampedStore.fetch("key1", "key2",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+ .thenReturn(mockedTimestampedKeyValueIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ facade.fetch("key1", "key2", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
+ }
+
+ @Test
+ public void shouldConvertBackwardFetchKeyRangeIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedKeyValueIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)));
+ when(mockedTimestampedStore.backwardFetch("key1", "key2",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
+ .thenReturn(mockedTimestampedKeyValueIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ facade.backwardFetch("key1", "key2", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
+ }
+
+ @Test
+ public void shouldConvertFetchAllIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedKeyValueIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+ when(mockedTimestampedStore.fetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L)))
+ .thenReturn(mockedTimestampedKeyValueIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ facade.fetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
+ }
+
+ @Test
+ public void shouldConvertBackwardFetchAllIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedKeyValueIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)));
+
when(mockedTimestampedStore.backwardFetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L)))
+ .thenReturn(mockedTimestampedKeyValueIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
+ facade.backwardFetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
+ }
+
+ @Test
+ public void shouldConvertAllIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedKeyValueIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)));
+
when(mockedTimestampedStore.all()).thenReturn(mockedTimestampedKeyValueIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
facade.all();
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
+ }
+
+ @Test
+ public void shouldConvertBackwardAllIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedKeyValueIterator.next())
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key2", new TimeWindow(42L, 43L)),
+ ValueAndTimestamp.make("value2", 100L)))
+ .thenReturn(KeyValue.pair(
+ new Windowed<>("key1", new TimeWindow(21L, 22L)),
+ ValueAndTimestamp.make("value1", 22L)));
+
when(mockedTimestampedStore.backwardAll()).thenReturn(mockedTimestampedKeyValueIterator);
+
+ final KeyValueIterator<Windowed<String>, String> iterator =
facade.backwardAll();
+
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
+ assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
+ }
+
+ @Test
+ public void shouldHandleNullValuesInWindowIterator() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ final GenericReadOnlyWindowStoreFacade<String,
ValueAndTimestamp<String>, String> facade =
+ new GenericReadOnlyWindowStoreFacade<>(mockedTimestampedStore,
converter);
+
+ when(mockedTimestampedWindowIterator.next())
+ .thenReturn(KeyValue.pair(21L, null))
+ .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2",
23L)));
+ when(mockedTimestampedStore.fetch("key1", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L)))
+ .thenReturn(mockedTimestampedWindowIterator);
+
+ final WindowStoreIterator<String> iterator =
+ facade.fetch("key1", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
+
+ assertThat(iterator.next(), is(KeyValue.pair(21L, null)));
+ assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
similarity index 50%
rename from
streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
rename to
streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
index c1f35083051..9bbab36e6ae 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionStoreIteratorFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
@@ -16,10 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -29,63 +29,70 @@ import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
+import java.util.function.Function;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class SessionStoreIteratorFacadeTest {
+public class GenericWindowStoreIteratorFacadeTest {
+
@Mock
- private KeyValueIterator<String, AggregationWithHeaders<String>>
mockedInnerIterator;
+ private KeyValueIterator<Long, ValueAndTimestamp<String>>
mockedInnerIterator;
- private SessionStoreIteratorFacade<String, String> facade;
+ private WindowStoreIterator<String> facade;
@BeforeEach
public void setup() {
- facade = new SessionStoreIteratorFacade<>(mockedInnerIterator);
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+ facade = new GenericWindowStoreIteratorFacade<>(mockedInnerIterator,
converter);
}
@Test
- public void shouldForwardHasNext() {
- when(mockedInnerIterator.hasNext()).thenReturn(true);
- assertTrue(facade.hasNext());
- }
+ public void shouldConvertValues() {
+ when(mockedInnerIterator.next())
+ .thenReturn(KeyValue.pair(100L, ValueAndTimestamp.make("value1",
42L)))
+ .thenReturn(KeyValue.pair(200L, ValueAndTimestamp.make("value2",
84L)));
- @Test
- public void shouldForwardPeekNextKey() {
- when(mockedInnerIterator.peekNextKey()).thenReturn("key");
- assertThat(facade.peekNextKey(), is("key"));
+ assertThat(facade.next(), is(KeyValue.pair(100L, "value1")));
+ assertThat(facade.next(), is(KeyValue.pair(200L, "value2")));
}
@Test
- public void shouldReturnPlainKeyValuePairOnNext() {
- final AggregationWithHeaders<String> aggregation =
- AggregationWithHeaders.make("value", new RecordHeaders());
- when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key",
aggregation));
- assertThat(facade.next(), is(KeyValue.pair("key", "value")));
+ public void shouldHandleNullValues() {
+ when(mockedInnerIterator.next())
+ .thenReturn(KeyValue.pair(100L, null))
+ .thenReturn(KeyValue.pair(200L, ValueAndTimestamp.make("value2",
42L)));
+
+ assertThat(facade.next(), is(KeyValue.pair(100L, null)));
+ assertThat(facade.next(), is(KeyValue.pair(200L, "value2")));
}
@Test
- public void shouldReturnNullValueWhenAggregationWithHeadersIsNull() {
- when(mockedInnerIterator.next()).thenReturn(new KeyValue<>("key",
null));
- final KeyValue<String, String> result = facade.next();
- assertThat(result.key, is("key"));
- assertThat(result.value, is(nullValue()));
+ public void shouldDelegateHasNext() {
+ when(mockedInnerIterator.hasNext()).thenReturn(true, false);
+
+ assertTrue(facade.hasNext());
+ assertFalse(facade.hasNext());
}
@Test
- public void shouldReturnNullWhenInnerNextReturnsNull() {
- when(mockedInnerIterator.next()).thenReturn(null);
- assertThat(facade.next(), is(nullValue()));
+ public void shouldDelegatePeekNextKey() {
+ when(mockedInnerIterator.peekNextKey()).thenReturn(100L, null);
+
+ assertThat(facade.peekNextKey(), is(100L));
+ assertNull(facade.peekNextKey());
}
@Test
- public void shouldCloseInnerIterator() {
+ public void shouldDelegateClose() {
facade.close();
verify(mockedInnerIterator).close();
}
-}
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
deleted file mode 100644
index 6d978c80b0f..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyKeyValueStoreFacadeTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class ReadOnlyKeyValueStoreFacadeTest {
- @Mock
- private TimestampedKeyValueStore<String, String>
mockedKeyValueTimestampStore;
- @Mock
- private KeyValueIterator<String, ValueAndTimestamp<String>>
mockedKeyValueTimestampIterator;
-
- private ReadOnlyKeyValueStoreFacade<String, String>
readOnlyKeyValueStoreFacade;
-
- @BeforeEach
- public void setup() {
- readOnlyKeyValueStoreFacade = new
ReadOnlyKeyValueStoreFacade<>(mockedKeyValueTimestampStore);
- }
-
- @Test
- public void shouldReturnPlainValueOnGet() {
- when(mockedKeyValueTimestampStore.get("key"))
- .thenReturn(ValueAndTimestamp.make("value", 42L));
- when(mockedKeyValueTimestampStore.get("unknownKey"))
- .thenReturn(null);
-
- assertThat(readOnlyKeyValueStoreFacade.get("key"), is("value"));
- assertNull(readOnlyKeyValueStoreFacade.get("unknownKey"));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsForRangeIterator() {
- when(mockedKeyValueTimestampIterator.next())
- .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
- .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
- when(mockedKeyValueTimestampStore.range("key1",
"key2")).thenReturn(mockedKeyValueTimestampIterator);
-
- final KeyValueIterator<String, String> iterator =
readOnlyKeyValueStoreFacade.range("key1", "key2");
- assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
- assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsForPrefixScan() {
- final StringSerializer stringSerializer = new StringSerializer();
- when(mockedKeyValueTimestampIterator.next())
- .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
- .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
- when(mockedKeyValueTimestampStore.prefixScan("key",
stringSerializer)).thenReturn(mockedKeyValueTimestampIterator);
-
- final KeyValueIterator<String, String> iterator =
readOnlyKeyValueStoreFacade.prefixScan("key", stringSerializer);
- assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
- assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsForAllIterator() {
- when(mockedKeyValueTimestampIterator.next())
- .thenReturn(KeyValue.pair("key1", ValueAndTimestamp.make("value1",
21L)))
- .thenReturn(KeyValue.pair("key2", ValueAndTimestamp.make("value2",
42L)));
-
when(mockedKeyValueTimestampStore.all()).thenReturn(mockedKeyValueTimestampIterator);
-
- final KeyValueIterator<String, String> iterator =
readOnlyKeyValueStoreFacade.all();
- assertThat(iterator.next(), is(KeyValue.pair("key1", "value1")));
- assertThat(iterator.next(), is(KeyValue.pair("key2", "value2")));
- }
-
- @Test
- public void shouldForwardApproximateNumEntries() {
-
when(mockedKeyValueTimestampStore.approximateNumEntries()).thenReturn(42L);
-
- assertThat(readOnlyKeyValueStoreFacade.approximateNumEntries(),
is(42L));
- }
-}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
deleted file mode 100644
index b0d65aa10c2..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ReadOnlyWindowStoreFacadeTest.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.TimestampedWindowStore;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.WindowStoreIterator;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import java.time.Instant;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class ReadOnlyWindowStoreFacadeTest {
- @Mock
- private TimestampedWindowStore<String, String> mockedWindowTimestampStore;
- @Mock
- private WindowStoreIterator<ValueAndTimestamp<String>>
mockedWindowTimestampIterator;
- @Mock
- private KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>>
mockedKeyValueWindowTimestampIterator;
-
- private ReadOnlyWindowStoreFacade<String, String>
readOnlyWindowStoreFacade;
-
- @BeforeEach
- public void setup() {
- readOnlyWindowStoreFacade = new
ReadOnlyWindowStoreFacade<>(mockedWindowTimestampStore);
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsOnSingleKeyFetch() {
- when(mockedWindowTimestampStore.fetch("key1", 21L))
- .thenReturn(ValueAndTimestamp.make("value1", 42L));
- when(mockedWindowTimestampStore.fetch("unknownKey", 21L))
- .thenReturn(null);
-
- assertThat(readOnlyWindowStoreFacade.fetch("key1", 21L), is("value1"));
- assertNull(readOnlyWindowStoreFacade.fetch("unknownKey", 21L));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsOnSingleKeyFetchLongParameters()
{
- when(mockedWindowTimestampIterator.next())
- .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1",
22L)))
- .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2",
23L)));
- when(mockedWindowTimestampStore.fetch("key1",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
- .thenReturn(mockedWindowTimestampIterator);
-
- final WindowStoreIterator<String> iterator =
- readOnlyWindowStoreFacade.fetch("key1", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
-
- assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
- assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
- }
-
- @Test
- public void
shouldReturnPlainKeyValuePairsOnSingleKeyFetchInstantParameters() {
- when(mockedWindowTimestampIterator.next())
- .thenReturn(KeyValue.pair(21L, ValueAndTimestamp.make("value1",
22L)))
- .thenReturn(KeyValue.pair(42L, ValueAndTimestamp.make("value2",
23L)));
- when(mockedWindowTimestampStore.fetch("key1",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
- .thenReturn(mockedWindowTimestampIterator);
-
- final WindowStoreIterator<String> iterator =
- readOnlyWindowStoreFacade.fetch("key1", Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
-
- assertThat(iterator.next(), is(KeyValue.pair(21L, "value1")));
- assertThat(iterator.next(), is(KeyValue.pair(42L, "value2")));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsOnRangeFetchLongParameters() {
- when(mockedKeyValueWindowTimestampIterator.next())
- .thenReturn(KeyValue.pair(
- new Windowed<>("key1", new TimeWindow(21L, 22L)),
- ValueAndTimestamp.make("value1", 22L)))
- .thenReturn(KeyValue.pair(
- new Windowed<>("key2", new TimeWindow(42L, 43L)),
- ValueAndTimestamp.make("value2", 100L)));
- when(mockedWindowTimestampStore.fetch("key1", "key2",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
- .thenReturn(mockedKeyValueWindowTimestampIterator);
-
- final KeyValueIterator<Windowed<String>, String> iterator =
- readOnlyWindowStoreFacade.fetch("key1", "key2",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
-
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsOnRangeFetchInstantParameters() {
- when(mockedKeyValueWindowTimestampIterator.next())
- .thenReturn(KeyValue.pair(
- new Windowed<>("key1", new TimeWindow(21L, 22L)),
- ValueAndTimestamp.make("value1", 22L)))
- .thenReturn(KeyValue.pair(
- new Windowed<>("key2", new TimeWindow(42L, 43L)),
- ValueAndTimestamp.make("value2", 100L)));
- when(mockedWindowTimestampStore.fetch("key1", "key2",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L)))
- .thenReturn(mockedKeyValueWindowTimestampIterator);
-
- final KeyValueIterator<Windowed<String>, String> iterator =
- readOnlyWindowStoreFacade.fetch("key1", "key2",
Instant.ofEpochMilli(21L), Instant.ofEpochMilli(42L));
-
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsOnFetchAllLongParameters() {
- when(mockedKeyValueWindowTimestampIterator.next())
- .thenReturn(KeyValue.pair(
- new Windowed<>("key1", new TimeWindow(21L, 22L)),
- ValueAndTimestamp.make("value1", 22L)))
- .thenReturn(KeyValue.pair(
- new Windowed<>("key2", new TimeWindow(42L, 43L)),
- ValueAndTimestamp.make("value2", 100L)));
- when(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L)))
- .thenReturn(mockedKeyValueWindowTimestampIterator);
-
- final KeyValueIterator<Windowed<String>, String> iterator =
- readOnlyWindowStoreFacade.fetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
-
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsOnFetchAllInstantParameters() {
- when(mockedKeyValueWindowTimestampIterator.next())
- .thenReturn(KeyValue.pair(
- new Windowed<>("key1", new TimeWindow(21L, 22L)),
- ValueAndTimestamp.make("value1", 22L)))
- .thenReturn(KeyValue.pair(
- new Windowed<>("key2", new TimeWindow(42L, 43L)),
- ValueAndTimestamp.make("value2", 100L)));
- when(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L)))
- .thenReturn(mockedKeyValueWindowTimestampIterator);
-
- final KeyValueIterator<Windowed<String>, String> iterator =
- readOnlyWindowStoreFacade.fetchAll(Instant.ofEpochMilli(21L),
Instant.ofEpochMilli(42L));
-
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
- }
-
- @Test
- public void shouldReturnPlainKeyValuePairsOnAll() {
- when(mockedKeyValueWindowTimestampIterator.next())
- .thenReturn(KeyValue.pair(
- new Windowed<>("key1", new TimeWindow(21L, 22L)),
- ValueAndTimestamp.make("value1", 22L)))
- .thenReturn(KeyValue.pair(
- new Windowed<>("key2", new TimeWindow(42L, 43L)),
- ValueAndTimestamp.make("value2", 100L)));
-
when(mockedWindowTimestampStore.all()).thenReturn(mockedKeyValueWindowTimestampIterator);
-
- final KeyValueIterator<Windowed<String>, String> iterator =
readOnlyWindowStoreFacade.all();
-
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key1",
new TimeWindow(21L, 22L)), "value1")));
- assertThat(iterator.next(), is(KeyValue.pair(new Windowed<>("key2",
new TimeWindow(42L, 43L)), "value2")));
- }
-}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
new file mode 100644
index 00000000000..fd8093f2933
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ValueConvertersTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Function;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class ValueConvertersTest {
+
+ @Test
+ public void extractValueShouldReturnPlainValue() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+
+ final ValueAndTimestamp<String> valueAndTimestamp =
ValueAndTimestamp.make("value", 42L);
+ assertThat(converter.apply(valueAndTimestamp), is("value"));
+ }
+
+ @Test
+ public void extractValueShouldReturnNullWhenInputIsNull() {
+ final Function<ValueAndTimestamp<String>, String> converter =
ValueConverters.extractValue();
+
+ assertNull(converter.apply(null));
+ }
+
+ @Test
+ public void extractValueFromHeadersShouldReturnPlainValue() {
+ final Function<ValueTimestampHeaders<String>, String> converter =
ValueConverters.extractValueFromHeaders();
+
+ final ValueTimestampHeaders<String> vth =
ValueTimestampHeaders.make("value", 42L, new RecordHeaders());
+ assertThat(converter.apply(vth), is("value"));
+ }
+
+ @Test
+ public void extractValueFromHeadersShouldReturnNullWhenInputIsNull() {
+ final Function<ValueTimestampHeaders<String>, String> converter =
ValueConverters.extractValueFromHeaders();
+
+ assertNull(converter.apply(null));
+ }
+
+ @Test
+ public void headersToValueAndTimestampShouldConvertCorrectly() {
+ final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
+ ValueConverters.headersToValueAndTimestamp();
+
+ final ValueTimestampHeaders<String> vth =
ValueTimestampHeaders.make("value", 42L, new RecordHeaders());
+ final ValueAndTimestamp<String> result = converter.apply(vth);
+
+ assertThat(result.value(), is("value"));
+ assertThat(result.timestamp(), is(42L));
+ }
+
+ @Test
+ public void headersToValueAndTimestampShouldReturnNullWhenInputIsNull() {
+ final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
+ ValueConverters.headersToValueAndTimestamp();
+
+ assertNull(converter.apply(null));
+ }
+
+ @Test
+ public void headersToValueAndTimestampShouldDiscardHeaders() {
+ final Function<ValueTimestampHeaders<String>,
ValueAndTimestamp<String>> converter =
+ ValueConverters.headersToValueAndTimestamp();
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("key1", "value1".getBytes());
+ final ValueTimestampHeaders<String> vth =
ValueTimestampHeaders.make("value", 42L, headers);
+
+ final ValueAndTimestamp<String> result = converter.apply(vth);
+
+ // Should only have value and timestamp, headers are discarded
+ assertThat(result.value(), is("value"));
+ assertThat(result.timestamp(), is(42L));
+ }
+}
\ No newline at end of file
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 4fa9117cee7..beae91966fc 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -93,10 +93,11 @@ import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.apache.kafka.streams.state.internals.ReadOnlyKeyValueStoreFacade;
-import org.apache.kafka.streams.state.internals.ReadOnlyWindowStoreFacade;
+import
org.apache.kafka.streams.state.internals.GenericReadOnlyKeyValueStoreFacade;
+import
org.apache.kafka.streams.state.internals.GenericReadOnlyWindowStoreFacade;
import org.apache.kafka.streams.state.internals.SessionStoreIteratorFacade;
import org.apache.kafka.streams.state.internals.ThreadCache;
+import org.apache.kafka.streams.state.internals.ValueConverters;
import org.apache.kafka.streams.test.TestRecord;
import org.slf4j.Logger;
@@ -1286,10 +1287,12 @@ public class TopologyTestDriver implements Closeable {
}
}
- static class KeyValueStoreFacade<K, V> extends
ReadOnlyKeyValueStoreFacade<K, V> implements KeyValueStore<K, V> {
+ static class KeyValueStoreFacade<K, V> extends
GenericReadOnlyKeyValueStoreFacade<K, ValueAndTimestamp<V>, V> implements
KeyValueStore<K, V> {
+ private final TimestampedKeyValueStore<K, V> inner;
- public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> inner)
{
- super(inner);
+ public KeyValueStoreFacade(final TimestampedKeyValueStore<K, V> store)
{
+ super(store, ValueConverters.extractValue());
+ this.inner = store;
}
@Override
@@ -1350,10 +1353,12 @@ public class TopologyTestDriver implements Closeable {
}
}
- static class WindowStoreFacade<K, V> extends ReadOnlyWindowStoreFacade<K,
V> implements WindowStore<K, V> {
+ static class WindowStoreFacade<K, V> extends
GenericReadOnlyWindowStoreFacade<K, ValueAndTimestamp<V>, V> implements
WindowStore<K, V> {
+ private final TimestampedWindowStore<K, V> inner;
public WindowStoreFacade(final TimestampedWindowStore<K, V> store) {
- super(store);
+ super(store, ValueConverters.extractValue());
+ this.inner = store;
}
@Override
@@ -1361,13 +1366,6 @@ public class TopologyTestDriver implements Closeable {
inner.init(stateStoreContext, root);
}
- @Override
- public void put(final K key,
- final V value,
- final long windowStartTimestamp) {
- inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
- }
-
@Override
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
@@ -1376,10 +1374,9 @@ public class TopologyTestDriver implements Closeable {
}
@Override
- public WindowStoreIterator<V> backwardFetch(final K key,
- final long timeFrom,
- final long timeTo) {
- return backwardFetch(key, Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+ final long timeTo) {
+ return fetchAll(Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
}
@Override
@@ -1387,8 +1384,20 @@ public class TopologyTestDriver implements Closeable {
final K keyTo,
final long timeFrom,
final long timeTo) {
- return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom),
- Instant.ofEpochMilli(timeTo));
+ return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<V> backwardFetch(final K key,
+ final long timeFrom,
+ final long timeTo) {
+ return backwardFetch(key, Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long
timeFrom,
+ final long
timeTo) {
+ return backwardFetchAll(Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
}
@Override
@@ -1400,15 +1409,10 @@ public class TopologyTestDriver implements Closeable {
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
- final long timeTo) {
- return fetchAll(Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
- }
-
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long
timeFrom,
- final long
timeTo) {
- return backwardFetchAll(Instant.ofEpochMilli(timeFrom),
Instant.ofEpochMilli(timeTo));
+ public void put(final K key,
+ final V value,
+ final long windowStartTimestamp) {
+ inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP), windowStartTimestamp);
}
@Override
@@ -1451,31 +1455,31 @@ public class TopologyTestDriver implements Closeable {
@Override
public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
- final long
earliestSessionEndTime,
- final long
latestSessionStartTime) {
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
return new SessionStoreIteratorFacade<>(inner.findSessions(key,
earliestSessionEndTime, latestSessionStartTime));
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K
key,
- final
long earliestSessionEndTime,
- final
long latestSessionStartTime) {
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
return new
SessionStoreIteratorFacade<>(inner.backwardFindSessions(key,
earliestSessionEndTime, latestSessionStartTime));
}
@Override
public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
- final K keyTo,
- final long
earliestSessionEndTime,
- final long
latestSessionStartTime) {
+ final K keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
return new
SessionStoreIteratorFacade<>(inner.findSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime));
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K
keyFrom,
- final K
keyTo,
- final
long earliestSessionEndTime,
- final
long latestSessionStartTime) {
+ final K
keyTo,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
return new
SessionStoreIteratorFacade<>(inner.backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime));
}
@@ -1488,7 +1492,7 @@ public class TopologyTestDriver implements Closeable {
@Override
public KeyValueIterator<Windowed<K>, V> findSessions(final long
earliestSessionEndTime,
- final long
latestSessionEndTime) {
+ final long
latestSessionEndTime) {
return new
SessionStoreIteratorFacade<>(inner.findSessions(earliestSessionEndTime,
latestSessionEndTime));
}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
index 5942322f103..3fb7aa7d99b 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -138,4 +139,13 @@ public class KeyValueStoreFacadeTest {
assertThat(keyValueStoreFacade.isOpen(), is(false));
verify(mockedKeyValueTimestampStore, times(2)).isOpen();
}
+
+ @Test
+ public void shouldReturnPosition() {
+ when(mockedKeyValueTimestampStore.getPosition())
+ .thenReturn(Position.emptyPosition());
+
+ assertThat(keyValueStoreFacade.getPosition(),
is(Position.emptyPosition()));
+ verify(mockedKeyValueTimestampStore, times(1)).getPosition();
+ }
}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
index 7b64dc6e374..9964e5cf6bb 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
@@ -18,14 +18,20 @@ package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.TopologyTestDriver.WindowStoreFacade;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.Instant;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
@@ -103,4 +109,68 @@ public class WindowStoreFacadeTest {
verify(mockedWindowTimestampStore, times(2)).isOpen();
}
+ @Test
+ public void shouldReturnPosition() {
+ when(mockedWindowTimestampStore.getPosition())
+ .thenReturn(Position.emptyPosition());
+
+ assertThat(windowStoreFacade.getPosition(),
is(Position.emptyPosition()));
+ verify(mockedWindowTimestampStore, times(1)).getPosition();
+ }
+
+ @Test
+ public void shouldFetchTimeRangeAndConvertValues() {
+ @SuppressWarnings("unchecked")
+ final WindowStoreIterator<ValueAndTimestamp<String>> mockIterator =
mock(WindowStoreIterator.class);
+ final long from = 100L;
+ final long to = 200L;
+
+ when(mockedWindowTimestampStore.fetch("key",
Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(true, true, false);
+ when(mockIterator.next())
+ .thenReturn(KeyValue.pair(100L, ValueAndTimestamp.make("value1",
10L)))
+ .thenReturn(KeyValue.pair(150L, ValueAndTimestamp.make("value2",
20L)));
+
+ try (final WindowStoreIterator<String> iterator =
windowStoreFacade.fetch("key", from, to)) {
+ assertThat(iterator.next(), is(KeyValue.pair(100L, "value1")));
+ assertThat(iterator.next(), is(KeyValue.pair(150L, "value2")));
+ }
+ }
+
+ @Test
+ public void shouldFetchAllTimeRangeAndConvertValues() {
+ @SuppressWarnings("unchecked")
+ final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>>
mockIterator = mock(KeyValueIterator.class);
+ final long from = 100L;
+ final long to = 200L;
+ final Windowed<String> windowedKey = new Windowed<>("key", new
TimeWindow(100L, 200L));
+
+ when(mockedWindowTimestampStore.fetchAll(Instant.ofEpochMilli(from),
Instant.ofEpochMilli(to))).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next())
+ .thenReturn(KeyValue.pair(windowedKey,
ValueAndTimestamp.make("value", 10L)));
+
+ try (final KeyValueIterator<Windowed<String>, String> iterator =
windowStoreFacade.fetchAll(from, to)) {
+ assertThat(iterator.next(), is(KeyValue.pair(windowedKey,
"value")));
+ }
+ }
+
+ @Test
+ public void shouldFetchKeyRangeTimeRangeAndConvertValues() {
+ @SuppressWarnings("unchecked")
+ final KeyValueIterator<Windowed<String>, ValueAndTimestamp<String>>
mockIterator = mock(KeyValueIterator.class);
+ final long from = 100L;
+ final long to = 200L;
+ final Windowed<String> windowedKey = new Windowed<>("key", new
TimeWindow(100L, 200L));
+
+ when(mockedWindowTimestampStore.fetch("key", "key",
Instant.ofEpochMilli(from), Instant.ofEpochMilli(to))).thenReturn(mockIterator);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next())
+ .thenReturn(KeyValue.pair(windowedKey,
ValueAndTimestamp.make("value", 10L)));
+
+ try (final KeyValueIterator<Windowed<String>, String> iterator =
windowStoreFacade.fetch("key", "key", from, to)) {
+ assertThat(iterator.next(), is(KeyValue.pair(windowedKey,
"value")));
+ }
+ }
+
}