mjsax commented on code in PR #21643:
URL: https://github.com/apache/kafka/pull/21643#discussion_r2893839057


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java:
##########
@@ -173,8 +173,7 @@ public long approximateNumEntries() {
         public <R> QueryResult<R> query(final Query<R> query,
                                         final PositionBound positionBound,
                                         final QueryConfig config) {
-
-            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported by timestamped key-value stores with headers yet.");
+            return wrapped().query(query, positionBound, config);

Review Comment:
   As we extend `Wrapped` store, I think it best to remove the whole override 
and re-use the impl from `Wrapped` store.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -86,7 +120,7 @@ public void put(final K key,
 
     @Override
     public ValueTimestampHeaders<V> putIfAbsent(final K key,
-                         final ValueTimestampHeaders<V> value) {
+                                                final ValueTimestampHeaders<V> 
value) {

Review Comment:
   TY!



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -152,11 +148,4 @@ private void verifyAndCloseEmptyDefaultColumnFamily(final 
ColumnFamilyHandle col
         }
     }
 
-    @Override
-    public <R> QueryResult<R> query(final Query<R> query,
-                                    final PositionBound positionBound,
-                                    final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
-    }

Review Comment:
   Yes -- but thinking about it, throwing an exception is not right anyway? We 
should rather return a `QueryResult.forUnknownQueryType(query, store);` and 
some boiler plate code  for "tracing" and "position" handling?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -152,7 +152,7 @@ public boolean isOpen() {
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+        return store.query(query, positionBound, config);

Review Comment:
   If this is an adaptor, do we need to make some translation between types 
here?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -98,16 +132,259 @@ public ValueTimestampHeaders<V> putIfAbsent(final K key,
         return currentValue;
     }
 
+    /**
+     * Executes a query against this store.
+     *
+     * <p>Note: Query results do NOT include headers, even though headers are
+     * preserved in the underlying store. This behavior provides compatibility
+     * with existing IQv2 APIs that operate on timestamped stores.
+     *
+     * @param query the query to execute
+     * @param positionBound the position bound
+     * @param config the query configuration
+     * @return the query result
+     */
+
+    @SuppressWarnings("unchecked")
     @Override
     public <R> QueryResult<R> query(final Query<R> query,
                                     final PositionBound positionBound,
                                     final QueryConfig config) {
-        throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+
+        final long start = time.nanoseconds();
+        final QueryResult<R> result;
+
+        final StoreQueryUtils.QueryHandler handler = 
queryHandlers.get(query.getClass());
+        if (handler == null) {
+            result = wrapped().query(query, positionBound, config);
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo(
+                    "Handled in " + getClass() + " in " + (time.nanoseconds() 
- start) + "ns");
+            }
+        } else {
+            result = (QueryResult<R>) handler.apply(
+                query,
+                positionBound,
+                config,
+                this
+            );
+            if (config.isCollectExecutionInfo()) {
+                result.addExecutionInfo("Handled in " + getClass() + " with 
serdes "
+                        + serdes + " in " + (time.nanoseconds() - start) + 
"ns");
+            }
+        }
+        return result;
     }
 
-    @Override
-    public Position getPosition() {
-        throw new UnsupportedOperationException("Position is not supported by 
timestamped key-value stores with headers yet.");
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runKeyQuery(final Query<R> query,
+                                           final PositionBound positionBound,
+                                           final QueryConfig config) {
+        final QueryResult<R> result;
+        final KeyQuery<K, V> typedKeyQuery = (KeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+            KeyQuery.withKey(keyBytes(typedKeyQuery.getKey(), new 
RecordHeaders()));
+        final QueryResult<byte[]> rawResult =
+            wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
+            final V plainValue = valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value();
+            final QueryResult<V> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
plainValue);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedKeyQuery(final Query<R> query,
+                                                      final PositionBound 
positionBound,
+                                                      final QueryConfig 
config) {
+        final QueryResult<R> result;
+        final TimestampedKeyQuery<K, V> typedKeyQuery = 
(TimestampedKeyQuery<K, V>) query;
+        final KeyQuery<Bytes, byte[]> rawKeyQuery =
+                KeyQuery.withKey(keyBytes(typedKeyQuery.key(), new 
RecordHeaders()));
+        final QueryResult<byte[]> rawResult =
+                wrapped().query(rawKeyQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final Function<byte[], ValueTimestampHeaders<V>> deserializer = 
StoreQueryUtils.deserializeValue(serdes, wrapped());
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
deserializer.apply(rawResult.getResult());
+            // Convert ValueTimestampHeaders to ValueAndTimestamp for the 
result
+            final ValueAndTimestamp<V> valueAndTimestamp = 
valueTimestampHeaders == null
+                    ? null
+                    : ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp());
+            final QueryResult<ValueAndTimestamp<V>> typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, 
valueAndTimestamp);
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runRangeQuery(final Query<R> query,
+                                             final PositionBound positionBound,
+                                             final QueryConfig config) {
+
+        final QueryResult<R> result;
+        final RangeQuery<K, V> typedQuery = (RangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final ResultOrder order = typedQuery.resultOrder();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.getLowerBound().orElse(null), new 
RecordHeaders()),
+                keyBytes(typedQuery.getUpperBound().orElse(null), new 
RecordHeaders())
+        );
+        if (order.equals(ResultOrder.DESCENDING)) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            rawRangeQuery = rawRangeQuery.withAscendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+            wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, V> resultIterator = new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+                    iterator,
+                    getSensor,
+                    StoreQueryUtils.deserializeValue(serdes, wrapped()),
+                    true
+            );
+            final QueryResult<KeyValueIterator<K, V>> typedQueryResult =
+                InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                        rawResult,
+                        resultIterator
+                );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <R> QueryResult<R> runTimestampedRangeQuery(final Query<R> query,
+                                                        final PositionBound 
positionBound,
+                                                        final QueryConfig 
config) {
+
+        final QueryResult<R> result;
+        final TimestampedRangeQuery<K, V> typedQuery = 
(TimestampedRangeQuery<K, V>) query;
+        RangeQuery<Bytes, byte[]> rawRangeQuery;
+        final ResultOrder order = typedQuery.resultOrder();
+        rawRangeQuery = RangeQuery.withRange(
+                keyBytes(typedQuery.lowerBound().orElse(null), new 
RecordHeaders()),
+                keyBytes(typedQuery.upperBound().orElse(null), new 
RecordHeaders())
+        );
+        if (order.equals(ResultOrder.DESCENDING)) {
+            rawRangeQuery = rawRangeQuery.withDescendingKeys();
+        }
+        if (order.equals(ResultOrder.ASCENDING)) {
+            rawRangeQuery = rawRangeQuery.withAscendingKeys();
+        }
+        final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+                wrapped().query(rawRangeQuery, positionBound, config);
+        if (rawResult.isSuccess()) {
+            final KeyValueIterator<Bytes, byte[]> iterator = 
rawResult.getResult();
+            final KeyValueIterator<K, ValueAndTimestamp<V>> resultIterator =
+                    (KeyValueIterator<K, ValueAndTimestamp<V>>) new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(
+                            iterator,
+                            getSensor,
+                            StoreQueryUtils.deserializeValue(serdes, 
wrapped()),
+                            false
+            );
+            final QueryResult<KeyValueIterator<K, ValueAndTimestamp<V>>> 
typedQueryResult =
+                    
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(
+                            rawResult,
+                            resultIterator
+                    );
+            result = (QueryResult<R>) typedQueryResult;
+        } else {
+            // the generic type doesn't matter, since failed queries have no 
result set.
+            result = (QueryResult<R>) rawResult;
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private class MeteredTimestampedKeyValueStoreWithHeadersIterator 
implements KeyValueIterator<K, V>, MeteredIterator {
+
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final long startTimestampMs;
+        private final Function<byte[], ValueTimestampHeaders<V>> 
valueTimestampHeadersDeserializer;
+
+        private final boolean returnPlainValue;
+
+        private MeteredTimestampedKeyValueStoreWithHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                                   final 
Sensor sensor,
+                                                                   final 
Function<byte[], ValueTimestampHeaders<V>> valueTimestampHeadersDeserializer,
+                                                                   final 
boolean returnPlainValue) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.valueTimestampHeadersDeserializer = 
valueTimestampHeadersDeserializer;
+            this.startNs = time.nanoseconds();
+            this.startTimestampMs = time.milliseconds();
+            this.returnPlainValue = returnPlainValue;
+            openIterators.add(this);
+        }
+
+        @Override
+        public long startTimestamp() {
+            return startTimestampMs;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            final KeyValue<Bytes, byte[]> keyValue = iter.next();
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
valueTimestampHeadersDeserializer.apply(keyValue.value);
+            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
+            if (returnPlainValue) {
+                final V plainValue = valueTimestampHeaders == null ? null : 
valueTimestampHeaders.value();
+                return KeyValue.pair(
+                    serdes.keyFrom(keyValue.key.get(), headers),
+                    plainValue
+                );
+            } else {
+                // Return as ValueAndTimestamp
+                final ValueAndTimestamp<V> valueAndTimestamp = 
valueTimestampHeaders == null
+                    ? null
+                    : ValueAndTimestamp.make(valueTimestampHeaders.value(), 
valueTimestampHeaders.timestamp());
+                return KeyValue.pair(
+                    serdes.keyFrom(keyValue.key.get(), headers),
+                    (V) valueAndTimestamp

Review Comment:
   This is messy... but it's the same in `MeteredTimestampedKeyValueStore`. 
Let's keep it this way. I'll do a follow up PR to fix this after this PR is 
merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to