This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e952ccd52ba KAFKA-20173: Ensure Metered kv-stores pass headers
correctly (#21768)
e952ccd52ba is described below
commit e952ccd52ba0579bdcfd7f177f24bc38321df1e3
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Mar 17 15:38:36 2026 -0700
KAFKA-20173: Ensure Metered kv-stores pass headers correctly (#21768)
Ensures that all Metered KV-stores (plain, timestamped, with-headers, and
versioned) pass headers into the serializers and deserializers.
Reviewers: Alieh Saeedi <[email protected]>, TengYao Chi
<[email protected]>, Uladzislau Blok <[email protected]>, Bill
Bejeck <[email protected]>
---
.../apache/kafka/streams/state/StateSerdes.java | 4 +-
.../state/internals/MeteredKeyValueStore.java | 91 +++---------
...MeteredTimestampedKeyValueStoreWithHeaders.java | 154 +++++++++++++--------
.../internals/MeteredVersionedKeyValueStore.java | 12 +-
.../streams/state/internals/StoreQueryUtils.java | 35 +++--
...redTimestampedKeyValueStoreWithHeadersTest.java | 1 +
6 files changed, 152 insertions(+), 145 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
index 7128dfc85e2..8632dd37bfe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java
@@ -207,6 +207,7 @@ public final class StateSerdes<K, V> {
* @param key the key to be serialized
* @return the serialized key
*/
+ @SuppressWarnings("resource")
public byte[] rawKey(final K key, final Headers headers) {
try {
return keySerde.serializer().serialize(topic, headers, key);
@@ -230,7 +231,6 @@ public final class StateSerdes<K, V> {
* @deprecated Since 4.3. Use {@link #rawValue(Object, Headers)} instead.
*/
@Deprecated
- @SuppressWarnings("rawtypes")
public byte[] rawValue(final V value) {
return rawValue(value, new RecordHeaders());
}
@@ -241,7 +241,7 @@ public final class StateSerdes<K, V> {
* @param value the value to be serialized
* @return the serialized value
*/
- @SuppressWarnings("rawtypes")
+ @SuppressWarnings({"rawtypes", "resource"})
public byte[] rawValue(final V value, final Headers headers) {
try {
return valueSerde.serializer().serialize(topic, headers, value);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 3be8fc8a962..d94b8fd5253 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -87,7 +87,7 @@ public class MeteredKeyValueStore<K, V>
protected Sensor putIfAbsentSensor;
protected Sensor getSensor;
protected Sensor deleteSensor;
- private Sensor putAllSensor;
+ protected Sensor putAllSensor;
protected Sensor allSensor;
protected Sensor rangeSensor;
protected Sensor prefixScanSensor;
@@ -288,10 +288,9 @@ public class MeteredKeyValueStore<K, V>
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
- final KeyValueIterator<K, V> resultIterator = new
MeteredKeyValueTimestampedIterator(
+ final KeyValueIterator<K, V> resultIterator = new
MeteredKeyValueStoreIterator(
iterator,
- getSensor,
- StoreQueryUtils.deserializeValue(serdes, wrapped())
+ getSensor
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
@@ -387,13 +386,13 @@ public class MeteredKeyValueStore<K, V>
public <PS extends Serializer<P>, P> KeyValueIterator<K, V>
prefixScan(final P prefix, final PS prefixKeySerializer) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
- return new MeteredKeyValueIterator(wrapped().prefixScan(prefix,
prefixKeySerializer), prefixScanSensor);
+ return new MeteredKeyValueStoreIterator(wrapped().prefixScan(prefix,
prefixKeySerializer), prefixScanSensor);
}
@Override
public KeyValueIterator<K, V> range(final K from,
final K to) {
- return new MeteredKeyValueIterator(
+ return new MeteredKeyValueStoreIterator(
wrapped().range(serializeKey(from), serializeKey(to)),
rangeSensor
);
@@ -402,7 +401,7 @@ public class MeteredKeyValueStore<K, V>
@Override
public KeyValueIterator<K, V> reverseRange(final K from,
final K to) {
- return new MeteredKeyValueIterator(
+ return new MeteredKeyValueStoreIterator(
wrapped().reverseRange(serializeKey(from), serializeKey(to)),
rangeSensor
);
@@ -410,12 +409,12 @@ public class MeteredKeyValueStore<K, V>
@Override
public KeyValueIterator<K, V> all() {
- return new MeteredKeyValueIterator(wrapped().all(), allSensor);
+ return new MeteredKeyValueStoreIterator(wrapped().all(), allSensor);
}
@Override
public KeyValueIterator<K, V> reverseAll() {
- return new MeteredKeyValueIterator(wrapped().reverseAll(), allSensor);
+ return new MeteredKeyValueStoreIterator(wrapped().reverseAll(),
allSensor);
}
@Override
@@ -450,7 +449,7 @@ public class MeteredKeyValueStore<K, V>
}
protected K deserializeKey(final byte[] rawKey) {
- return rawKey != null ? serdes.keyFrom(rawKey,
internalContext.headers()) : null;
+ return serdes.keyFrom(rawKey, internalContext.headers());
}
private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K,
V>> from) {
@@ -469,15 +468,15 @@ public class MeteredKeyValueStore<K, V>
}
}
- private class MeteredKeyValueIterator implements KeyValueIterator<K, V>,
MeteredIterator {
+ private class MeteredKeyValueStoreIterator implements KeyValueIterator<K,
V>, MeteredIterator {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestamp;
- private MeteredKeyValueIterator(final KeyValueIterator<Bytes, byte[]>
iter,
- final Sensor sensor) {
+ private MeteredKeyValueStoreIterator(final KeyValueIterator<Bytes,
byte[]> iter,
+ final Sensor sensor) {
this.iter = iter;
this.sensor = sensor;
this.startTimestamp = time.milliseconds();
@@ -500,6 +499,9 @@ public class MeteredKeyValueStore<K, V>
public KeyValue<K, V> next() {
final KeyValue<Bytes, byte[]> keyValue = iter.next();
return KeyValue.pair(
+ // note: `MeteredKeyValueStoreIterator` is also used on the IQ
code path,
+ // and that's fine: `internalContext.headers()` will return `new
RecordHeaders()`,
+ // which makes sense, as for IQ there is no "record context" at
hand.
deserializeKey(keyValue.key.get()),
deserializeValue(keyValue.value));
}
@@ -519,66 +521,9 @@ public class MeteredKeyValueStore<K, V>
@Override
public K peekNextKey() {
- return deserializeKey(iter.peekNextKey().get());
- }
- }
-
- private class MeteredKeyValueTimestampedIterator implements
KeyValueIterator<K, V>, MeteredIterator {
-
- private final KeyValueIterator<Bytes, byte[]> iter;
- private final Sensor sensor;
- private final long startNs;
- private final long startTimestamp;
- private final Function<byte[], V> valueDeserializer;
-
- private MeteredKeyValueTimestampedIterator(
- final KeyValueIterator<Bytes, byte[]> iter,
- final Sensor sensor,
- final Function<byte[], V> valueDeserializer
- ) {
- this.iter = iter;
- this.sensor = sensor;
- this.valueDeserializer = valueDeserializer;
- this.startTimestamp = time.milliseconds();
- this.startNs = time.nanoseconds();
- numOpenIterators.increment();
- openIterators.add(this);
- }
-
- @Override
- public long startTimestamp() {
- return startTimestamp;
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public KeyValue<K, V> next() {
- final KeyValue<Bytes, byte[]> keyValue = iter.next();
- return KeyValue.pair(
- deserializeKey(keyValue.key.get()),
- valueDeserializer.apply(keyValue.value)
- );
- }
-
- @Override
- public void close() {
- try {
- iter.close();
- } finally {
- final long duration = time.nanoseconds() - startNs;
- sensor.record(duration);
- iteratorDurationSensor.record(duration);
- numOpenIterators.decrement();
- openIterators.remove(this);
- }
- }
-
- @Override
- public K peekNextKey() {
+ // note: `MeteredKeyValueStoreIterator` is also used on the IQ code
path,
+ // and that's fine: `internalContext.headers()` will return `new
RecordHeaders()`,
+ // which makes sense, as for IQ there is no "record context" at hand.
return deserializeKey(iter.peekNextKey().get());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index 2346c7646af..f77471ad905 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -43,6 +43,8 @@ import
org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
@@ -57,7 +59,7 @@ import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDese
* A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used
for recording operation metrics, and hence
* its inner KeyValueStore implementation does not need to provide its own
metrics collecting functionality.
*
- * The inner {@link KeyValueStore} of this class is of type <Bytes,
byte[]>,
+ * <p> The inner {@link KeyValueStore} of this class is of type <Bytes,
byte[]>,
* hence we use {@link Serde}s to convert from <K,
ValueTimestampHeaders<V>> to <Bytes, byte[]>.
*
* @param <K> key type
@@ -107,6 +109,17 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
}
}
+ @Override
+ public ValueTimestampHeaders<V> get(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ try {
+ return maybeMeasureLatency(() ->
deserializeValue(wrapped().get(serializeKey(key, internalContext.headers()))),
time, getSensor);
+ } catch (final ProcessorStateException e) {
+ final String message = String.format(e.getMessage(), key);
+ throw new ProcessorStateException(message, e);
+ }
+ }
+
@Override
public void put(final K key,
final ValueTimestampHeaders<V> value) {
@@ -118,7 +131,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
final ProcessorRecordContext currentContext =
internalContext.recordContext();
// Create new headers object to isolate tombstone
operation from input record
- final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+ final Headers tombstoneHeaders = new
RecordHeaders(currentContext.headers());
// Create temporary context with new headers
final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
@@ -126,19 +139,21 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
currentContext.offset(),
currentContext.partition(),
currentContext.topic(),
- deleteHeaders
+ tombstoneHeaders
);
try {
internalContext.setRecordContext(temporaryContext);
- wrapped().put(keyBytes(key, deleteHeaders),
serdes.rawValue(null, deleteHeaders));
+ wrapped().put(serializeKey(key, tombstoneHeaders),
serializeValue(null));
} finally {
// Restore original context
internalContext.setRecordContext(currentContext);
}
} else {
+ // it's ok to only pass header into `serializeKey`,
because for the value case passed-in headers are
+ // getting ignored anyway, because the value (of type
`ValueTimestampHeaders`) itself carries the headers
final Headers headers = value.headers();
- wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers));
+ wrapped().put(serializeKey(key, headers),
serializeValue(value));
}
},
time,
@@ -161,7 +176,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
final ProcessorRecordContext currentContext =
internalContext.recordContext();
// Create new headers object to isolate tombstone
operation from input record
- final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+ final Headers tombstoneHeaders = new
RecordHeaders(currentContext.headers());
// Create temporary context with new headers
final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
@@ -169,19 +184,23 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
currentContext.offset(),
currentContext.partition(),
currentContext.topic(),
- deleteHeaders
+ tombstoneHeaders
);
try {
internalContext.setRecordContext(temporaryContext);
- return
deserializeValue(wrapped().putIfAbsent(keyBytes(key, deleteHeaders),
serdes.rawValue(null, deleteHeaders)));
+ return
deserializeValue(wrapped().putIfAbsent(serializeKey(key, tombstoneHeaders),
serializeValue(null)));
} finally {
// Restore original context
internalContext.setRecordContext(currentContext);
}
} else {
+ // it's ok to only pass header into `serializeKey`,
because for the value case passed-in headers are
+ // getting ignored anyway, because the value (of type
`ValueTimestampHeaders`) itself carries the headers
final Headers headers = value.headers();
- return
deserializeValue(wrapped().putIfAbsent(keyBytes(key, headers),
serdes.rawValue(value, headers)));
+ // `rawOldValue` returned from
`wrapped().putIfAbsent(...)` is type ValueTimestampHeader
+ // -> no need to pass in Headers into `deserializeValue()`
+ return
deserializeValue(wrapped().putIfAbsent(serializeKey(key, headers),
serializeValue(value)));
}
},
time,
@@ -192,7 +211,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
}
@Override
- public void putAll(final java.util.List<KeyValue<K,
ValueTimestampHeaders<V>>> entries) {
+ public void putAll(final List<KeyValue<K, ValueTimestampHeaders<V>>>
entries) {
entries.forEach(entry -> Objects.requireNonNull(entry.key, "key cannot
be null"));
final boolean hasNullValue = entries.stream().anyMatch(entry ->
entry.value == null);
@@ -200,9 +219,19 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
if (hasNullValue) {
entries.forEach(entry -> put(entry.key, entry.value));
} else {
- // If no null values, use parent's batch optimization
- super.putAll(entries);
+ maybeMeasureLatency(() -> wrapped().putAll(innerEntries(entries)),
time, putAllSensor);
+ }
+ }
+
+ private List<KeyValue<Bytes, byte[]>> innerEntries(final List<KeyValue<K,
ValueTimestampHeaders<V>>> from) {
+ final List<KeyValue<Bytes, byte[]>> byteEntries = new ArrayList<>();
+ for (final KeyValue<K, ValueTimestampHeaders<V>> entry : from) {
+ // it's ok to only pass header into `serializeKey`, because for
the value case passed-in headers are
+ // getting ignored anyway, because the value (of type
`ValueTimestampHeaders`) itself carries the headers
+ final Headers headers = entry.value != null ?
entry.value.headers() : internalContext.headers();
+ byteEntries.add(KeyValue.pair(serializeKey(entry.key, headers),
serializeValue(entry.value)));
}
+ return byteEntries;
}
@Override
@@ -214,7 +243,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
final ProcessorRecordContext currentContext =
internalContext.recordContext();
// Create new headers object to isolate delete operation
from input record
- final Headers deleteHeaders = new
RecordHeaders(currentContext.headers());
+ final Headers tombstoneHeaders = new
RecordHeaders(currentContext.headers());
// Create temporary context with new headers
final ProcessorRecordContext temporaryContext = new
ProcessorRecordContext(
@@ -222,12 +251,12 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
currentContext.offset(),
currentContext.partition(),
currentContext.topic(),
- deleteHeaders
+ tombstoneHeaders
);
try {
internalContext.setRecordContext(temporaryContext);
- final byte[] deletedValue =
wrapped().delete(keyBytes(key, deleteHeaders));
+ final byte[] deletedValue =
wrapped().delete(serializeKey(key, tombstoneHeaders));
return deserializeValue(deletedValue);
} finally {
// Restore original context
@@ -294,10 +323,11 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
final QueryResult<R> result;
final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery =
- KeyQuery.withKey(keyBytes(typedKeyQuery.getKey(), new
RecordHeaders()));
+ KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(),
internalContext.headers()));
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
+ // value will be `rawValueTimestampHeader`; no need to pass
headers explicitly
final Function<byte[], ValueTimestampHeaders<V>> deserializer =
StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializer.apply(rawResult.getResult());
final V plainValue = valueTimestampHeaders == null ? null :
valueTimestampHeaders.value();
@@ -318,10 +348,11 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
final QueryResult<R> result;
final TimestampedKeyQuery<K, V> typedKeyQuery =
(TimestampedKeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery =
- KeyQuery.withKey(keyBytes(typedKeyQuery.key(), new
RecordHeaders()));
+ KeyQuery.withKey(serializeKey(typedKeyQuery.key(),
internalContext.headers()));
final QueryResult<byte[]> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
+ // value will be `rawValueTimestampHeader`; no need to pass
headers explicitly
final Function<byte[], ValueTimestampHeaders<V>> deserializer =
StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializer.apply(rawResult.getResult());
// Convert ValueTimestampHeaders to ValueAndTimestamp for the
result
@@ -348,8 +379,8 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
RangeQuery<Bytes, byte[]> rawRangeQuery;
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
- keyBytes(typedQuery.getLowerBound().orElse(null), new
RecordHeaders()),
- keyBytes(typedQuery.getUpperBound().orElse(null), new
RecordHeaders())
+ serializeKey(typedQuery.getLowerBound().orElse(null),
internalContext.headers()),
+ serializeKey(typedQuery.getUpperBound().orElse(null),
internalContext.headers())
);
if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
@@ -361,9 +392,10 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
- final KeyValueIterator<K, V> resultIterator = new
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ final KeyValueIterator<K, V> resultIterator = new
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
iterator,
getSensor,
+ // value will be `rawValueTimestampHeader`; no need to
pass headers explicitly
StoreQueryUtils.deserializeValue(serdes, wrapped()),
true
);
@@ -390,8 +422,8 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
RangeQuery<Bytes, byte[]> rawRangeQuery;
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
- keyBytes(typedQuery.lowerBound().orElse(null), new
RecordHeaders()),
- keyBytes(typedQuery.upperBound().orElse(null), new
RecordHeaders())
+ serializeKey(typedQuery.lowerBound().orElse(null),
internalContext.headers()),
+ serializeKey(typedQuery.upperBound().orElse(null),
internalContext.headers())
);
if (order.equals(ResultOrder.DESCENDING)) {
rawRangeQuery = rawRangeQuery.withDescendingKeys();
@@ -404,9 +436,10 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
- (KeyValueIterator<K, ValueAndTimestamp<V>>) new
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ (KeyValueIterator<K, ValueAndTimestamp<V>>) new
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
iterator,
getSensor,
+ // value will be `rawValueTimestampHeader`; no
need to pass headers explicitly
StoreQueryUtils.deserializeValue(serdes,
wrapped()),
false
);
@@ -428,7 +461,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
final PS prefixKeySerializer) {
Objects.requireNonNull(prefix, "prefix cannot be null");
Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer
cannot be null");
- return new MeteredValueTimestampHeadersIterator(
+ return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
wrapped().prefixScan(prefix, prefixKeySerializer),
prefixScanSensor
);
@@ -437,10 +470,10 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
@Override
public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
final K to) {
- return new MeteredValueTimestampHeadersIterator(
+ return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
wrapped().range(
- keyBytes(from, new RecordHeaders()),
- keyBytes(to, new RecordHeaders())
+ serializeKey(from, internalContext.headers()),
+ serializeKey(to, internalContext.headers())
),
rangeSensor
);
@@ -449,10 +482,10 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
@Override
public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K
from,
final K
to) {
- return new MeteredValueTimestampHeadersIterator(
+ return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
wrapped().reverseRange(
- keyBytes(from, new RecordHeaders()),
- keyBytes(to, new RecordHeaders())
+ serializeKey(from, internalContext.headers()),
+ serializeKey(to, internalContext.headers())
),
rangeSensor
);
@@ -460,7 +493,7 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
@Override
public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
- return new MeteredValueTimestampHeadersIterator(
+ return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
wrapped().all(),
allSensor
);
@@ -468,14 +501,14 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
@Override
public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
- return new MeteredValueTimestampHeadersIterator(
+ return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
wrapped().reverseAll(),
allSensor
);
}
@SuppressWarnings("unchecked")
- private class MeteredTimestampedKeyValueStoreWithHeadersIterator
implements KeyValueIterator<K, V>, MeteredIterator {
+ private class MeteredTimestampedKeyValueStoreWithHeadersQueryIterator
implements KeyValueIterator<K, V>, MeteredIterator {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
@@ -486,10 +519,12 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
private final boolean returnPlainValue;
private KeyValue<K, V> cachedNext;
- private MeteredTimestampedKeyValueStoreWithHeadersIterator(final
KeyValueIterator<Bytes, byte[]> iter,
- final
Sensor sensor,
- final
Function<byte[], ValueTimestampHeaders<V>> valueTimestampHeadersDeserializer,
- final
boolean returnPlainValue) {
+ private MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
+ final KeyValueIterator<Bytes, byte[]> iter,
+ final Sensor sensor,
+ final Function<byte[], ValueTimestampHeaders<V>>
valueTimestampHeadersDeserializer,
+ final boolean returnPlainValue
+ ) {
this.iter = iter;
this.sensor = sensor;
this.valueTimestampHeadersDeserializer =
valueTimestampHeadersDeserializer;
@@ -519,21 +554,15 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
final KeyValue<Bytes, byte[]> keyValue = iter.next();
final ValueTimestampHeaders<V> valueTimestampHeaders =
valueTimestampHeadersDeserializer.apply(keyValue.value);
- final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
+ final Headers headers = valueTimestampHeaders.headers();
+
if (returnPlainValue) {
- final V plainValue = valueTimestampHeaders == null ? null :
valueTimestampHeaders.value();
- return KeyValue.pair(
- serdes.keyFrom(keyValue.key.get(), headers),
- plainValue
- );
+ return KeyValue.pair(deserializeKey(keyValue.key.get(),
headers), valueTimestampHeaders.value());
} else {
// Return as ValueAndTimestamp
- final ValueAndTimestamp<V> valueAndTimestamp =
valueTimestampHeaders == null
- ? null
- : ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp());
return KeyValue.pair(
- serdes.keyFrom(keyValue.key.get(), headers),
- (V) valueAndTimestamp
+ deserializeKey(keyValue.key.get(), headers),
+ (V) ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp())
);
}
}
@@ -559,15 +588,17 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
}
}
- private class MeteredValueTimestampHeadersIterator implements
KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+ private class MeteredTimestampedKeyValueStoreWithHeadersIterator
implements KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
private final KeyValueIterator<Bytes, byte[]> iter;
private final Sensor sensor;
private final long startNs;
private final long startTimestampMs;
private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
- private MeteredValueTimestampHeadersIterator(final
KeyValueIterator<Bytes, byte[]> iter,
- final Sensor sensor) {
+ private MeteredTimestampedKeyValueStoreWithHeadersIterator(
+ final KeyValueIterator<Bytes, byte[]> iter,
+ final Sensor sensor
+ ) {
this.iter = iter;
this.sensor = sensor;
this.startNs = time.nanoseconds();
@@ -595,9 +626,8 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
}
final KeyValue<Bytes, byte[]> keyValue = iter.next();
- final ValueTimestampHeaders<V> valueTimestampHeaders =
serdes.valueFrom(keyValue.value, new RecordHeaders());
- final Headers headers = valueTimestampHeaders != null ?
valueTimestampHeaders.headers() : new RecordHeaders();
- final K key = serdes.keyFrom(keyValue.key.get(), headers);
+ final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializeValue(keyValue.value);
+ final K key = deserializeKey(keyValue.key.get(),
valueTimestampHeaders.headers());
return KeyValue.pair(key, valueTimestampHeaders);
}
@@ -623,7 +653,21 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
}
}
- protected Bytes keyBytes(final K key, final Headers headers) {
+ @Override
+ protected Bytes serializeKey(final K key) {
+ throw new
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders
required to pass in Headers when serializing a key.");
+ }
+
+ protected Bytes serializeKey(final K key, final Headers headers) {
return Bytes.wrap(serdes.rawKey(key, headers));
}
+
+ @Override
+ protected K deserializeKey(final byte[] rawKey) {
+ throw new
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders
required to pass in Headers when deserializing a key.");
+ }
+
+ protected K deserializeKey(final byte[] rawKey, final Headers headers) {
+ return serdes.keyFrom(rawKey, headers);
+ }
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index c98c5590d7a..0cb35fc63b7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -65,8 +65,6 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
* @param <K> The key type
* @param <V> The (raw) value type
*/
-// TODO: replace with new method in follow-up PR of KIP-1271
-@SuppressWarnings("deprecation")
public class MeteredVersionedKeyValueStore<K, V>
extends WrappedStateStore<VersionedBytesStore, K, V>
implements VersionedKeyValueStore<K, V> {
@@ -145,7 +143,15 @@ public class MeteredVersionedKeyValueStore<K, V>
public long put(final K key, final V value, final long timestamp) {
Objects.requireNonNull(key, "key cannot be null");
try {
- final long validTo = maybeMeasureLatency(() ->
inner.put(serializeKey(key), plainValueSerdes.rawValue(value), timestamp),
time, putSensor);
+ final long validTo = maybeMeasureLatency(
+ () -> inner.put(
+ serializeKey(key),
+ plainValueSerdes.rawValue(value,
internalContext.headers()),
+ timestamp
+ ),
+ time,
+ putSensor
+ );
maybeRecordE2ELatency();
return validTo;
} catch (final ProcessorStateException e) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 4bcbe0089c9..3a3e79570ca 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
@@ -188,7 +189,7 @@ public final class StoreQueryUtils {
return true;
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "resource"})
private static <R> QueryResult<R> runRangeQuery(
final Query<R> query,
final PositionBound positionBound,
@@ -409,7 +410,7 @@ public final class StoreQueryUtils {
}
}
- @SuppressWarnings({"unchecked", "rawtypes"})
+ @SuppressWarnings({"unchecked", "rawtypes", "resource"})
public static <V> Function<byte[], V> deserializeValue(final
StateSerdes<?, V> serdes, final StateStore wrapped) {
final Serde<V> valueSerde = serdes.valueSerde();
final boolean timestamped = WrappedStateStore.isTimestamped(wrapped)
|| isAdapter(wrapped);
@@ -421,9 +422,11 @@ public final class StoreQueryUtils {
} else {
deserializer = valueSerde.deserializer();
}
- return byteArray -> deserializer.deserialize(serdes.topic(),
byteArray);
+ // deserializeValue() is only used via IQ, so it's ok to not pass any
headers
+ return byteArray -> deserializer.deserialize(serdes.topic(), new
RecordHeaders(), byteArray);
}
+ @SuppressWarnings("rawtypes")
public static boolean isAdapter(final StateStore stateStore) {
if (stateStore instanceof
KeyValueToTimestampedKeyValueByteStoreAdapter) {
return true;
@@ -434,22 +437,30 @@ public final class StoreQueryUtils {
}
}
- @SuppressWarnings({"unchecked", "rawtypes"})
+ @SuppressWarnings("resource")
public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>>
deserializeValue(final StateSerdes<?, V> serdes) {
final Serde<V> valueSerde = serdes.valueSerde();
final Deserializer<V> deserializer = valueSerde.deserializer();
- return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent()
? new VersionedRecord<>(deserializer.deserialize(serdes.topic(),
rawVersionedRecord.value()),
-
rawVersionedRecord.timestamp(),
-
rawVersionedRecord.validTo().get())
-
: new VersionedRecord<>(deserializer.deserialize(serdes.topic(),
rawVersionedRecord.value()),
-
rawVersionedRecord.timestamp());
+ return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent()
+ ? new VersionedRecord<>(
+ // deserializeValue is only used via IQ, so it's ok to not pass
any headers
+ deserializer.deserialize(serdes.topic(), new RecordHeaders(),
rawVersionedRecord.value()),
+ rawVersionedRecord.timestamp(),
+ rawVersionedRecord.validTo().get())
+ : new VersionedRecord<>(
+ // deserializeValue is only used via IQ, so it's ok to not pass
any headers
+ deserializer.deserialize(serdes.topic(), new RecordHeaders(),
rawVersionedRecord.value()),
+ rawVersionedRecord.timestamp());
}
+ @SuppressWarnings("resource")
public static <V> VersionedRecord<V> deserializeVersionedRecord(final
StateSerdes<?, V> serdes, final VersionedRecord<byte[]> rawVersionedRecord) {
final Deserializer<V> valueDeserializer = serdes.valueDeserializer();
- final V value = valueDeserializer.deserialize(serdes.topic(),
rawVersionedRecord.value());
- return rawVersionedRecord.validTo().isPresent() ? new
VersionedRecord<>(value, rawVersionedRecord.timestamp(),
rawVersionedRecord.validTo().get())
- : new
VersionedRecord<>(value, rawVersionedRecord.timestamp());
+ // deserializeValue is only used via IQ, so it's ok to not pass any
headers
+ final V value = valueDeserializer.deserialize(serdes.topic(), new
RecordHeaders(), rawVersionedRecord.value());
+ return rawVersionedRecord.validTo().isPresent()
+ ? new VersionedRecord<>(value, rawVersionedRecord.timestamp(),
rawVersionedRecord.validTo().get())
+ : new VersionedRecord<>(value, rawVersionedRecord.timestamp());
}
public static void checkpointPosition(final OffsetCheckpoint
checkpointFile, final Position position) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index fc340282988..58e6ee45e9e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -529,6 +529,7 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest
{
keySerde,
valueSerde
);
+ when(context.headers()).thenReturn(new RecordHeaders());
mockStore.init(context, mockStore);
return mockStore;
}