mjsax commented on code in PR #21643:
URL: https://github.com/apache/kafka/pull/21643#discussion_r2898274271
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeadersTest.java:
##########
@@ -638,20 +635,6 @@ private void prepareTimestampedStore() {
}
}
- @Test
- public void shouldThrowUnsupportedOperationExceptionOnQuery() {
Review Comment:
I think we should keep this test but change the expectation: instead of an
exception, expect a `FailedQueryResult`?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java:
##########
@@ -191,70 +187,6 @@ public void shouldThrowNullPointerIfMetricsScopeIsNull() {
assertTrue(e.getMessage().contains("storeSupplier's metricsScope can't be null"));
}
- @Test
- public void shouldThrowUsingIQv2ForInMemoryStores() {
Review Comment:
So do we need to add a test for what we now support? This could also be a
follow-up PR (or just a Jira ticket for now).
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -152,7 +155,40 @@ public boolean isOpen() {
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
+ // Handle KeyQuery: convert byte[] result from timestamped to headers format
Review Comment:
We should also add the "tracing" code, i.e., call
`result.addExecutionInfo(...)`.
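
A minimal sketch of that tracing pattern, using simplified stand-in types
rather than the real Kafka Streams classes. `TracingSketch`, `SketchResult`
and `traced` are hypothetical names for illustration only; in the adapter the
same shape would presumably be guarded by `config.isCollectExecutionInfo()`
and call `result.addExecutionInfo(...)` on the real `QueryResult`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class TracingSketch {

    // Hypothetical stand-in for a query result that can carry execution info.
    static final class SketchResult {
        final String value;
        final List<String> executionInfo = new ArrayList<>();

        SketchResult(final String value) {
            this.value = value;
        }

        void addExecutionInfo(final String message) {
            executionInfo.add(message);
        }
    }

    // Runs the query and, only when requested, records how long this layer took.
    static SketchResult traced(final boolean collectExecutionInfo,
                               final Class<?> layer,
                               final Supplier<SketchResult> query) {
        final long start = collectExecutionInfo ? System.nanoTime() : -1L;
        final SketchResult result = query.get();
        if (collectExecutionInfo) {
            result.addExecutionInfo(
                "Handled in " + layer + " in " + (System.nanoTime() - start) + "ns");
        }
        return result;
    }

    public static void main(final String[] args) {
        final SketchResult result =
            traced(true, TracingSketch.class, () -> new SketchResult("v"));
        System.out.println(result.executionInfo);
    }
}
```

The timer is only started when execution info is requested, so the common
path pays nothing for the tracing.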
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersStoreAdapter.java:
##########
@@ -152,6 +155,39 @@ public boolean isOpen() {
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
+ // Handle KeyQuery: convert byte[] result from timestamped to headers format
+ if (query instanceof KeyQuery) {
+     final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) query;
+     final QueryResult<byte[]> rawResult = store.query(keyQuery, positionBound, config);
+
+     if (rawResult.isSuccess()) {
+         final byte[] convertedValue = convertToHeaderFormat(rawResult.getResult());
+         final QueryResult<byte[]> convertedResult =
+             InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, convertedValue);
+         return (QueryResult<R>) convertedResult;
+     } else {
+         return (QueryResult<R>) rawResult;
+     }
+ }
+
+ // Handle RangeQuery: wrap iterator to convert values
+ if (query instanceof RangeQuery) {
+     final RangeQuery<Bytes, byte[]> rangeQuery = (RangeQuery<Bytes, byte[]>) query;
+     final QueryResult<KeyValueIterator<Bytes, byte[]>> rawResult =
+         store.query(rangeQuery, positionBound, config);
+
+     if (rawResult.isSuccess()) {
+         final KeyValueIterator<Bytes, byte[]> convertedIterator =
+             new TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+         final QueryResult<KeyValueIterator<Bytes, byte[]>> convertedResult =
+             InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult, convertedIterator);
+         return (QueryResult<R>) convertedResult;
+     } else {
+         return (QueryResult<R>) rawResult;
+     }
+ }
+
+ // For other query types, delegate to the underlying store
Review Comment:
Not sure if this is correct? For regular layers in the hierarchy, it's fine to
forward an unsupported query type to lower layers. An adapter is different: if
we cannot translate the bytes for an unknown query type, then even if the lower
layer supports the query, the upper layer would crash when trying to
deserialize, because the byte format does not match what the metered layer
expects and the adapter has not fixed it up.
So it seems better to return `FailedQueryResult` for this case?
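
To illustrate the concern, here is a minimal sketch with simplified stand-in
types, not the Kafka Streams API. Every `Sketch*` name is hypothetical, and
the payload is a plain string standing in for the serialized bytes. In the
real adapter the failure branch would presumably be something like
`QueryResult.forUnknownQueryType(query, store)`.

```java
final class AdapterQuerySketch {

    // Hypothetical stand-ins for query types; not the Kafka Streams classes.
    interface SketchQuery { }

    static final class SketchKeyQuery implements SketchQuery { }

    static final class SketchUnknownQuery implements SketchQuery { }

    // Hypothetical result type: either a payload or a failure message.
    static final class SketchResult {
        final boolean success;
        final String payload;
        final String failureMessage;

        private SketchResult(final boolean success, final String payload, final String failureMessage) {
            this.success = success;
            this.payload = payload;
            this.failureMessage = failureMessage;
        }

        static SketchResult success(final String payload) {
            return new SketchResult(true, payload, null);
        }

        static SketchResult failure(final String message) {
            return new SketchResult(false, null, message);
        }
    }

    interface SketchStore {
        SketchResult query(SketchQuery query);
    }

    // The adapter only forwards query types whose results it knows how to convert.
    static final class SketchAdapter implements SketchStore {
        private final SketchStore inner;

        SketchAdapter(final SketchStore inner) {
            this.inner = inner;
        }

        @Override
        public SketchResult query(final SketchQuery query) {
            if (query instanceof SketchKeyQuery) {
                final SketchResult raw = inner.query(query);
                // Convert the old format to the new one; pass failures through unchanged.
                return raw.success ? SketchResult.success("headers:" + raw.payload) : raw;
            }
            // Unknown type: the inner result could not be translated, so fail here
            // instead of handing an untranslated payload to the layer above.
            return SketchResult.failure("Unknown query type: " + query.getClass().getSimpleName());
        }
    }

    public static void main(final String[] args) {
        // The inner store answers every query, including ones the adapter cannot translate.
        final SketchStore inner = query -> SketchResult.success("legacy");
        final SketchStore adapter = new SketchAdapter(inner);
        System.out.println(adapter.query(new SketchKeyQuery()).payload);
        System.out.println(adapter.query(new SketchUnknownQuery()).failureMessage);
    }
}
```

Even though the inner store in this sketch answers every query, the adapter
reports a failure for the type it cannot translate, so the caller never sees
a payload in the wrong format.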
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -152,11 +148,4 @@ private void verifyAndCloseEmptyDefaultColumnFamily(final ColumnFamilyHandle col
}
}
- @Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not supported for timestamped key-value stores with headers yet.");
- }
Review Comment:
I don't think so. If we don't override `query()` and instead reuse
`RocksDBStore#query()`, we would start to "support" `KeyQuery`, which we don't
want to do at this point.
I think we need:
```
@Override
public <R> QueryResult<R> query(final Query<R> query,
                                final PositionBound positionBound,
                                final QueryConfig config) {
    final long start = config.isCollectExecutionInfo() ? System.nanoTime() : -1L;
    final QueryResult<R> result;
    synchronized (position) {
        result = QueryResult.forUnknownQueryType(query, store);
        if (config.isCollectExecutionInfo()) {
            result.addExecutionInfo(
                "Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
            );
        }
        result.setPosition(position.copy());
    }
    return result;
}
```
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java:
##########
@@ -440,6 +465,11 @@ public void setup(final boolean cache, final boolean log, final StoresToTest sto
final StreamsBuilder builder = new StreamsBuilder();
if (Objects.equals(kind, "DSL") && supplier instanceof KeyValueBytesStoreSupplier) {
+ if (storeToTest.isHeaders()) {
+ // DSL doesn't support headers stores - skip this test combination
Review Comment:
Let's write down a TODO (Jira ticket?) to enable this later, after the DSL is
completed.