This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 826755a9b4f MINOR: improve generic type safety for IQv2 in metered-store layer (#21683)
826755a9b4f is described below
commit 826755a9b4f303021f4141977e49f12ccfde1ef8
Author: Matthias J. Sax <[email protected]>
AuthorDate: Fri Mar 20 14:07:04 2026 -0700
MINOR: improve generic type safety for IQv2 in metered-store layer (#21683)
Reviewers: TengYao Chi <[email protected]>
---
.../state/internals/MeteredKeyValueStore.java | 23 +--
.../state/internals/MeteredSessionStore.java | 42 ++--
.../internals/MeteredTimestampedKeyValueStore.java | 131 +++++++------
...MeteredTimestampedKeyValueStoreWithHeaders.java | 139 +++++++-------
.../MeteredTimestampedWindowStoreWithHeaders.java | 213 +++++++++++----------
.../internals/MeteredVersionedKeyValueStore.java | 103 ++++++----
.../state/internals/MeteredWindowStore.java | 59 +++---
.../streams/state/internals/StoreQueryUtils.java | 82 ++++----
...angeLoggingTimestampedWindowBytesStoreTest.java | 8 +-
.../ChangeLoggingWindowBytesStoreTest.java | 8 +-
10 files changed, 416 insertions(+), 392 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index d94b8fd5253..5586b71d41b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -103,8 +103,7 @@ public class MeteredKeyValueStore<K, V>
protected NavigableSet<MeteredIterator> openIterators = new
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
- @SuppressWarnings("rawtypes")
- private final Map<Class, QueryHandler> queryHandlers =
+ private final Map<Class<?>, QueryHandler<?>> queryHandlers =
mkMap(
mkEntry(
RangeQuery.class,
@@ -230,31 +229,29 @@ public class MeteredKeyValueStore<K, V>
@SuppressWarnings("unchecked")
@Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
-
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = time.nanoseconds();
final QueryResult<R> result;
- final QueryHandler handler = queryHandlers.get(query.getClass());
+ final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+ result.addExecutionInfo("Handled in " + getClass() + " in " +
(time.nanoseconds() - start) + "ns");
}
} else {
- result = (QueryResult<R>) handler.apply(
+ result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " with serdes "
- + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ result.addExecutionInfo("Handled in " + getClass() + " with
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 9b4f05e44cf..5d678cbfd25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -84,14 +84,13 @@ public class MeteredSessionStore<K, V>
protected final LongAdder numOpenIterators = new LongAdder();
protected final NavigableSet<MeteredIterator> openIterators = new
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
- @SuppressWarnings("rawtypes")
- private final Map<Class, QueryHandler> queryHandlers =
- mkMap(
- mkEntry(
- WindowRangeQuery.class,
- (query, positionBound, config, store) ->
runRangeQuery(query, positionBound, config)
- )
- );
+ private final Map<Class<?>, QueryHandler<?>> queryHandlers =
+ mkMap(
+ mkEntry(
+ WindowRangeQuery.class,
+ (query, positionBound, config, store) -> runRangeQuery(query,
positionBound, config)
+ )
+ );
MeteredSessionStore(final SessionStore<Bytes, byte[]> inner,
@@ -445,39 +444,40 @@ public class MeteredSessionStore<K, V>
@SuppressWarnings("unchecked")
@Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = time.nanoseconds();
final QueryResult<R> result;
- final QueryHandler handler = queryHandlers.get(query.getClass());
+ final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+ result.addExecutionInfo("Handled in " + getClass() + " in " +
(time.nanoseconds() - start) + "ns");
}
} else {
- result = (QueryResult<R>) handler.apply(
+ result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " with serdes "
- + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ result.addExecutionInfo("Handled in " + getClass() + " with
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runRangeQuery(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runRangeQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>)
query;
if (typedQuery.getKey().isPresent()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index bbc7214a842..72caa56d88e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import java.util.Map;
import java.util.function.Function;
@@ -67,26 +68,25 @@ public class MeteredTimestampedKeyValueStore<K, V>
super(inner, metricScope, time, keySerde, valueSerde);
}
- @SuppressWarnings("rawtypes")
- private final Map<Class, StoreQueryUtils.QueryHandler> queryHandlers =
- mkMap(
- mkEntry(
- RangeQuery.class,
- (query, positionBound, config, store) ->
runRangeQuery(query, positionBound, config)
- ),
- mkEntry(
- TimestampedRangeQuery.class,
- (query, positionBound, config, store) ->
runTimestampedRangeQuery(query, positionBound, config)
- ),
- mkEntry(
- KeyQuery.class,
- (query, positionBound, config, store) ->
runKeyQuery(query, positionBound, config)
- ),
- mkEntry(
- TimestampedKeyQuery.class,
- (query, positionBound, config, store) ->
runTimestampedKeyQuery(query, positionBound, config)
- )
- );
+ private final Map<Class<?>, QueryHandler<?>> queryHandlers =
+ mkMap(
+ mkEntry(
+ RangeQuery.class,
+ (query, positionBound, config, store) -> runRangeQuery(query,
positionBound, config)
+ ),
+ mkEntry(
+ TimestampedRangeQuery.class,
+ (query, positionBound, config, store) ->
runTimestampedRangeQuery(query, positionBound, config)
+ ),
+ mkEntry(
+ KeyQuery.class,
+ (query, positionBound, config, store) -> runKeyQuery(query,
positionBound, config)
+ ),
+ mkEntry(
+ TimestampedKeyQuery.class,
+ (query, positionBound, config, store) ->
runTimestampedKeyQuery(query, positionBound, config)
+ )
+ );
@SuppressWarnings("unchecked")
@Override
@@ -134,40 +134,40 @@ public class MeteredTimestampedKeyValueStore<K, V>
@SuppressWarnings("unchecked")
@Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
-
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = time.nanoseconds();
final QueryResult<R> result;
- final StoreQueryUtils.QueryHandler handler =
queryHandlers.get(query.getClass());
+ final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " in " +
(time.nanoseconds() - start) + "ns");
+ result.addExecutionInfo("Handled in " + getClass() + " in " +
(time.nanoseconds() - start) + "ns");
}
} else {
- result = (QueryResult<R>) handler.apply(
- query,
- positionBound,
- config,
- this
+ result = ((QueryHandler<R>) handler).apply(
+ query,
+ positionBound,
+ config,
+ this
);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " with serdes "
- + serdes + " in " + (time.nanoseconds() -
start) + "ns");
+ result.addExecutionInfo("Handled in " + getClass() + " with
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig
config) {
+ private <R> QueryResult<R> runTimestampedKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final TimestampedKeyQuery<K, V> typedKeyQuery =
(TimestampedKeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery =
KeyQuery.withKey(serializeKey(typedKeyQuery.key()));
@@ -176,7 +176,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
final Function<byte[], ValueAndTimestamp<V>> deserializer =
StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueAndTimestamp<V> valueAndTimestamp =
deserializer.apply(rawResult.getResult());
final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
valueAndTimestamp);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
valueAndTimestamp);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no
result set.
@@ -186,10 +186,11 @@ public class MeteredTimestampedKeyValueStore<K, V>
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig
config) {
-
+ private <R> QueryResult<R> runTimestampedRangeQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final TimestampedRangeQuery<K, V> typedQuery =
(TimestampedRangeQuery<K, V>) query;
RangeQuery<Bytes, byte[]> rawRangeQuery;
@@ -205,20 +206,21 @@ public class MeteredTimestampedKeyValueStore<K, V>
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
- wrapped().query(rawRangeQuery, positionBound, config);
+ wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
- final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
(KeyValueIterator<K, ValueAndTimestamp<V>>) new
MeteredTimestampedKeyValueStoreIterator(
+ final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
+ (KeyValueIterator<K, ValueAndTimestamp<V>>) new
MeteredTimestampedKeyValueStoreIterator(
iterator,
getSensor,
StoreQueryUtils.deserializeValue(serdes, wrapped()),
false
- );
+ );
final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>>
typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
- rawResult,
- resultIterator
- );
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+ rawResult,
+ resultIterator
+ );
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no
result set.
@@ -228,9 +230,11 @@ public class MeteredTimestampedKeyValueStore<K, V>
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runKeyQuery(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
final KeyQuery<Bytes, byte[]> rawKeyQuery =
KeyQuery.withKey(serializeKey(typedKeyQuery.getKey()));
@@ -241,7 +245,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
final ValueAndTimestamp<V> valueAndTimestamp =
deserializer.apply(rawResult.getResult());
final V plainValue = valueAndTimestamp == null ? null :
valueAndTimestamp.value();
final QueryResult<V> typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
plainValue);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
plainValue);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no
result set.
@@ -251,10 +255,11 @@ public class MeteredTimestampedKeyValueStore<K, V>
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runRangeQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig config) {
-
+ private <R> QueryResult<R> runRangeQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
RangeQuery<Bytes, byte[]> rawRangeQuery;
@@ -270,7 +275,7 @@ public class MeteredTimestampedKeyValueStore<K, V>
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
- wrapped().query(rawRangeQuery, positionBound, config);
+ wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
final KeyValueIterator<K, V> resultIterator = new
MeteredTimestampedKeyValueStoreIterator(
@@ -280,10 +285,10 @@ public class MeteredTimestampedKeyValueStore<K, V>
true
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
- rawResult,
- resultIterator
- );
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+ rawResult,
+ resultIterator
+ );
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no
result set.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index f77471ad905..9d8ef5f3fd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -42,6 +42,7 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import java.util.ArrayList;
import java.util.List;
@@ -69,16 +70,17 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
implements TimestampedKeyValueStoreWithHeaders<K, V> {
- MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes,
byte[]> inner,
- final String metricScope,
- final Time time,
- final Serde<K> keySerde,
- final
Serde<ValueTimestampHeaders<V>> valueSerde) {
+ MeteredTimestampedKeyValueStoreWithHeaders(
+ final KeyValueStore<Bytes, byte[]> inner,
+ final String metricScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<ValueTimestampHeaders<V>> valueSerde
+ ) {
super(inner, metricScope, time, keySerde, valueSerde);
}
- @SuppressWarnings("rawtypes")
- private final Map<Class, StoreQueryUtils.QueryHandler> queryHandlers =
+ private final Map<Class<?>, QueryHandler<?>> queryHandlers =
mkMap(
mkEntry(
KeyQuery.class,
@@ -284,55 +286,54 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
* @param config the query configuration
* @return the query result
*/
-
@SuppressWarnings("unchecked")
@Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
-
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = time.nanoseconds();
final QueryResult<R> result;
- final StoreQueryUtils.QueryHandler handler =
queryHandlers.get(query.getClass());
+ final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " in " + (time.nanoseconds() - start) + "ns");
+ result.addExecutionInfo("Handled in " + getClass() + " in " +
(time.nanoseconds() - start) + "ns");
}
} else {
- result = (QueryResult<R>) handler.apply(
+ result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo("Handled in " + getClass() + " with
serdes "
- + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ result.addExecutionInfo("Handled in " + getClass() + " with
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runKeyQuery(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
- final KeyQuery<Bytes, byte[]> rawKeyQuery =
- KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(),
internalContext.headers()));
- final QueryResult<byte[]> rawResult =
- wrapped().query(rawKeyQuery, positionBound, config);
+
+ final KeyQuery<Bytes, byte[]> rawKeyQuery =
KeyQuery.withKey(serializeKey(typedKeyQuery.getKey(),
internalContext.headers()));
+ final QueryResult<byte[]> rawResult = wrapped().query(rawKeyQuery,
positionBound, config);
if (rawResult.isSuccess()) {
// value will be `rawValueTimestampHeader`; no need to pass
headers explicitly
final Function<byte[], ValueTimestampHeaders<V>> deserializer =
StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializer.apply(rawResult.getResult());
final V plainValue = valueTimestampHeaders == null ? null :
valueTimestampHeaders.value();
final QueryResult<V> typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
plainValue);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
plainValue);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no
result set.
@@ -342,25 +343,27 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig
config) {
+ private <R> QueryResult<R> runTimestampedKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final TimestampedKeyQuery<K, V> typedKeyQuery =
(TimestampedKeyQuery<K, V>) query;
- final KeyQuery<Bytes, byte[]> rawKeyQuery =
- KeyQuery.withKey(serializeKey(typedKeyQuery.key(),
internalContext.headers()));
- final QueryResult<byte[]> rawResult =
- wrapped().query(rawKeyQuery, positionBound, config);
+
+ final KeyQuery<Bytes, byte[]> rawKeyQuery =
KeyQuery.withKey(serializeKey(typedKeyQuery.key(), internalContext.headers()));
+ final QueryResult<byte[]> rawResult = wrapped().query(rawKeyQuery,
positionBound, config);
if (rawResult.isSuccess()) {
// value will be `rawValueTimestampHeader`; no need to pass
headers explicitly
final Function<byte[], ValueTimestampHeaders<V>> deserializer =
StoreQueryUtils.deserializeValue(serdes, wrapped());
final ValueTimestampHeaders<V> valueTimestampHeaders =
deserializer.apply(rawResult.getResult());
// Convert ValueTimestampHeaders to ValueAndTimestamp for the
result
- final ValueAndTimestamp<V> valueAndTimestamp =
valueTimestampHeaders == null
+ final ValueAndTimestamp<V> valueAndTimestamp =
+ valueTimestampHeaders == null
? null
: ValueAndTimestamp.make(valueTimestampHeaders.value(),
valueTimestampHeaders.timestamp());
final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
valueAndTimestamp);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
valueAndTimestamp);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no
result set.
@@ -370,12 +373,14 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runRangeQuery(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
-
+ private <R> QueryResult<R> runRangeQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+
RangeQuery<Bytes, byte[]> rawRangeQuery;
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
@@ -388,21 +393,21 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
if (order.equals(ResultOrder.ASCENDING)) {
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
- final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
- wrapped().query(rawRangeQuery, positionBound, config);
+
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
final KeyValueIterator<K, V> resultIterator = new
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
- iterator,
- getSensor,
- // value will be `rawValueTimestampHeader`; no need to
pass headers explicitly
- StoreQueryUtils.deserializeValue(serdes, wrapped()),
- true
+ iterator,
+ getSensor,
+ // value will be `rawValueTimestampHeader`; no need to pass
headers explicitly
+ StoreQueryUtils.deserializeValue(serdes, wrapped()),
+ true
);
final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
- rawResult,
- resultIterator
+ rawResult,
+ resultIterator
);
result = (QueryResult<R>) typedQueryResult;
} else {
@@ -413,12 +418,14 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig
config) {
-
+ private <R> QueryResult<R> runTimestampedRangeQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final TimestampedRangeQuery<K, V> typedQuery =
(TimestampedRangeQuery<K, V>) query;
+
RangeQuery<Bytes, byte[]> rawRangeQuery;
final ResultOrder order = typedQuery.resultOrder();
rawRangeQuery = RangeQuery.withRange(
@@ -431,23 +438,23 @@ public class
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
if (order.equals(ResultOrder.ASCENDING)) {
rawRangeQuery = rawRangeQuery.withAscendingKeys();
}
- final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
- wrapped().query(rawRangeQuery, positionBound, config);
+
+ final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
wrapped().query(rawRangeQuery, positionBound, config);
if (rawResult.isSuccess()) {
final KeyValueIterator<Bytes, byte[]> iterator =
rawResult.getResult();
final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
- (KeyValueIterator<K, ValueAndTimestamp<V>>) new
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
- iterator,
- getSensor,
- // value will be `rawValueTimestampHeader`; no
need to pass headers explicitly
- StoreQueryUtils.deserializeValue(serdes,
wrapped()),
- false
- );
+ (KeyValueIterator<K, ValueAndTimestamp<V>>) new
MeteredTimestampedKeyValueStoreWithHeadersQueryIterator(
+ iterator,
+ getSensor,
+ // value will be `rawValueTimestampHeader`; no need to
pass headers explicitly
+ StoreQueryUtils.deserializeValue(serdes, wrapped()),
+ false
+ );
final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>>
typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
- rawResult,
- resultIterator
- );
+ InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+ rawResult,
+ resultIterator
+ );
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have no
result set.
@@ -670,4 +677,4 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K,
V>
protected K deserializeKey(final byte[] rawKey, final Headers headers) {
return serdes.keyFrom(rawKey, headers);
}
-}
\ No newline at end of file
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index e88ec3398d5..6dc01498ff2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -46,7 +46,6 @@ import java.util.Objects;
import java.util.function.Function;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
-import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
/**
* A Metered {@link TimestampedWindowStoreWithHeaders} wrapper that is used
for recording operation metrics,
@@ -61,12 +60,14 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
extends MeteredWindowStore<K, ValueTimestampHeaders<V>>
implements TimestampedWindowStoreWithHeaders<K, V> {
- MeteredTimestampedWindowStoreWithHeaders(final WindowStore<Bytes, byte[]>
inner,
- final long windowSizeMs,
- final String metricScope,
- final Time time,
- final Serde<K> keySerde,
- final
Serde<ValueTimestampHeaders<V>> valueSerde) {
+ MeteredTimestampedWindowStoreWithHeaders(
+ final WindowStore<Bytes, byte[]> inner,
+ final long windowSizeMs,
+ final String metricScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<ValueTimestampHeaders<V>> valueSerde
+ ) {
super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
}
@@ -110,7 +111,7 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
internalContext.setRecordContext(currentContext);
}
} else {
- final Headers headers = value.headers() == null ? new
RecordHeaders() : value.headers();
+ final Headers headers = value.headers();
wrapped().put(keyBytes(key, headers),
serdes.rawValue(value, headers), windowStartTimestamp);
}
},
@@ -130,9 +131,11 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
@SuppressWarnings("unchecked")
@Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
final QueryResult<R> result;
@@ -160,10 +163,13 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
* ValueTimestampHeaders to either ValueAndTimestamp<V> (for timestamped
stores) or V (for non-timestamped stores).
*/
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runWindowKeyQuery(final WindowKeyQuery<K,
ValueTimestampHeaders<V>> query,
- final PositionBound
positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runWindowKeyQuery(
+ final WindowKeyQuery<K, ValueTimestampHeaders<V>> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> queryResult;
+
if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
WindowKeyQuery.withKeyAndWindowStartRange(
@@ -175,45 +181,29 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
if (rawResult.isSuccess()) {
if (isUnderlyingStoreTimestamped()) {
// For timestamped stores, return ValueAndTimestamp<V>
- final Function<byte[], ValueAndTimestamp<V>> valueFrom =
bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
- };
-
- final MeteredWindowStoreIterator<ValueAndTimestamp<V>>
typedResult =
- new MeteredWindowStoreIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ final MeteredWindowStoreIterator<ValueAndTimestamp<V>>
typedResult = meteredIterator(
+ rawResult,
+ bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+ }
+ );
+
final
QueryResult<MeteredWindowStoreIterator<ValueAndTimestamp<V>>> typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
queryResult = (QueryResult<R>) typedQueryResult;
} else {
// For non-timestamped stores, return plain V
- final Function<byte[], V> valueFrom = bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null : vth.value();
- };
-
- final MeteredWindowStoreIterator<V> typedResult =
- new MeteredWindowStoreIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ final MeteredWindowStoreIterator<V> typedResult =
meteredIterator(
+ rawResult,
+ bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null : vth.value();
+ }
+ );
+
final QueryResult<MeteredWindowStoreIterator<V>>
typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
queryResult = (QueryResult<R>) typedQueryResult;
}
} else {
@@ -221,80 +211,80 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
}
} else {
queryResult = QueryResult.forFailure(
- FailureReason.UNKNOWN_QUERY_TYPE,
- "This store (" + getClass() + ") doesn't know how to
execute"
- + " the given query (" + query + ") because it only
supports closed-range"
- + " queries."
- + " Contact the store maintainer if you need support
for a new query type."
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to execute"
+ + " the given query (" + query + ") because it only
supports closed-range"
+ + " queries."
+ + " Contact the store maintainer if you need support for a
new query type."
);
}
return queryResult;
}
+ private <ValueType> MeteredWindowStoreIterator<ValueType> meteredIterator(
+ final QueryResult<WindowStoreIterator<byte[]>> rawResult,
+ final Function<byte[], ValueType> valueDeserializer
+ ) {
+ return new MeteredWindowStoreIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ streamsMetrics,
+ valueDeserializer,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ }
/**
* Handles WindowRangeQuery by creating a MeteredWindowedKeyValueIterator
with conversion from
* ValueTimestampHeaders to either ValueAndTimestamp<V> (for timestamped
stores) or V (for non-timestamped stores).
*/
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runWindowRangeQuery(final WindowRangeQuery<K,
ValueTimestampHeaders<V>> query,
- final PositionBound
positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runWindowRangeQuery(
+ final WindowRangeQuery<K, ValueTimestampHeaders<V>> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
+
if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
- WindowRangeQuery.withWindowStartRange(
- query.getTimeFrom().get(),
- query.getTimeTo().get()
- );
- final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>
rawResult =
- wrapped().query(rawKeyQuery, positionBound, config);
- if (rawResult.isSuccess()) {
- final Function<byte[], K> keyFrom = bytes ->
serdes.keyFrom(bytes, new RecordHeaders());
+ WindowRangeQuery.withWindowStartRange(
+ query.getTimeFrom().get(),
+ query.getTimeTo().get()
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>
rawResult = wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
if (isUnderlyingStoreTimestamped()) {
// For timestamped stores, return ValueAndTimestamp<V>
- final Function<byte[], ValueAndTimestamp<V>> valueFrom =
bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
- };
-
final MeteredWindowedKeyValueIterator<K,
ValueAndTimestamp<V>> typedResult =
- new MeteredWindowedKeyValueIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- keyFrom,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ meteredWindowedIterator(
+ rawResult,
+ bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+ }
+ );
+
final QueryResult<MeteredWindowedKeyValueIterator<K,
ValueAndTimestamp<V>>> typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
result = (QueryResult<R>) typedQueryResult;
} else {
// For non-timestamped stores, return plain V
- final Function<byte[], V> valueFrom = bytes -> {
- final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
- return vth == null ? null : vth.value();
- };
-
final MeteredWindowedKeyValueIterator<K, V> typedResult =
- new MeteredWindowedKeyValueIterator<>(
- rawResult.getResult(),
- fetchSensor,
- iteratorDurationSensor,
- streamsMetrics,
- keyFrom,
- valueFrom,
- time,
- numOpenIterators,
- openIterators
- );
+ meteredWindowedIterator(
+ rawResult,
+ bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null : vth.value();
+ }
+ );
+
final QueryResult<MeteredWindowedKeyValueIterator<K, V>>
typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
result = (QueryResult<R>) typedQueryResult;
}
} else {
@@ -302,11 +292,11 @@ public class MeteredTimestampedWindowStoreWithHeaders<K,
V>
}
} else {
result = QueryResult.forFailure(
- FailureReason.UNKNOWN_QUERY_TYPE,
- "This store (" + getClass() + ") doesn't know how to"
- + " execute the given query (" + query + ") because"
- + " WindowStores only supports
WindowRangeQuery.withWindowStartRange."
- + " Contact the store maintainer if you need support
for a new query type."
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " WindowStores only supports
WindowRangeQuery.withWindowStartRange."
+ + " Contact the store maintainer if you need support for a
new query type."
);
}
return result;
@@ -433,6 +423,23 @@ public class MeteredTimestampedWindowStoreWithHeaders<K, V>
}
}
+ private <ValueType> MeteredWindowedKeyValueIterator<K, ValueType>
meteredWindowedIterator(
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> rawResult,
+ final Function<byte[], ValueType> valueDeserializer
+ ) {
+ return new MeteredWindowedKeyValueIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ streamsMetrics,
+ bytes -> serdes.keyFrom(bytes, new RecordHeaders()),
+ valueDeserializer,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ }
+
private boolean isUnderlyingStoreTimestamped() {
Object store = wrapped();
do {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index 0cb35fc63b7..9cd8b9e1861 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -71,11 +71,13 @@ public class MeteredVersionedKeyValueStore<K, V>
private final MeteredVersionedKeyValueStoreInternal internal;
- MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
- final String metricScope,
- final Time time,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ MeteredVersionedKeyValueStore(
+ final VersionedBytesStore inner,
+ final String metricScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
super(inner);
internal = new MeteredVersionedKeyValueStoreInternal(inner,
metricScope, time, keySerde, valueSerde);
}
@@ -95,14 +97,13 @@ public class MeteredVersionedKeyValueStore<K, V>
* Note that the addition of {@link #get(Object, long)} and {@link
#delete(Object, long)} in
* this class are to match the interface of {@link VersionedKeyValueStore}.
*/
- private class MeteredVersionedKeyValueStoreInternal
- extends MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
+ private class MeteredVersionedKeyValueStoreInternal extends
MeteredKeyValueStore<K, ValueAndTimestamp<V>> {
private final VersionedBytesStore inner;
private final Serde<V> plainValueSerde;
private StateSerdes<K, V> plainValueSerdes;
- private final Map<Class<?>, QueryHandler> queryHandlers =
+ private final Map<Class<?>, QueryHandler<?>> queryHandlers =
mkMap(
mkEntry(
RangeQuery.class,
@@ -122,11 +123,13 @@ public class MeteredVersionedKeyValueStore<K, V>
)
);
- MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
- final String metricScope,
- final Time time,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ MeteredVersionedKeyValueStoreInternal(
+ final VersionedBytesStore inner,
+ final String metricScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
super(
inner,
metricScope,
@@ -182,56 +185,62 @@ public class MeteredVersionedKeyValueStore<K, V>
@SuppressWarnings("unchecked")
@Override
- public <R> QueryResult<R> query(final Query<R> query, final
PositionBound positionBound, final QueryConfig config) {
-
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = time.nanoseconds();
final QueryResult<R> result;
- final QueryHandler handler = queryHandlers.get(query.getClass());
+ final QueryHandler<?> handler =
queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " in " +
(time.nanoseconds() - start) + "ns");
+ result.addExecutionInfo("Handled in " + getClass() + " in
" + (time.nanoseconds() - start) + "ns");
}
} else {
- result = (QueryResult<R>) handler.apply(
+ result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " with serdes "
- + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ result.addExecutionInfo("Handled in " + getClass() + "
with serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@SuppressWarnings("unused")
- private <R> QueryResult<R> runRangeQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runRangeQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
// throw exception for now to reserve the ability to implement
this in the future
// without clashing with users' custom implementations in the
meantime
throw new UnsupportedOperationException("Versioned stores do not
support RangeQuery queries at this time.");
}
@SuppressWarnings("unused")
- private <R> QueryResult<R> runKeyQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
// throw exception for now to reserve the ability to implement
this in the future
// without clashing with users' custom implementations in the
meantime
throw new UnsupportedOperationException("Versioned stores do not
support KeyQuery queries at this time.");
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runVersionedKeyQuery(final Query<R> query,
- final PositionBound
positionBound,
- final QueryConfig
config) {
+ private <R> QueryResult<R> runVersionedKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final VersionedKeyQuery<K, V> typedKeyQuery =
(VersionedKeyQuery<K, V>) query;
VersionedKeyQuery<Bytes, byte[]> rawKeyQuery =
VersionedKeyQuery.withKey(serializeKey(typedKeyQuery.key()));
@@ -253,7 +262,11 @@ public class MeteredVersionedKeyValueStore<K, V>
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R>
query, final PositionBound positionBound, final QueryConfig config) {
+ private <R> QueryResult<R> runMultiVersionedKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final MultiVersionedKeyQuery<K, V> typedKeyQuery =
(MultiVersionedKeyQuery<K, V>) query;
@@ -273,16 +286,16 @@ public class MeteredVersionedKeyValueStore<K, V>
final QueryResult<VersionedRecordIterator<byte[]>> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
if (rawResult.isSuccess()) {
final MeteredMultiVersionedKeyQueryIterator<V> typedResult =
- new MeteredMultiVersionedKeyQueryIterator<>(
- rawResult.getResult(),
- iteratorDurationSensor,
- time,
- StoreQueryUtils.deserializeValue(plainValueSerdes),
- numOpenIterators,
- openIterators
- );
+ new MeteredMultiVersionedKeyQueryIterator<>(
+ rawResult.getResult(),
+ iteratorDurationSensor,
+ time,
+ StoreQueryUtils.deserializeValue(plainValueSerdes),
+ numOpenIterators,
+ openIterators
+ );
final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>>
typedQueryResult =
-
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
result = (QueryResult<R>) typedQueryResult;
} else {
// the generic type doesn't matter, since failed queries have
no result set.
@@ -312,7 +325,13 @@ public class MeteredVersionedKeyValueStore<K, V>
final String storeName = super.name();
final String changelogTopic =
ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
plainValueSerdes = StoreSerdeInitializer.prepareStoreSerde(
- context, storeName, changelogTopic, keySerde, plainValueSerde,
WrappingNullableUtils::prepareValueSerde);
+ context,
+ storeName,
+ changelogTopic,
+ keySerde,
+ plainValueSerde,
+ WrappingNullableUtils::prepareValueSerde
+ );
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 8df7fbe161e..2929c2b654a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -86,8 +86,7 @@ public class MeteredWindowStore<K, V>
protected final LongAdder numOpenIterators = new LongAdder();
protected final NavigableSet<MeteredIterator> openIterators = new
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
- @SuppressWarnings("rawtypes")
- private final Map<Class, QueryHandler> queryHandlers =
+ private final Map<Class<?>, QueryHandler<?>> queryHandlers =
mkMap(
mkEntry(
WindowRangeQuery.class,
@@ -107,12 +106,14 @@ public class MeteredWindowStore<K, V>
)
);
- MeteredWindowStore(final WindowStore<Bytes, byte[]> inner,
- final long windowSizeMs,
- final String metricsScope,
- final Time time,
- final Serde<K> keySerde,
- final Serde<V> valueSerde) {
+ MeteredWindowStore(
+ final WindowStore<Bytes, byte[]> inner,
+ final long windowSizeMs,
+ final String metricsScope,
+ final Time time,
+ final Serde<K> keySerde,
+ final Serde<V> valueSerde
+ ) {
super(inner);
this.windowSizeMs = windowSizeMs;
this.metricsScope = metricsScope;
@@ -122,8 +123,7 @@ public class MeteredWindowStore<K, V>
}
@Override
- public void init(final StateStoreContext stateStoreContext,
- final StateStore root) {
+ public void init(final StateStoreContext stateStoreContext, final
StateStore root) {
internalContext = stateStoreContext instanceof
InternalProcessorContext ? (InternalProcessorContext<?, ?>) stateStoreContext :
null;
taskId = stateStoreContext.taskId();
initStoreSerde(stateStoreContext);
@@ -136,6 +136,7 @@ public class MeteredWindowStore<K, V>
// register and possibly restore the state from the logs
maybeMeasureLatency(() -> super.init(stateStoreContext, root), time,
restoreSensor);
}
+
protected Serde<V> prepareValueSerde(final Serde<V> valueSerde, final
SerdeGetter getter) {
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
}
@@ -397,39 +398,40 @@ public class MeteredWindowStore<K, V>
@SuppressWarnings("unchecked")
@Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final long start = time.nanoseconds();
final QueryResult<R> result;
- final QueryHandler handler = queryHandlers.get(query.getClass());
+ final QueryHandler<?> handler = queryHandlers.get(query.getClass());
if (handler == null) {
result = wrapped().query(query, positionBound, config);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " in " + (time.nanoseconds()
- start) + "ns");
+ result.addExecutionInfo("Handled in " + getClass() + " in " +
(time.nanoseconds() - start) + "ns");
}
} else {
- result = (QueryResult<R>) handler.apply(
+ result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
this
);
if (config.isCollectExecutionInfo()) {
- result.addExecutionInfo(
- "Handled in " + getClass() + " with serdes "
- + serdes + " in " + (time.nanoseconds() - start) +
"ns");
+ result.addExecutionInfo("Handled in " + getClass() + " with
serdes " + serdes + " in " + (time.nanoseconds() - start) + "ns");
}
}
return result;
}
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runRangeQuery(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runRangeQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> result;
final WindowRangeQuery<K, V> typedQuery = (WindowRangeQuery<K, V>)
query;
// There's no store API for open time ranges
@@ -466,7 +468,6 @@ public class MeteredWindowStore<K, V>
result = (QueryResult<R>) rawResult;
}
} else {
-
result = QueryResult.forFailure(
FailureReason.UNKNOWN_QUERY_TYPE,
"This store (" + getClass() + ") doesn't know how to"
@@ -479,11 +480,12 @@ public class MeteredWindowStore<K, V>
return result;
}
-
@SuppressWarnings("unchecked")
- private <R> QueryResult<R> runKeyQuery(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
+ private <R> QueryResult<R> runKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
final QueryResult<R> queryResult;
final WindowKeyQuery<K, V> typedQuery = (WindowKeyQuery<K, V>) query;
// There's no store API for open time ranges
@@ -518,7 +520,6 @@ public class MeteredWindowStore<K, V>
queryResult = (QueryResult<R>) rawResult;
}
} else {
-
queryResult = QueryResult.forFailure(
FailureReason.UNKNOWN_QUERY_TYPE,
"This store (" + getClass() + ") doesn't know how to execute"
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 0330ebd05de..08a5bf8e8d5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -71,17 +71,16 @@ public final class StoreQueryUtils {
* in a map.
*/
@FunctionalInterface
- public interface QueryHandler {
- QueryResult<?> apply(
- final Query<?> query,
+ public interface QueryHandler<R> {
+ QueryResult<R> apply(
+ final Query<R> query,
final PositionBound positionBound,
final QueryConfig config,
final StateStore store
);
}
- @SuppressWarnings("rawtypes")
- private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP =
+ private static final Map<Class<?>, QueryHandler<?>> QUERY_HANDLER_MAP =
mkMap(
mkEntry(
RangeQuery.class,
@@ -110,9 +109,8 @@ public final class StoreQueryUtils {
);
// make this class uninstantiable
+ private StoreQueryUtils() { }
- private StoreQueryUtils() {
- }
@SuppressWarnings("unchecked")
public static <R> QueryResult<R> handleBasicQueries(
final Query<R> query,
@@ -126,7 +124,7 @@ public final class StoreQueryUtils {
final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
final QueryResult<R> result;
- final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());
+ final QueryHandler<?> handler =
QUERY_HANDLER_MAP.get(query.getClass());
synchronized (position) {
if (handler == null) {
result = QueryResult.forUnknownQueryType(query, store);
@@ -137,7 +135,7 @@ public final class StoreQueryUtils {
context == null ? null : context.taskId().partition()
);
} else {
- result = (QueryResult<R>) handler.apply(
+ result = ((QueryHandler<R>) handler).apply(
query,
positionBound,
config,
@@ -154,10 +152,7 @@ public final class StoreQueryUtils {
return result;
}
- public static void updatePosition(
- final Position position,
- final StateStoreContext stateStoreContext) {
-
+ public static void updatePosition(final Position position, final
StateStoreContext stateStoreContext) {
if (stateStoreContext != null &&
stateStoreContext.recordMetadata().isPresent()) {
final RecordMetadata meta =
stateStoreContext.recordMetadata().get();
if (meta.topic() != null) {
@@ -363,10 +358,12 @@ public final class StoreQueryUtils {
}
@SuppressWarnings("unchecked")
- private static <R> QueryResult<R> runVersionedKeyQuery(final Query<R>
query,
- final PositionBound
positionBound,
- final QueryConfig
config,
- final StateStore
store) {
+ private static <R> QueryResult<R> runVersionedKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config,
+ final StateStore store
+ ) {
if (store instanceof VersionedKeyValueStore) {
final VersionedKeyValueStore<Bytes, byte[]> versionedKeyValueStore
=
(VersionedKeyValueStore<Bytes, byte[]>) store;
@@ -388,27 +385,29 @@ public final class StoreQueryUtils {
message
);
}
-
} else {
return QueryResult.forUnknownQueryType(query, store);
}
}
@SuppressWarnings("unchecked")
- private static <R> QueryResult<R> runMultiVersionedKeyQuery(final Query<R>
query,
- final
PositionBound positionBound,
- final
QueryConfig config,
- final
StateStore store) {
-
+ private static <R> QueryResult<R> runMultiVersionedKeyQuery(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config,
+ final StateStore store
+ ) {
if (store instanceof VersionedKeyValueStore) {
final RocksDBVersionedStore rocksDBVersionedStore =
(RocksDBVersionedStore) store;
final MultiVersionedKeyQuery<Bytes, byte[]> rawKeyQuery =
(MultiVersionedKeyQuery<Bytes, byte[]>) query;
try {
final VersionedRecordIterator<byte[]> segmentIterator =
- rocksDBVersionedStore.get(rawKeyQuery.key(),
-
rawKeyQuery.fromTime().get().toEpochMilli(),
-
rawKeyQuery.toTime().get().toEpochMilli(),
- rawKeyQuery.resultOrder());
+ rocksDBVersionedStore.get(
+ rawKeyQuery.key(),
+ rawKeyQuery.fromTime().get().toEpochMilli(),
+ rawKeyQuery.toTime().get().toEpochMilli(),
+ rawKeyQuery.resultOrder()
+ );
return (QueryResult<R>) QueryResult.forResult(segmentIterator);
} catch (final Exception e) {
final String message = parseStoreException(e, store, query);
@@ -450,16 +449,19 @@ public final class StoreQueryUtils {
public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>>
deserializeValue(final StateSerdes<?, V> serdes) {
final Serde<V> valueSerde = serdes.valueSerde();
final Deserializer<V> deserializer = valueSerde.deserializer();
- return rawVersionedRecord -> rawVersionedRecord.validTo().isPresent()
- ? new VersionedRecord<>(
- // deserializeValue s only used via IQ, so it's ok to not pass
any headers
- deserializer.deserialize(serdes.topic(), new RecordHeaders(),
rawVersionedRecord.value()),
- rawVersionedRecord.timestamp(),
- rawVersionedRecord.validTo().get())
- : new VersionedRecord<>(
- // deserializeValue s only used via IQ, so it's ok to not pass
any headers
- deserializer.deserialize(serdes.topic(), new RecordHeaders(),
rawVersionedRecord.value()),
- rawVersionedRecord.timestamp());
+ return rawVersionedRecord ->
+ rawVersionedRecord.validTo().isPresent()
+ ? new VersionedRecord<>(
+ // deserializeValue is only used via IQ, so it's ok to
not pass any headers
+ deserializer.deserialize(serdes.topic(), new
RecordHeaders(), rawVersionedRecord.value()),
+ rawVersionedRecord.timestamp(),
+ rawVersionedRecord.validTo().get()
+ )
+ : new VersionedRecord<>(
+ // deserializeValue is only used via IQ, so it's ok to
not pass any headers
+ deserializer.deserialize(serdes.topic(), new
RecordHeaders(), rawVersionedRecord.value()),
+ rawVersionedRecord.timestamp()
+ );
}
@SuppressWarnings("resource")
@@ -504,8 +506,7 @@ public final class StoreQueryUtils {
private static Position topicPartitionMapToPosition(final
Map<TopicPartition, Long> topicPartitions) {
final Map<String, Map<Integer, Long>> pos = new HashMap<>();
for (final Entry<TopicPartition, Long> e : topicPartitions.entrySet())
{
- pos
- .computeIfAbsent(e.getKey().topic(), t -> new HashMap<>())
+ pos.computeIfAbsent(e.getKey().topic(), t -> new HashMap<>())
.put(e.getKey().partition(), e.getValue());
}
return Position.fromMap(pos);
@@ -514,8 +515,7 @@ public final class StoreQueryUtils {
private static <R> String parseStoreException(final Exception e, final
StateStore store, final Query<R> query) {
final StringWriter stringWriter = new StringWriter();
final PrintWriter printWriter = new PrintWriter(stringWriter);
- printWriter.println(
- store.getClass() + " failed to handle query " + query + ":");
+ printWriter.println(store.getClass() + " failed to handle query " +
query + ":");
e.printStackTrace(printWriter);
printWriter.flush();
return stringWriter.toString();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index fae6a7fe379..67186489f28 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -65,14 +64,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
store.init(context, store);
}
- @AfterEach
- public void tearDown() {
- verify(inner).init(context, store);
- }
-
@Test
public void shouldDelegateInit() {
- // testing the combination of setUp and tearDown
+ verify(inner).init(context, store);
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index 39c93b86fe7..fabca5e3bdb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -64,14 +63,9 @@ public class ChangeLoggingWindowBytesStoreTest {
store.init(context, store);
}
- @AfterEach
- public void tearDown() {
- verify(inner).init(context, store);
- }
-
@Test
public void shouldDelegateInit() {
- // testing the combination of setUp and tearDown
+ verify(inner).init(context, store);
}
@Test