This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c1f901cdb29 KAFKA-20132: Fix header-based key deserialization in
MeteredTimestampedKeyValueStoreWithHeaders iterator methods (#21736)
c1f901cdb29 is described below
commit c1f901cdb29986b20859ddd38c045e6e699540ff
Author: TengYao Chi <[email protected]>
AuthorDate: Sun Mar 15 16:42:03 2026 +0000
KAFKA-20132: Fix header-based key deserialization in
MeteredTimestampedKeyValueStoreWithHeaders iterator methods (#21736)
Fixes bugs where `MeteredTimestampedKeyValueStoreWithHeaders` iterator
methods fail when key deserializers require headers.
The class inherits the following iterator-returning methods from
`MeteredKeyValueStore`:
- `range(K, K)` / `reverseRange(K, K)`
- `all()` / `reverseAll()`
- `prefixScan(P, PS)`
These methods use `deserializeKey(rawKey)` which calls
`serdes.keyFrom(rawKey, internalContext.headers())`, using the current
processing context's headers instead of each record's own headers.
Additionally, the `peekNextKey()` implementations in the iterator classes
deserialize keys with empty headers rather than with each record's own
headers.
Reviewers: Matthias J. Sax <[email protected]>
---
.../state/internals/MeteredKeyValueStore.java | 6 +-
...MeteredTimestampedKeyValueStoreWithHeaders.java | 131 ++++++++++++++-
.../MeteredTimestampedWindowStoreWithHeaders.java | 7 +-
...redTimestampedKeyValueStoreWithHeadersTest.java | 163 +++++++++++++++++++
...teredTimestampedWindowStoreWithHeadersTest.java | 175 ++++-----------------
5 files changed, 325 insertions(+), 157 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index cd6fa23fa53..3be8fc8a962 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -88,9 +88,9 @@ public class MeteredKeyValueStore<K, V>
protected Sensor getSensor;
protected Sensor deleteSensor;
private Sensor putAllSensor;
- private Sensor allSensor;
- private Sensor rangeSensor;
- private Sensor prefixScanSensor;
+ protected Sensor allSensor;
+ protected Sensor rangeSensor;
+ protected Sensor prefixScanSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 8730aadcc11..86624961650 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
@@ -312,6 +313,57 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
return result;
}
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<K,
ValueTimestampHeaders<V>> prefixScan(final P prefix,
+
final PS prefixKeySerializer) {
+ Objects.requireNonNull(prefix, "prefix cannot be null");
+ Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
+ return new MeteredValueTimestampHeadersIterator(
+ wrapped().prefixScan(prefix, prefixKeySerializer),
+ prefixScanSensor
+ );
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
+ final K to) {
+ return new MeteredValueTimestampHeadersIterator(
+ wrapped().range(
+ keyBytes(from, new RecordHeaders()),
+ keyBytes(to, new RecordHeaders())
+ ),
+ rangeSensor
+ );
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K
from,
+ final K
to) {
+ return new MeteredValueTimestampHeadersIterator(
+ wrapped().reverseRange(
+ keyBytes(from, new RecordHeaders()),
+ keyBytes(to, new RecordHeaders())
+ ),
+ rangeSensor
+ );
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
+ return new MeteredValueTimestampHeadersIterator(
+ wrapped().all(),
+ allSensor
+ );
+ }
+
+ @Override
+ public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
+ return new MeteredValueTimestampHeadersIterator(
+ wrapped().reverseAll(),
+ allSensor
+ );
+ }
+
@SuppressWarnings("unchecked")
private class MeteredTimestampedKeyValueStoreWithHeadersIterator
implements KeyValueIterator<K, V>, MeteredIterator {
@@ -322,6 +374,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
private final Function<byte[], ValueTimestampHeaders<V>>
valueTimestampHeadersDeserializer;
private final boolean returnPlainValue;
+ private KeyValue<K, V> cachedNext;
private MeteredTimestampedKeyValueStoreWithHeadersIterator(final
KeyValueIterator<Bytes, byte[]> iter,
final
Sensor sensor,
@@ -343,11 +396,17 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
@Override
public boolean hasNext() {
- return iter.hasNext();
+ return cachedNext != null || iter.hasNext();
}
@Override
public KeyValue<K, V> next() {
+ if (cachedNext != null) {
+ final KeyValue<K, V> result = cachedNext;
+ cachedNext = null;
+ return result;
+ }
+
final KeyValue<Bytes, byte[]> keyValue = iter.next();
final ValueTimestampHeaders<V> valueTimestampHeaders =
valueTimestampHeadersDeserializer.apply(keyValue.value);
final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
@@ -383,12 +442,78 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
@Override
public K peekNextKey() {
- return serdes.keyFrom(iter.peekNextKey().get(), new
RecordHeaders());
+ if (cachedNext == null) {
+ cachedNext = next();
+ }
+ return cachedNext.key;
+ }
+ }
+
+ private class MeteredValueTimestampHeadersIterator implements
KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+ private final KeyValueIterator<Bytes, byte[]> iter;
+ private final Sensor sensor;
+ private final long startNs;
+ private final long startTimestampMs;
+ private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
+
+ private MeteredValueTimestampHeadersIterator(final
KeyValueIterator<Bytes, byte[]> iter,
+ final Sensor sensor) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.startNs = time.nanoseconds();
+ this.startTimestampMs = time.milliseconds();
+ numOpenIterators.increment();
+ openIterators.add(this);
+ }
+
+ @Override
+ public long startTimestamp() {
+ return startTimestampMs;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cachedNext != null || iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, ValueTimestampHeaders<V>> next() {
+ if (cachedNext != null) {
+ final KeyValue<K, ValueTimestampHeaders<V>> result =
cachedNext;
+ cachedNext = null;
+ return result;
+ }
+
+ final KeyValue<Bytes, byte[]> keyValue = iter.next();
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
serdes.valueFrom(keyValue.value, new RecordHeaders());
+ final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
+ final K key = serdes.keyFrom(keyValue.key.get(), headers);
+ return KeyValue.pair(key, valueTimestampHeaders);
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ final long duration = time.nanoseconds() - startNs;
+ sensor.record(duration);
+ iteratorDurationSensor.record(duration);
+ numOpenIterators.decrement();
+ openIterators.remove(this);
+ }
+ }
+
+ @Override
+ public K peekNextKey() {
+ if (cachedNext == null) {
+ cachedNext = next();
+ }
+ return cachedNext.key;
}
}
protected Bytes keyBytes(final K key, final Headers headers) {
return Bytes.wrap(serdes.rawKey(key, headers));
}
-
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 2131bb4ec96..13bff9c6e21 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -375,11 +375,6 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
}
final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
-
- if (next == null) {
- return null;
- }
-
final ValueTimestampHeaders<V> valueTimestampHeaders =
serdes.valueFrom(next.value, new RecordHeaders());
final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
final K key = serdes.keyFrom(next.key.key().get(), headers);
@@ -405,7 +400,7 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
if (cachedNext == null) {
cachedNext = next();
}
- return cachedNext == null ? null : cachedNext.key;
+ return cachedNext.key;
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index 427b495e157..851891ecced 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -65,6 +65,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -99,6 +100,7 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest {
);
private final Metrics metrics = new Metrics();
private Map<String, String> tags;
+ private Deserializer<String> keyDeserializer;
private void setUpWithoutContext() {
mockTime = new MockTime();
@@ -497,4 +499,165 @@ public class
MeteredTimestampedKeyValueStoreWithHeadersTest {
private KafkaMetric metric(final MetricName metricName) {
return this.metrics.metric(metricName);
}
+
+ @SuppressWarnings("unchecked")
+ private MeteredTimestampedKeyValueStoreWithHeaders<String, String>
createStoreWithMockSerdes() {
+ final Serde<String> keySerde = mock(Serde.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ keyDeserializer = mock(Deserializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+ lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final MeteredTimestampedKeyValueStoreWithHeaders<String, String>
mockStore = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ inner,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ mockStore.init(context, mockStore);
+ return mockStore;
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInRange() {
+ setUp();
+
+ final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES,
VALUE_TIMESTAMP_HEADERS_BYTES);
+ when(inner.range(any(Bytes.class), any(Bytes.class)))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ metered = createStoreWithMockSerdes();
+
+ final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator
= metered.range("a", "z");
+
+ assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey());
+ final KeyValue<String, ValueTimestampHeaders<String>> result =
iterator.next();
+
+ assertEquals(KEY, result.key);
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ // The critical verification: key deserializer must have been called
with HEADERS (not empty headers)
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInReverseRange() {
+ setUp();
+
+ final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES,
VALUE_TIMESTAMP_HEADERS_BYTES);
+ when(inner.reverseRange(any(Bytes.class), any(Bytes.class)))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ metered = createStoreWithMockSerdes();
+
+ final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator
= metered.reverseRange("a", "z");
+
+ assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey());
+ final KeyValue<String, ValueTimestampHeaders<String>> result =
iterator.next();
+
+ assertEquals(KEY, result.key);
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ // The critical verification: key deserializer must have been called
with HEADERS (not empty headers)
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
+ setUp();
+
+ final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES,
VALUE_TIMESTAMP_HEADERS_BYTES);
+ when(inner.all())
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ metered = createStoreWithMockSerdes();
+
+ final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator
= metered.all();
+
+ assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey());
+ final KeyValue<String, ValueTimestampHeaders<String>> result =
iterator.next();
+
+ assertEquals(KEY, result.key);
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ // The critical verification: key deserializer must have been called
with HEADERS (not empty headers)
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInReverseAll() {
+ setUp();
+
+ final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES,
VALUE_TIMESTAMP_HEADERS_BYTES);
+ when(inner.reverseAll())
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ metered = createStoreWithMockSerdes();
+
+ final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator
= metered.reverseAll();
+
+ assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey());
+ final KeyValue<String, ValueTimestampHeaders<String>> result =
iterator.next();
+
+ assertEquals(KEY, result.key);
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ // The critical verification: key deserializer must have been called
with HEADERS (not empty headers)
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInPrefixScan() {
+ setUp();
+
+ final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES,
VALUE_TIMESTAMP_HEADERS_BYTES);
+ when(inner.prefixScan(any(), any()))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ metered = createStoreWithMockSerdes();
+
+ final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator
= metered.prefixScan("prefix", Serdes.String().serializer());
+
+ assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey());
+ final KeyValue<String, ValueTimestampHeaders<String>> result =
iterator.next();
+
+ assertEquals(KEY, result.key);
+ assertEquals(VALUE_TIMESTAMP_HEADERS, result.value);
+ assertFalse(iterator.hasNext());
+ iterator.close();
+
+ // The critical verification: key deserializer must have been called
with HEADERS (not empty headers)
+ verify(keyDeserializer).deserialize(any(), eq(HEADERS),
eq(KEY.getBytes()));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
index 5501b8bc48e..ec379f578bd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
@@ -88,6 +88,7 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
private WindowStore<Bytes, byte[]> innerStoreMock;
private final Metrics metrics = new Metrics(new
MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
private MeteredTimestampedWindowStoreWithHeaders<String, String> store;
+ private Deserializer<String> keyDeserializer;
public void setUp() {
final StreamsMetricsImpl streamsMetrics =
@@ -290,14 +291,11 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
verify(innerStoreMock).put(KEY_BYTES, VALUE_TIMESTAMP_HEADERS_BYTES,
TIMESTAMP);
}
- @Test
@SuppressWarnings("unchecked")
- public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
- setUp();
-
+ private MeteredTimestampedWindowStoreWithHeaders<String, String>
createStoreWithMockSerdes() {
final Serde<String> keySerde = mock(Serde.class);
final Serializer<String> keySerializer = mock(Serializer.class);
- final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ keyDeserializer = mock(Deserializer.class);
final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
@@ -313,13 +311,7 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
.thenReturn(KEY);
- final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
- final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
- when(innerStoreMock.fetchAll(0, 100))
- .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
-
- store = new MeteredTimestampedWindowStoreWithHeaders<>(
+ final MeteredTimestampedWindowStoreWithHeaders<String, String>
mockStore = new MeteredTimestampedWindowStoreWithHeaders<>(
innerStoreMock,
WINDOW_SIZE_MS,
STORE_TYPE,
@@ -327,11 +319,26 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
keySerde,
valueSerde
);
- store.init(context, store);
+ mockStore.init(context, mockStore);
+ return mockStore;
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
+ setUp();
+
+ final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
+ final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
+ when(innerStoreMock.fetchAll(0, 100))
+ .thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ store = createStoreWithMockSerdes();
final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.fetchAll(0, 100);
assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey().key());
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
assertEquals(KEY, result.key.key());
@@ -348,41 +355,17 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
setUp();
- final Serde<String> keySerde = mock(Serde.class);
- final Deserializer<String> keyDeserializer = mock(Deserializer.class);
- final Serializer<String> keySerializer = mock(Serializer.class);
- final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
- final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
-
- lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
- lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
- lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
- lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
- .thenReturn(VALUE_TIMESTAMP_HEADERS);
- lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
- .thenReturn(KEY);
-
final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
when(innerStoreMock.all())
.thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
- store = new MeteredTimestampedWindowStoreWithHeaders<>(
- innerStoreMock,
- WINDOW_SIZE_MS,
- STORE_TYPE,
- new MockTime(),
- keySerde,
- valueSerde
- );
- store.init(context, store);
+ store = createStoreWithMockSerdes();
final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all();
assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey().key());
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
assertEquals(KEY, result.key.key());
@@ -398,42 +381,18 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
public void shouldUseHeadersFromValueToDeserializeKeyInFetchRange() {
setUp();
- final Serde<String> keySerde = mock(Serde.class);
- final Deserializer<String> keyDeserializer = mock(Deserializer.class);
- final Serializer<String> keySerializer = mock(Serializer.class);
- final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
- final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
-
- lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
- lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
- lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
- lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
- .thenReturn(VALUE_TIMESTAMP_HEADERS);
- lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
- .thenReturn(KEY);
-
final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
when(innerStoreMock.fetch(any(Bytes.class), any(Bytes.class), eq(0L),
eq(100L)))
.thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
- store = new MeteredTimestampedWindowStoreWithHeaders<>(
- innerStoreMock,
- WINDOW_SIZE_MS,
- STORE_TYPE,
- new MockTime(),
- keySerde,
- valueSerde
- );
- store.init(context, store);
+ store = createStoreWithMockSerdes();
final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator =
store.fetch(KEY, KEY, 0, 100);
assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey().key());
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
=
iterator.next();
@@ -450,43 +409,17 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
public void shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchAll() {
setUp();
- final Serde<String> keySerde = mock(Serde.class);
- final Serializer<String> keySerializer = mock(Serializer.class);
- final Deserializer<String> keyDeserializer = mock(Deserializer.class);
- final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
- final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
-
- lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
- lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
- lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
-
- lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
- .thenReturn(VALUE_TIMESTAMP_HEADERS);
-
- lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
- .thenReturn(KEY);
-
final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
when(innerStoreMock.backwardFetchAll(0, 100))
.thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
- store = new MeteredTimestampedWindowStoreWithHeaders<>(
- innerStoreMock,
- WINDOW_SIZE_MS,
- STORE_TYPE,
- new MockTime(),
- keySerde,
- valueSerde
- );
- store.init(context, store);
+ store = createStoreWithMockSerdes();
final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.backwardFetchAll(0, 100);
assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey().key());
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
assertEquals(KEY, result.key.key());
@@ -502,41 +435,17 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
public void shouldUseHeadersFromValueToDeserializeKeyInBackwardAll() {
setUp();
- final Serde<String> keySerde = mock(Serde.class);
- final Deserializer<String> keyDeserializer = mock(Deserializer.class);
- final Serializer<String> keySerializer = mock(Serializer.class);
- final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
- final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
-
- lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
- lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
- lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
- lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
- .thenReturn(VALUE_TIMESTAMP_HEADERS);
- lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
- .thenReturn(KEY);
-
final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
when(innerStoreMock.backwardAll())
.thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
- store = new MeteredTimestampedWindowStoreWithHeaders<>(
- innerStoreMock,
- WINDOW_SIZE_MS,
- STORE_TYPE,
- new MockTime(),
- keySerde,
- valueSerde
- );
- store.init(context, store);
+ store = createStoreWithMockSerdes();
final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.backwardAll();
assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey().key());
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
= iterator.next();
assertEquals(KEY, result.key.key());
@@ -552,42 +461,18 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
public void
shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchRange() {
setUp();
- final Serde<String> keySerde = mock(Serde.class);
- final Deserializer<String> keyDeserializer = mock(Deserializer.class);
- final Serializer<String> keySerializer = mock(Serializer.class);
- final Serde<ValueTimestampHeaders<String>> valueSerde =
mock(Serde.class);
- final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
-
- lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
- lenient().when(keySerde.serializer()).thenReturn(keySerializer);
-
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
-
- lenient().when(keySerializer.serialize(any(),
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
- lenient().when(valueDeserializer.deserialize(any(),
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
- .thenReturn(VALUE_TIMESTAMP_HEADERS);
- lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS),
eq(KEY.getBytes())))
- .thenReturn(KEY);
-
final Windowed<Bytes> windowedKey = new Windowed<>(KEY_BYTES, new
TimeWindow(0, WINDOW_SIZE_MS));
final KeyValue<Windowed<Bytes>, byte[]> testData =
KeyValue.pair(windowedKey, VALUE_TIMESTAMP_HEADERS_BYTES);
-
when(innerStoreMock.backwardFetch(any(Bytes.class), any(Bytes.class),
eq(0L), eq(100L)))
.thenReturn(new
KeyValueIteratorStub<>(List.of(testData).iterator()));
- store = new MeteredTimestampedWindowStoreWithHeaders<>(
- innerStoreMock,
- WINDOW_SIZE_MS,
- STORE_TYPE,
- new MockTime(),
- keySerde,
- valueSerde
- );
- store.init(context, store);
+ store = createStoreWithMockSerdes();
final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator =
store.backwardFetch(KEY, KEY, 0, 100);
assertTrue(iterator.hasNext());
+ assertEquals(KEY, iterator.peekNextKey().key());
final KeyValue<Windowed<String>, ValueTimestampHeaders<String>> result
=
iterator.next();