This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 30f92ecf221 MINOR: add missing overrides on TTD store facades (#21759)
30f92ecf221 is described below
commit 30f92ecf22167cab8fb36dabf14814fd684c28e4
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Mar 15 15:42:40 2026 -0700
MINOR: add missing overrides on TTD store facades (#21759)
This PR adds the missing overrides of StateStore methods to the TTD (TopologyTestDriver) store facades.
Reviewers: Bill Bejeck <[email protected]>
---
.../GenericKeyValueIteratorFacadeTest.java | 2 +-
.../GenericWindowStoreIteratorFacadeTest.java | 2 +-
.../apache/kafka/streams/TopologyTestDriver.java | 52 +++++++++++++++++
.../kafka/streams/KeyValueStoreFacadeTest.java | 50 ++++++++++++++++
.../kafka/streams/WindowStoreFacadeTest.java | 67 +++++++++++++++++++---
5 files changed, 162 insertions(+), 11 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
index 3983ed6057d..f357bf40de3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
@@ -83,7 +83,7 @@ public class GenericKeyValueIteratorFacadeTest {
@Test
public void shouldDelegatePeekNextKey() {
- when(mockedInnerIterator.peekNextKey()).thenReturn("peekedKey", null);
+ when(mockedInnerIterator.peekNextKey()).thenReturn("peekedKey",
(String) null);
assertThat(facade.peekNextKey(), is("peekedKey"));
assertNull(facade.peekNextKey());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
index 9bbab36e6ae..a989d385b61 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
@@ -84,7 +84,7 @@ public class GenericWindowStoreIteratorFacadeTest {
@Test
public void shouldDelegatePeekNextKey() {
- when(mockedInnerIterator.peekNextKey()).thenReturn(100L, null);
+ when(mockedInnerIterator.peekNextKey()).thenReturn(100L, (Long) null);
assertThat(facade.peekNextKey(), is(100L));
assertNull(facade.peekNextKey());
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 85023473005..3db553a6c6f 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1318,6 +1318,12 @@ public class TopologyTestDriver implements Closeable {
inner.init(stateStoreContext, root);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public void flush() {
+ inner.flush();
+ }
+
@Override
public void put(final K key, final V value) {
inner.put(key, ValueAndTimestamp.make(value,
ConsumerRecord.NO_TIMESTAMP));
@@ -1345,6 +1351,17 @@ public class TopologyTestDriver implements Closeable {
inner.commit(changelogOffsets);
}
+ @Override
+ public Long committedOffset(final TopicPartition topicPartition) {
+ return inner.committedOffset(topicPartition);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return inner.managesOffsets();
+ }
+
@Override
public void close() {
inner.close();
@@ -1365,6 +1382,15 @@ public class TopologyTestDriver implements Closeable {
return inner.isOpen();
}
+ @Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
+ return inner.query(query, positionBound, config);
+ }
+
@Override
public Position getPosition() {
return inner.getPosition();
@@ -1384,6 +1410,12 @@ public class TopologyTestDriver implements Closeable {
inner.init(stateStoreContext, root);
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public void flush() {
+ inner.flush();
+ }
+
@Override
public WindowStoreIterator<V> fetch(final K key,
final long timeFrom,
@@ -1438,6 +1470,17 @@ public class TopologyTestDriver implements Closeable {
inner.commit(changelogOffsets);
}
+ @Override
+ public Long committedOffset(final TopicPartition topicPartition) {
+ return inner.committedOffset(topicPartition);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean managesOffsets() {
+ return inner.managesOffsets();
+ }
+
@Override
public void close() {
inner.close();
@@ -1458,6 +1501,15 @@ public class TopologyTestDriver implements Closeable {
return inner.isOpen();
}
+ @Override
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config
+ ) {
+ return inner.query(query, positionBound, config);
+ }
+
@Override
public Position getPosition() {
return inner.getPosition();
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
index 3fb7aa7d99b..64be92dd744 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
@@ -17,10 +17,15 @@
package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -32,7 +37,10 @@ import java.util.Map;
import static java.util.Arrays.asList;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -59,6 +67,13 @@ public class KeyValueStoreFacadeTest {
verify(mockedKeyValueTimestampStore).init(context, store);
}
+ @Deprecated
+ @Test
+ public void shouldForwardFlush() {
+ keyValueStoreFacade.flush();
+ verify(mockedKeyValueTimestampStore).flush();
+ }
+
@Test
public void shouldPutWithUnknownTimestamp() {
keyValueStoreFacade.put("key", "value");
@@ -106,6 +121,24 @@ public class KeyValueStoreFacadeTest {
verify(mockedKeyValueTimestampStore).commit(Map.of());
}
+ @Test
+ public void shouldReturnCommitOffsets() {
+ final TopicPartition topicPartition = new TopicPartition("topic", 0);
+
when(mockedKeyValueTimestampStore.committedOffset(any())).thenReturn(42L);
+
+ assertEquals(42L, keyValueStoreFacade.committedOffset(topicPartition));
+ verify(mockedKeyValueTimestampStore).committedOffset(topicPartition);
+ }
+
+ @Deprecated
+ @Test
+ public void shouldReturnManagedOffsets() {
+ when(mockedKeyValueTimestampStore.managesOffsets()).thenReturn(true);
+
+ assertTrue(keyValueStoreFacade.managesOffsets());
+ verify(mockedKeyValueTimestampStore).managesOffsets();
+ }
+
@Test
public void shouldForwardClose() {
keyValueStoreFacade.close();
@@ -148,4 +181,21 @@ public class KeyValueStoreFacadeTest {
assertThat(keyValueStoreFacade.getPosition(),
is(Position.emptyPosition()));
verify(mockedKeyValueTimestampStore, times(1)).getPosition();
}
+
+ @Test
+ public void shouldReturnQueryResult() {
+ final Query<Object> query = new Query<>() { };
+ final QueryConfig queryConfig = new QueryConfig(true);
+ final QueryResult<Integer> queryResult = QueryResult.forResult(42);
+ when(mockedKeyValueTimestampStore.<Integer>query(any(), any(),
any())).thenReturn(queryResult);
+
+ assertThat(
+ keyValueStoreFacade.query(
+ query,
+ PositionBound.unbounded(),
+ queryConfig
+ ),
+ is(queryResult));
+ verify(mockedKeyValueTimestampStore).query(query,
PositionBound.unbounded(), queryConfig);
+ }
}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
index 9964e5cf6bb..a2cc9560aa9 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
@@ -17,12 +17,17 @@
package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.TopologyTestDriver.WindowStoreFacade;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -36,6 +41,9 @@ import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -62,6 +70,13 @@ public class WindowStoreFacadeTest {
.init(context, store);
}
+ @Deprecated
+ @Test
+ public void shouldForwardFlush() {
+ windowStoreFacade.flush();
+ verify(mockedWindowTimestampStore).flush();
+ }
+
@Test
public void shouldPutWindowStartTimestampWithUnknownTimestamp() {
windowStoreFacade.put("key", "value", 21L);
@@ -75,6 +90,24 @@ public class WindowStoreFacadeTest {
verify(mockedWindowTimestampStore).commit(Map.of());
}
+ @Test
+ public void shouldReturnCommitOffsets() {
+ final TopicPartition topicPartition = new TopicPartition("topic", 0);
+
when(mockedWindowTimestampStore.committedOffset(any())).thenReturn(42L);
+
+ assertEquals(42L, windowStoreFacade.committedOffset(topicPartition));
+ verify(mockedWindowTimestampStore).committedOffset(topicPartition);
+ }
+
+ @Deprecated
+ @Test
+ public void shouldReturnManagedOffsets() {
+ when(mockedWindowTimestampStore.managesOffsets()).thenReturn(true);
+
+ assertTrue(windowStoreFacade.managesOffsets());
+ verify(mockedWindowTimestampStore).managesOffsets();
+ }
+
@Test
public void shouldForwardClose() {
windowStoreFacade.close();
@@ -109,15 +142,6 @@ public class WindowStoreFacadeTest {
verify(mockedWindowTimestampStore, times(2)).isOpen();
}
- @Test
- public void shouldReturnPosition() {
- when(mockedWindowTimestampStore.getPosition())
- .thenReturn(Position.emptyPosition());
-
- assertThat(windowStoreFacade.getPosition(),
is(Position.emptyPosition()));
- verify(mockedWindowTimestampStore, times(1)).getPosition();
- }
-
@Test
public void shouldFetchTimeRangeAndConvertValues() {
@SuppressWarnings("unchecked")
@@ -173,4 +197,29 @@ public class WindowStoreFacadeTest {
}
}
+ @Test
+ public void shouldReturnPosition() {
+ when(mockedWindowTimestampStore.getPosition())
+ .thenReturn(Position.emptyPosition());
+
+ assertThat(windowStoreFacade.getPosition(),
is(Position.emptyPosition()));
+ verify(mockedWindowTimestampStore).getPosition();
+ }
+
+ @Test
+ public void shouldReturnQueryResult() {
+ final Query<Object> query = new Query<>() { };
+ final QueryConfig queryConfig = new QueryConfig(true);
+ final QueryResult<Integer> queryResult = QueryResult.forResult(42);
+ when(mockedWindowTimestampStore.<Integer>query(any(), any(),
any())).thenReturn(queryResult);
+
+ assertThat(
+ windowStoreFacade.query(
+ query,
+ PositionBound.unbounded(),
+ queryConfig
+ ),
+ is(queryResult));
+ verify(mockedWindowTimestampStore).query(query,
PositionBound.unbounded(), queryConfig);
+ }
}