This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 128aabc1c7e KAFKA-20173: Ensure Metered window-stores pass headers
correctly (#21936)
128aabc1c7e is described below
commit 128aabc1c7ee9e5a1248013f3f594fabce5097ae
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Apr 2 15:05:04 2026 -0700
KAFKA-20173: Ensure Metered window-stores pass headers correctly (#21936)
Ensures that all metered window stores (plain, timestamped, and with-headers)
pass headers into their serializers and deserializers.
Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
<[email protected]>, Bill Bejeck <[email protected]>
---
.../state/internals/MeteredKeyValueStore.java | 19 +-
.../state/internals/MeteredSessionStore.java | 10 -
.../internals/MeteredSessionStoreWithHeaders.java | 1 -
...MeteredTimestampedKeyValueStoreWithHeaders.java | 6 +-
.../internals/MeteredTimestampedWindowStore.java | 14 +-
.../MeteredTimestampedWindowStoreWithHeaders.java | 242 +++++++++++++--------
.../state/internals/MeteredWindowStore.java | 182 ++++++++--------
.../internals/MeteredWindowStoreIterator.java | 4 -
.../internals/MeteredWindowedKeyValueIterator.java | 10 +-
.../streams/state/internals/WindowKeySchema.java | 11 -
.../kstream/internals/KTableAggregateTest.java | 7 +-
...teredTimestampedWindowStoreWithHeadersTest.java | 21 +-
12 files changed, 286 insertions(+), 241 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 2c291ac9306..452b662733d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -227,7 +227,8 @@ public class MeteredKeyValueStore<K, V>
))
);
},
- sendOldValues);
+ sendOldValues
+ );
}
return false;
}
@@ -440,14 +441,6 @@ public class MeteredKeyValueStore<K, V>
}
}
- protected byte[] serializeValue(final V value) {
- return value != null ? serdes.rawValue(value,
internalContext.headers()) : null;
- }
-
- protected V deserializeValue(final byte[] rawValue) {
- return rawValue != null ? serdes.valueFrom(rawValue,
internalContext.headers()) : null;
- }
-
protected Bytes serializeKey(final K key) {
return Bytes.wrap(serdes.rawKey(key, internalContext.headers()));
}
@@ -456,6 +449,14 @@ public class MeteredKeyValueStore<K, V>
return serdes.keyFrom(rawKey, internalContext.headers());
}
+ protected byte[] serializeValue(final V value) {
+ return value != null ? serdes.rawValue(value,
internalContext.headers()) : null;
+ }
+
+ protected V deserializeValue(final byte[] rawValue) {
+ return rawValue != null ? serdes.valueFrom(rawValue,
internalContext.headers()) : null;
+ }
+
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K,
V>> from) {
final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
for (final KeyValue<K, V> entry : from) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 3d0e85474b0..a838ceab680 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -271,7 +271,6 @@ public class MeteredSessionStore<K, V>
wrapped().fetch(keyBytes(key)),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -286,7 +285,6 @@ public class MeteredSessionStore<K, V>
wrapped().backwardFetch(keyBytes(key)),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -302,7 +300,6 @@ public class MeteredSessionStore<K, V>
wrapped().fetch(keyBytes(keyFrom), keyBytes(keyTo)),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -317,7 +314,6 @@ public class MeteredSessionStore<K, V>
wrapped().backwardFetch(keyBytes(keyFrom), keyBytes(keyTo)),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -339,7 +335,6 @@ public class MeteredSessionStore<K, V>
latestSessionStartTime),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -361,7 +356,6 @@ public class MeteredSessionStore<K, V>
),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -385,7 +379,6 @@ public class MeteredSessionStore<K, V>
latestSessionStartTime),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -400,7 +393,6 @@ public class MeteredSessionStore<K, V>
wrapped().findSessions(earliestSessionEndTime,
latestSessionEndTime),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -424,7 +416,6 @@ public class MeteredSessionStore<K, V>
),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
serdes::valueFrom,
time,
@@ -497,7 +488,6 @@ public class MeteredSessionStore<K, V>
rawResult.getResult(),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
StoreQueryUtils.deserializeValue(serdes, wrapped()),
time,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
index b74ebd9ec88..45a1b17ecc8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreWithHeaders.java
@@ -304,7 +304,6 @@ public class MeteredSessionStoreWithHeaders<K, AGG>
rawResult.getResult(),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
byteArray -> {
final AggregationWithHeaders<AGG> awh =
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index e93aad8c8f2..f3865cfc641 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -149,7 +149,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
internalContext.setRecordContext(currentContext);
}
} else {
- // it's ok to only pass header into `serializeKey`,
because for the value case passed-in headers are
+ // it's ok to only pass headers into `serializeKey`,
because for the value case passed-in headers are
// getting ignored anyway, because the value (of type
`ValueTimestampHeaders`) itself carries the headers
final Headers headers = value.headers();
wrapped().put(serializeKey(key, headers),
serializeValue(value));
@@ -194,7 +194,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
internalContext.setRecordContext(currentContext);
}
} else {
- // it's ok to only pass header into `serializeKey`,
because for the value case passed-in headers are
+ // it's ok to only pass headers into `serializeKey`,
because for the value case passed-in headers are
// getting ignored anyway, because the value (of type
`ValueTimestampHeaders`) itself carries the headers
final Headers headers = value.headers();
// `rawOldValue` returned from
`wrapped().putIfAbsent(...)` is type ValueTimestampHeader
@@ -225,7 +225,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K,
ValueTimestampHeaders<V>>> from) {
final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
for (final KeyValue<K, ValueTimestampHeaders<V>> entry : from) {
- // it's ok to only pass header into `serializeKey`, because for
the value case passed-in headers are
+ // it's ok to only pass headers into `serializeKey`, because for
the value case passed-in headers are
// getting ignored anyway, because the value (of type
`ValueTimestampHeaders`) itself carries the headers
final Headers headers = entry.value != null ?
entry.value.headers() : internalContext.headers();
byteEntries.add(KeyValue.pair(serializeKey(entry.key, headers),
serializeValue(entry.value)));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index f172222baa7..b72f638f1df 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -37,12 +37,14 @@ public class MeteredTimestampedWindowStore<K, V>
extends MeteredWindowStore<K, ValueAndTimestamp<V>>
implements TimestampedWindowStore<K, V> {
- MeteredTimestampedWindowStore(final WindowStore<Bytes, byte[]> inner,
- final long windowSizeMs,
- final String metricScope,
- final Time time,
- final Serde<K> keySerde,
- final Serde<ValueAndTimestamp<V>>
valueSerde) {
+ MeteredTimestampedWindowStore(
+ final WindowStore<Bytes, byte[]> inner,
+ final long windowSizeMs,
+ final String metricScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<ValueAndTimestamp<V>> valueSerde
+ ) {
super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index 74aafc87bd4..9d5a9ffb4d7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -18,12 +18,14 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.query.FailureReason;
@@ -43,6 +45,9 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.BiFunction;
import java.util.function.Function;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -103,14 +108,15 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
try {
internalContext.setRecordContext(temporaryContext);
- wrapped().put(keyBytes(key, deleteHeaders), null,
windowStartTimestamp);
+ wrapped().put(serializeKey(key, deleteHeaders),
null, windowStartTimestamp);
} finally {
// Restore original context
internalContext.setRecordContext(currentContext);
}
} else {
- final Headers headers = value.headers() == null ? new
RecordHeaders() : value.headers();
- wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers), windowStartTimestamp);
+ // it's ok to only pass headers into `serializeKey`,
because for the value case passed-in headers are
+ // getting ignored anyway, because the value (of type
`ValueTimestampHeaders`) itself carries the headers
+ wrapped().put(serializeKey(key, value.headers()),
serializeValue(value), windowStartTimestamp);
}
},
time,
@@ -123,10 +129,6 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
}
}
- protected Bytes keyBytes(final K key, final Headers headers) {
- return Bytes.wrap(serdes.rawKey(key, headers));
- }
-
@SuppressWarnings("unchecked")
@Override
public <R> QueryResult<R> query(final Query<R> query,
@@ -165,52 +167,36 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
final QueryResult<R> queryResult;
if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
- WindowKeyQuery.withKeyAndWindowStartRange(
- keyBytes(query.getKey(), new RecordHeaders()),
- query.getTimeFrom().get(),
- query.getTimeTo().get()
- );
+ WindowKeyQuery.withKeyAndWindowStartRange(
+ serializeKey(query.getKey(), internalContext.headers()),
+ query.getTimeFrom().get(),
+ query.getTimeTo().get()
+ );
final QueryResult<WindowStoreIterator<byte[]>> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
if (isUnderlyingStoreTimestamped()) {
// For timestamped stores, return ValueAndTimestamp<V>
- final Function<byte[], ValueAndTimestamp<V>> valueFrom =
bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
- };
-
- final MeteredWindowStoreIterator<ValueAndTimestamp<V>>
typedResult =
- new MeteredWindowStoreIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ final MeteredWindowStoreIterator<ValueAndTimestamp<V>>
typedResult = meteredIterator(
+ rawResult,
+ rawValueTimestampHeaders -> {
+ final ValueTimestampHeaders<V> vth =
deserializeValue(rawValueTimestampHeaders);
+ return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+ }
+ );
+
final
QueryResult<MeteredWindowStoreIterator<ValueAndTimestamp<V>>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
queryResult = (QueryResult<R>) typedQueryResult;
} else {
// For non-timestamped stores, return plain V
- final Function<byte[], V> valueFrom = bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null : vth.value();
- };
-
- final MeteredWindowStoreIterator<V> typedResult =
- new MeteredWindowStoreIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ final MeteredWindowStoreIterator<V> typedResult =
meteredIterator(
+ rawResult,
+ rawValueTimestampHeaders -> {
+ final ValueTimestampHeaders<V> vth =
deserializeValue(rawValueTimestampHeaders);
+ return vth == null ? null : vth.value();
+ }
+ );
+
final QueryResult<MeteredWindowStoreIterator<V>>
typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
queryResult = (QueryResult<R>) typedQueryResult;
@@ -230,6 +216,20 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
return queryResult;
}
+ private <ValueType> MeteredWindowStoreIterator<ValueType> meteredIterator(
+ final QueryResult<WindowStoreIterator<byte[]>> rawResult,
+ final Function<byte[], ValueType> valueDeserializer
+ ) {
+ return new MeteredWindowStoreIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ valueDeserializer,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ }
/**
* Handles WindowRangeQuery by creating a MeteredWindowedKeyValueIterator
with conversion from
@@ -249,49 +249,25 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>
rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
- final Function<byte[], K> keyFrom = bytes ->
serdes.keyFrom(bytes, new RecordHeaders());
-
if (isUnderlyingStoreTimestamped()) {
// For timestamped stores, return ValueAndTimestamp<V>
- final Function<byte[], ValueAndTimestamp<V>> valueFrom =
bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
- };
-
final MeteredWindowedKeyValueIterator<K,
ValueAndTimestamp<V>> typedResult =
- new MeteredWindowedKeyValueIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- keyFrom,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ meteredWindowedIterator(
+ rawResult,
+ valueTimestampHeaders -> valueTimestampHeaders ==
null ? null : ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp())
+ );
+
final QueryResult<MeteredWindowedKeyValueIterator<K,
ValueAndTimestamp<V>>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
result = (QueryResult<R>) typedQueryResult;
} else {
// For non-timestamped stores, return plain V
- final Function<byte[], V> valueFrom = bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null : vth.value();
- };
-
final MeteredWindowedKeyValueIterator<K, V> typedResult =
- new MeteredWindowedKeyValueIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- keyFrom,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ meteredWindowedIterator(
+ rawResult,
+ valueTimestampHeaders -> valueTimestampHeaders ==
null ? null : valueTimestampHeaders.value()
+ );
+
final QueryResult<MeteredWindowedKeyValueIterator<K, V>>
typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
result = (QueryResult<R>) typedQueryResult;
@@ -312,28 +288,32 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
}
@Override
- public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(final
K keyFrom,
- final
K keyTo,
- final
long timeFrom,
- final
long timeTo) {
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>> fetch(
+ final K keyFrom,
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo
+ ) {
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
wrapped().fetch(
- keyBytes(keyFrom, new RecordHeaders()),
- keyBytes(keyTo, new RecordHeaders()),
+ serializeKey(keyFrom, internalContext.headers()),
+ serializeKey(keyTo, internalContext.headers()),
timeFrom,
timeTo)
);
}
@Override
- public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
backwardFetch(final K keyFrom,
-
final K keyTo,
-
final long timeFrom,
-
final long timeTo) {
+ public KeyValueIterator<Windowed<K>, ValueTimestampHeaders<V>>
backwardFetch(
+ final K keyFrom,
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo
+ ) {
return new MeteredTimestampedWindowStoreWithHeadersKeyValueIterator(
wrapped().backwardFetch(
- keyBytes(keyFrom, new RecordHeaders()),
- keyBytes(keyTo, new RecordHeaders()),
+ serializeKey(keyFrom, internalContext.headers()),
+ serializeKey(keyTo, internalContext.headers()),
timeFrom,
timeTo)
);
@@ -403,9 +383,9 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
}
final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
- final ValueTimestampHeaders<V> valueTimestampHeaders =
serdes.valueFrom(next.value, new RecordHeaders());
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializeValue(next.value);
final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
- final K key = serdes.keyFrom(next.key.key().get(), headers);
+ final K key = deserializeKey(next.key.key().get(), headers);
final Windowed<K> windowedKey = new Windowed<>(key,
next.key.window());
return KeyValue.pair(windowedKey, valueTimestampHeaders);
}
@@ -432,8 +412,69 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
}
}
+ private <ValueType> MeteredWindowedKeyValueIterator<K, ValueType>
meteredWindowedIterator(
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult,
+ final Function<ValueTimestampHeaders<V>, ValueType> valueConverter
+ ) {
+ return new MeteredWindowedKeyValueWithHeadersIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ this::deserializeKey,
+ valueConverter,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ }
+
+ private final class MeteredWindowedKeyValueWithHeadersIterator<ValueType>
extends MeteredWindowedKeyValueIterator<K, ValueType> {
+ private final BiFunction<byte[], Headers, K> deserializeKey;
+ private final Function<ValueTimestampHeaders<V>, ValueType>
valueConverter;
+
+ MeteredWindowedKeyValueWithHeadersIterator(
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iter,
+ final Sensor operationSensor,
+ final Sensor iteratorSensor,
+ final BiFunction<byte[], Headers, K> deserializeKey,
+ final Function<ValueTimestampHeaders<V>, ValueType> valueConverter,
+ final Time time,
+ final LongAdder numOpenIterators,
+ final Set<MeteredIterator> openIterators
+ ) {
+ super(
+ iter,
+ operationSensor,
+ iteratorSensor,
+ null, // should not be used in super-class
+ null, // should not be used in super-class
+ time,
+ numOpenIterators,
+ openIterators
+ );
+
+ this.deserializeKey = deserializeKey;
+ this.valueConverter = valueConverter;
+ }
+
+ @Override
+ public KeyValue<Windowed<K>, ValueType> next() {
+ final KeyValue<Windowed<Bytes>, byte[]> next = iter.next();
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializeValue(next.value);
+ return KeyValue.pair(
+ windowedKey(next.key, valueTimestampHeaders.headers()),
+ valueConverter.apply(valueTimestampHeaders)
+ );
+ }
+
+ private Windowed<K> windowedKey(final Windowed<Bytes> bytesKey, final
Headers headers) {
+ final K key = deserializeKey.apply(bytesKey.key().get(), headers);
+ return new Windowed<>(key, bytesKey.window());
+ }
+ }
+
private boolean isUnderlyingStoreTimestamped() {
- Object store = wrapped();
+ StateStore store = wrapped();
do {
// Check adapters first before attempting to unwrap
if (store instanceof TimestampedToHeadersWindowStoreAdapter) {
@@ -452,6 +493,21 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
break;
}
} while (true);
+
return false;
}
+
+ protected Bytes serializeKey(final K key, final Headers headers) {
+ return Bytes.wrap(serdes.rawKey(key, headers));
+ }
+
+ @Override
+ protected K deserializeKey(final byte[] rawKey) {
+ throw new
UnsupportedOperationException("MeteredTimestampedWindowStoreWithHeaders
required to pass in Headers when deserializing a key.");
+ }
+
+ protected K deserializeKey(final byte[] rawKey, final Headers headers) {
+ return serdes.keyFrom(rawKey, headers);
+ }
+
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 25d98162bd8..cfc3f90ac24 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -17,7 +17,6 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
@@ -61,8 +60,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
public class MeteredWindowStore<K, V>
extends WrappedStateStore<WindowStore<Bytes, byte[]>, Windowed<K>, V>
implements WindowStore<K, V>, MeteredStateStore {
@@ -151,7 +148,7 @@ public class MeteredWindowStore<K, V>
e2eLatencySensor =
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
iteratorDurationSensor =
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope,
name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
- (config, now) -> numOpenIterators.sum());
+ (config, now) -> numOpenIterators.sum());
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> {
try {
@@ -164,10 +161,10 @@ public class MeteredWindowStore<K, V>
);
if (!persistent()) {
StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope,
name(), streamsMetrics,
- (config, now) -> {
- final InMemoryWindowStore inMemoryStore =
findInMemoryWindowStore(wrapped());
- return inMemoryStore != null ?
inMemoryStore.numEntries() : -1L;
- }
+ (config, now) -> {
+ final InMemoryWindowStore inMemoryStore =
findInMemoryWindowStore(wrapped());
+ return inMemoryStore != null ? inMemoryStore.numEntries()
: -1L;
+ }
);
}
}
@@ -191,37 +188,51 @@ public class MeteredWindowStore<K, V>
final String storeName = name();
final String changelogTopic =
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
serdes = StoreSerdeInitializer.prepareStoreSerde(
- context, storeName, changelogTopic, keySerde, valueSerde,
this::prepareValueSerde);
+ context,
+ storeName,
+ changelogTopic,
+ keySerde,
+ valueSerde,
+ this::prepareValueSerde
+ );
}
@SuppressWarnings("unchecked")
@Override
- public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V>
listener,
- final boolean sendOldValues) {
+ public boolean setFlushListener(final CacheFlushListener<Windowed<K>, V>
listener, final boolean sendOldValues) {
final WindowStore<Bytes, byte[]> wrapped = wrapped();
if (wrapped instanceof CachedStateStore) {
return ((CachedStateStore<byte[], byte[]>)
wrapped).setFlushListener(
- record -> listener.apply(
- record.withKey(WindowKeySchema.fromStoreKey(record.key(),
windowSizeMs, serdes.keyDeserializer(), serdes.topic()))
- .withValue(new Change<>(
- record.value().newValue != null ?
serdes.valueFrom(record.value().newValue, record.headers()) : null,
- record.value().oldValue != null ?
serdes.valueFrom(record.value().oldValue, record.headers()) : null,
- record.value().isLatest
- ))
- ),
- sendOldValues);
+ record -> {
+ final Change<byte[]> change = record.value();
+ listener.apply(
+ record.withKey(
+ WindowKeySchema.fromStoreKey(
+ record.key(),
+ windowSizeMs,
+ serdes.keyDeserializer(),
+ internalContext.headers(),
+ serdes.topic()
+ ))
+ .withValue(new Change<>(
+ change.newValue != null ?
serdes.valueFrom(change.newValue, record.headers()) : null,
+ change.oldValue != null ?
serdes.valueFrom(change.oldValue, record.headers()) : null,
+ change.isLatest
+ ))
+ );
+ },
+ sendOldValues
+ );
}
return false;
}
@Override
- public void put(final K key,
- final V value,
- final long windowStartTimestamp) {
+ public void put(final K key, final V value, final long
windowStartTimestamp) {
Objects.requireNonNull(key, "key cannot be null");
try {
maybeMeasureLatency(
- () -> wrapped().put(keyBytes(key), serdes.rawValue(value, new
RecordHeaders()), windowStartTimestamp),
+ () -> wrapped().put(serializeKey(key), serializeValue(value),
windowStartTimestamp),
time,
putSensor
);
@@ -233,16 +244,15 @@ public class MeteredWindowStore<K, V>
}
@Override
- public V fetch(final K key,
- final long timestamp) {
+ public V fetch(final K key, final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
return maybeMeasureLatency(
() -> {
- final byte[] result = wrapped().fetch(keyBytes(key),
timestamp);
+ final byte[] result = wrapped().fetch(serializeKey(key),
timestamp);
if (result == null) {
return null;
}
- return serdes.valueFrom(result, new RecordHeaders());
+ return deserializeValue(result);
},
time,
fetchSensor
@@ -250,16 +260,13 @@ public class MeteredWindowStore<K, V>
}
@Override
- public WindowStoreIterator<V> fetch(final K key,
- final long timeFrom,
- final long timeTo) {
+ public WindowStoreIterator<V> fetch(final K key, final long timeFrom,
final long timeTo) {
Objects.requireNonNull(key, "key cannot be null");
return new MeteredWindowStoreIterator<>(
- wrapped().fetch(keyBytes(key), timeFrom, timeTo),
+ wrapped().fetch(serializeKey(key), timeFrom, timeTo),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::valueFrom,
+ this::deserializeValue,
time,
numOpenIterators,
openIterators
@@ -267,16 +274,13 @@ public class MeteredWindowStore<K, V>
}
@Override
- public WindowStoreIterator<V> backwardFetch(final K key,
- final long timeFrom,
- final long timeTo) {
+ public WindowStoreIterator<V> backwardFetch(final K key, final long
timeFrom, final long timeTo) {
Objects.requireNonNull(key, "key cannot be null");
return new MeteredWindowStoreIterator<>(
- wrapped().backwardFetch(keyBytes(key), timeFrom, timeTo),
+ wrapped().backwardFetch(serializeKey(key), timeFrom, timeTo),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::valueFrom,
+ this::deserializeValue,
time,
numOpenIterators,
openIterators
@@ -284,75 +288,77 @@ public class MeteredWindowStore<K, V>
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
- final K keyTo,
- final long timeFrom,
- final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetch(
+ final K keyFrom,
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo
+ ) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().fetch(
- keyBytes(keyFrom),
- keyBytes(keyTo),
+ serializeKey(keyFrom),
+ serializeKey(keyTo),
timeFrom,
timeTo),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::keyFrom,
- serdes::valueFrom,
+ this::deserializeKey,
+ this::deserializeValue,
time,
numOpenIterators,
- openIterators);
+ openIterators
+ );
}
@Override
- public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
- final K keyTo,
- final long timeFrom,
- final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(
+ final K keyFrom,
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo
+ ) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFetch(
- keyBytes(keyFrom),
- keyBytes(keyTo),
+ serializeKey(keyFrom),
+ serializeKey(keyTo),
timeFrom,
timeTo),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::keyFrom,
- serdes::valueFrom,
+ this::deserializeKey,
+ this::deserializeValue,
time,
numOpenIterators,
- openIterators);
+ openIterators
+ );
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
- final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().fetchAll(timeFrom, timeTo),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::keyFrom,
- serdes::valueFrom,
+ this::deserializeKey,
+ this::deserializeValue,
time,
numOpenIterators,
- openIterators);
+ openIterators
+ );
}
@Override
- public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom,
- final long timeTo) {
+ public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom, final long timeTo) {
return new MeteredWindowedKeyValueIterator<>(
wrapped().backwardFetchAll(timeFrom, timeTo),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::keyFrom,
- serdes::valueFrom,
+ this::deserializeKey,
+ this::deserializeValue,
time,
numOpenIterators,
- openIterators);
+ openIterators
+ );
}
@Override
@@ -361,9 +367,8 @@ public class MeteredWindowStore<K, V>
wrapped().all(),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::keyFrom,
- serdes::valueFrom,
+ this::deserializeKey,
+ this::deserializeValue,
time,
numOpenIterators,
openIterators
@@ -376,9 +381,8 @@ public class MeteredWindowStore<K, V>
wrapped().backwardAll(),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::keyFrom,
- serdes::valueFrom,
+ this::deserializeKey,
+ this::deserializeValue,
time,
numOpenIterators,
openIterators
@@ -455,8 +459,7 @@ public class MeteredWindowStore<K, V>
rawResult.getResult(),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
- serdes::keyFrom,
+ this::deserializeKey,
StoreQueryUtils.deserializeValue(serdes, wrapped()),
time,
numOpenIterators,
@@ -494,7 +497,7 @@ public class MeteredWindowStore<K, V>
if (typedQuery.getTimeFrom().isPresent() &&
typedQuery.getTimeTo().isPresent()) {
final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
WindowKeyQuery.withKeyAndWindowStartRange(
- keyBytes(typedQuery.getKey()),
+ serializeKey(typedQuery.getKey()),
typedQuery.getTimeFrom().get(),
typedQuery.getTimeTo().get()
);
@@ -508,7 +511,6 @@ public class MeteredWindowStore<K, V>
rawResult.getResult(),
fetchSensor,
iteratorDurationSensor,
- streamsMetrics,
StoreQueryUtils.deserializeValue(serdes, wrapped()),
time,
numOpenIterators,
@@ -534,12 +536,20 @@ public class MeteredWindowStore<K, V>
return queryResult;
}
- private Bytes keyBytes(final K key) {
- return Bytes.wrap(serdes.rawKey(key, new RecordHeaders()));
+ private Bytes serializeKey(final K key) {
+ return Bytes.wrap(serdes.rawKey(key, internalContext.headers()));
+ }
+
+ protected K deserializeKey(final byte[] rawKey) {
+ return serdes.keyFrom(rawKey, internalContext.headers());
+ }
+
+ protected byte[] serializeValue(final V value) {
+ return value != null ? serdes.rawValue(value, internalContext.headers()) : null;
}
- protected V outerValue(final byte[] value) {
- return value != null ? serdes.valueFrom(value, new RecordHeaders()) : null;
+ protected V deserializeValue(final byte[] rawValue) {
+ return rawValue != null ? serdes.valueFrom(rawValue, internalContext.headers()) : null;
}
protected void maybeRecordE2ELatency() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
index 119cb739558..b0c752f7d50 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreIterator.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.Set;
@@ -31,7 +30,6 @@ class MeteredWindowStoreIterator<V> implements
WindowStoreIterator<V>, MeteredIt
private final WindowStoreIterator<byte[]> iter;
private final Sensor operationSensor;
private final Sensor iteratorSensor;
- private final StreamsMetrics metrics;
private final Function<byte[], V> valueFrom;
private final long startNs;
private final long startTimestampMs;
@@ -42,7 +40,6 @@ class MeteredWindowStoreIterator<V> implements
WindowStoreIterator<V>, MeteredIt
MeteredWindowStoreIterator(final WindowStoreIterator<byte[]> iter,
final Sensor operationSensor,
final Sensor iteratorSensor,
- final StreamsMetrics metrics,
final Function<byte[], V> valueFrom,
final Time time,
final LongAdder numOpenIterators,
@@ -50,7 +47,6 @@ class MeteredWindowStoreIterator<V> implements
WindowStoreIterator<V>, MeteredIt
this.iter = iter;
this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor;
- this.metrics = metrics;
this.valueFrom = valueFrom;
this.startNs = time.nanoseconds();
this.startTimestampMs = time.milliseconds();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
index 2f4bf65a56e..a441d825408 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowedKeyValueIterator.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -30,12 +29,11 @@ import java.util.function.Function;
class MeteredWindowedKeyValueIterator<K, V> implements
KeyValueIterator<Windowed<K>, V>, MeteredIterator {
- private final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
+ final KeyValueIterator<Windowed<Bytes>, byte[]> iter;
private final Sensor operationSensor;
private final Sensor iteratorSensor;
- private final StreamsMetrics metrics;
- private final Function<byte[], K> deserializeKey;
- private final Function<byte[], V> deserializeValue;
+ final Function<byte[], K> deserializeKey;
+ final Function<byte[], V> deserializeValue;
private final long startNs;
private final long startTimestampMs;
private final Time time;
@@ -45,7 +43,6 @@ class MeteredWindowedKeyValueIterator<K, V> implements
KeyValueIterator<Windowed
MeteredWindowedKeyValueIterator(final KeyValueIterator<Windowed<Bytes>,
byte[]> iter,
final Sensor operationSensor,
final Sensor iteratorSensor,
- final StreamsMetrics metrics,
final Function<byte[], K> deserializeKey,
final Function<byte[], V> deserializeValue,
final Time time,
@@ -54,7 +51,6 @@ class MeteredWindowedKeyValueIterator<K, V> implements
KeyValueIterator<Windowed
this.iter = iter;
this.operationSensor = operationSensor;
this.iteratorSensor = iteratorSensor;
- this.metrics = metrics;
this.deserializeKey = deserializeKey;
this.deserializeValue = deserializeValue;
this.startNs = time.nanoseconds();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index d294f04b663..e837b060c5b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -220,17 +220,6 @@ public class WindowKeySchema implements
RocksDBSegmentedBytesStore.KeySchema {
return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length -
SEQNUM_SIZE);
}
- // TODO: Remove this method when MeteredWindowStore will use headers version
- @Deprecated
- public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
- final long windowSize,
- final Deserializer<K> deserializer,
- final String topic) {
- final K key = deserializer.deserialize(topic, extractStoreKeyBytes(binaryKey));
- final Window window = extractStoreWindow(binaryKey, windowSize);
- return new Windowed<>(key, window);
- }
-
public static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
final long windowSize,
final Deserializer<K>
deserializer,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index f3f29572030..250e10169b6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
@@ -106,7 +109,9 @@ public class KTableAggregateTest {
final TestInputTopic<String, String> inputTopic =
driver.createInputTopic(topic1, new StringSerializer(), new
StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
- inputTopic.pipeInput("A", "1", 10L);
+ final Headers headers = new RecordHeaders();
+ headers.add(new RecordHeader("header-key", "header-value".getBytes(StandardCharsets.UTF_8)));
+ inputTopic.pipeInput(new TestRecord<>("A", "1", headers, 10L));
inputTopic.pipeInput("B", "2", 15L);
inputTopic.pipeInput("A", "3", 20L);
inputTopic.pipeInput("B", "4", 18L);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
index ec379f578bd..726054e787f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeadersTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -34,6 +35,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -252,7 +254,7 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
}
private static byte[] serializeValueTimestampHeaders() {
- final ValueTimestampHeadersSerializer<String> serializer = new ValueTimestampHeadersSerializer<>(Serdes.String().serializer());
+ final ValueTimestampHeadersSerializer<String> serializer = new ValueTimestampHeadersSerializer<>(new StringSerializer());
return serializer.serialize("topic", VALUE_TIMESTAMP_HEADERS);
}
@@ -264,15 +266,20 @@ public class MeteredTimestampedWindowStoreWithHeadersTest
{
final Deserializer<ValueTimestampHeaders<String>> valueDeserializer =
mock(Deserializer.class);
final Serializer<ValueTimestampHeaders<String>> valueSerializer =
mock(Serializer.class);
when(keySerde.serializer()).thenReturn(keySerializer);
- // For fetch: key serialization uses empty headers (no value context available)
- when(keySerializer.serialize(topic, new RecordHeaders(), KEY)).thenReturn(KEY.getBytes());
// For put: key serialization uses value's headers
when(keySerializer.serialize(topic, HEADERS,
KEY)).thenReturn(KEY.getBytes());
when(valueSerde.deserializer()).thenReturn(valueDeserializer);
- when(valueDeserializer.deserialize(topic, new RecordHeaders(), VALUE_TIMESTAMP_HEADERS_BYTES)).thenReturn(VALUE_TIMESTAMP_HEADERS);
+ when(valueDeserializer.deserialize(topic, HEADERS, VALUE_TIMESTAMP_HEADERS_BYTES)).thenReturn(VALUE_TIMESTAMP_HEADERS);
when(valueSerde.serializer()).thenReturn(valueSerializer);
// For put: value serialization uses value's headers
when(valueSerializer.serialize(topic, HEADERS,
VALUE_TIMESTAMP_HEADERS)).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
+ context.setRecordContext(new ProcessorRecordContext(
+ 0L,
+ 0L,
+ 0,
+ topic,
+ HEADERS
+ ));
when(innerStoreMock.fetch(KEY_BYTES,
TIMESTAMP)).thenReturn(VALUE_TIMESTAMP_HEADERS_BYTES);
store = new MeteredTimestampedWindowStoreWithHeaders<>(
innerStoreMock,
@@ -324,7 +331,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
}
@Test
- @SuppressWarnings("unchecked")
public void shouldUseHeadersFromValueToDeserializeKeyInFetchAll() {
setUp();
@@ -351,7 +357,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
}
@Test
- @SuppressWarnings("unchecked")
public void shouldUseHeadersFromValueToDeserializeKeyInAll() {
setUp();
@@ -377,7 +382,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
}
@Test
- @SuppressWarnings("unchecked")
public void shouldUseHeadersFromValueToDeserializeKeyInFetchRange() {
setUp();
@@ -405,7 +409,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
}
@Test
- @SuppressWarnings("unchecked")
public void shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchAll() {
setUp();
@@ -431,7 +434,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
}
@Test
- @SuppressWarnings("unchecked")
public void shouldUseHeadersFromValueToDeserializeKeyInBackwardAll() {
setUp();
@@ -457,7 +459,6 @@ public class MeteredTimestampedWindowStoreWithHeadersTest {
}
@Test
- @SuppressWarnings("unchecked")
public void
shouldUseHeadersFromValueToDeserializeKeyInBackwardFetchRange() {
setUp();