This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e0e7323f9b5 KAFKA-15768: Fix getOnlyPartitionResult to handle
FailedQueryResult (#22123)
e0e7323f9b5 is described below
commit e0e7323f9b5f945012ca53630a742e83cda64b0e
Author: Gavin Wang <[email protected]>
AuthorDate: Tue May 5 12:36:32 2026 -0400
KAFKA-15768: Fix getOnlyPartitionResult to handle FailedQueryResult (#22123)
`getOnlyPartitionResult()` was silently discarding `FailedQueryResult`
entries by filtering only for successful results, making failures
indistinguishable from "not found". Fix the filter to treat failed
results as meaningful so callers can inspect the failure via
`isFailure()`/`getFailureReason()`/`getFailureMessage()`.
Reviewers: Evan Zhou <[email protected]>, Bill Bejeck
<[email protected]>
---
.../integration/IQv2StoreIntegrationTest.java | 23 +++++++++++++++++++++-
.../kafka/streams/query/StateQueryResult.java | 3 +--
.../kafka/streams/query/StateQueryResultTest.java | 9 +++++++++
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 42bd76d565d..7af75d07184 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -807,7 +807,7 @@ public class IQv2StoreIntegrationTest {
shouldHandleTimestampedKeyQuery(2,
ValueAndTimestamp.make(5, -1L));
}
} else {
- assertThrows(AssertionError.class, () ->
shouldHandleTimestampedKeyQuery(2, ValueAndTimestamp.make(5, WINDOW_START +
Duration.ofMinutes(2).toMillis() * 5)));
+ shouldHandleFailedTimestampedKeyQuery(2);
assertThrows(AssertionError.class, () ->
shouldHandleTimestampedRangeQueries(false));
}
@@ -1710,6 +1710,27 @@ public class IQv2StoreIntegrationTest {
assertThat(queryResult.getPosition(), is(POSITION_0));
}
+ public <V> void shouldHandleFailedTimestampedKeyQuery(final Integer key) {
+ final TimestampedKeyQuery<Integer, V> query =
TimestampedKeyQuery.withKey(key);
+ final StateQueryRequest<ValueAndTimestamp<V>> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(Set.of(0))
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+ final StateQueryResult<ValueAndTimestamp<V>> result =
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+ final QueryResult<ValueAndTimestamp<V>> queryResult =
+ result.getOnlyPartitionResult();
+
+ assertThat(queryResult.isFailure(), is(true));
+ assertThat(queryResult.getFailureReason(),
is(FailureReason.UNKNOWN_QUERY_TYPE));
+ assertThat(queryResult.getFailureMessage(),
containsString("TimestampedKeyQuery"));
+
+ assertThrows(IllegalArgumentException.class, queryResult::getResult);
+ }
+
public <V> void shouldHandleRangeQuery(
final Optional<Integer> lower,
final Optional<Integer> upper,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
index 5819710ebab..7ba8ca94af2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -69,8 +69,7 @@ public class StateQueryResult<R> {
partitionResults
.values()
.stream()
- .filter(QueryResult::isSuccess)
- .filter(r -> r.getResult() != null)
+ .filter(r -> r.isFailure() || r.getResult() != null)
.collect(Collectors.toList());
if (nonempty.size() > 1) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java
b/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java
index f263e43bfa3..697f6fca28c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/query/StateQueryResultTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.query;
+import org.apache.kafka.streams.query.internals.FailedQueryResult;
import org.apache.kafka.streams.query.internals.SucceededQueryResult;
import org.junit.jupiter.api.BeforeEach;
@@ -32,6 +33,7 @@ class StateQueryResultTest {
StateQueryResult<String> stringStateQueryResult;
final QueryResult<String> noResultsFound = new
SucceededQueryResult<>(null);
final QueryResult<String> validResult = new SucceededQueryResult<>("Foo");
+ final QueryResult<String> invalidResult = new
FailedQueryResult<>(FailureReason.DOES_NOT_EXIST, "Does not exist");
@BeforeEach
public void setUp() {
@@ -52,6 +54,13 @@ class StateQueryResultTest {
assertThat("Valid query results still works", result.getResult(),
is("Foo"));
}
+ @Test
+ void getOnlyPartitionResultWithSingleFailureResultTest() {
+ stringStateQueryResult.addResult(0, invalidResult);
+ final QueryResult<String> result =
stringStateQueryResult.getOnlyPartitionResult();
+ assertThat("Invalid query result should be a failure",
result.isFailure(), is(true));
+ }
+
@Test
void getOnlyPartitionResultMultipleResults() {
stringStateQueryResult.addResult(0, validResult);