This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 30f92ecf221 MINOR: add missing overrides on TTD store facades (#21759)
30f92ecf221 is described below

commit 30f92ecf22167cab8fb36dabf14814fd684c28e4
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sun Mar 15 15:42:40 2026 -0700

    MINOR: add missing overrides on TTD store facades (#21759)
    
    This PR adds missing overloads to TDD store facades from StateStore.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../GenericKeyValueIteratorFacadeTest.java         |  2 +-
 .../GenericWindowStoreIteratorFacadeTest.java      |  2 +-
 .../apache/kafka/streams/TopologyTestDriver.java   | 52 +++++++++++++++++
 .../kafka/streams/KeyValueStoreFacadeTest.java     | 50 ++++++++++++++++
 .../kafka/streams/WindowStoreFacadeTest.java       | 67 +++++++++++++++++++---
 5 files changed, 162 insertions(+), 11 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
index 3983ed6057d..f357bf40de3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericKeyValueIteratorFacadeTest.java
@@ -83,7 +83,7 @@ public class GenericKeyValueIteratorFacadeTest {
 
     @Test
     public void shouldDelegatePeekNextKey() {
-        when(mockedInnerIterator.peekNextKey()).thenReturn("peekedKey", null);
+        when(mockedInnerIterator.peekNextKey()).thenReturn("peekedKey", 
(String) null);
 
         assertThat(facade.peekNextKey(), is("peekedKey"));
         assertNull(facade.peekNextKey());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
index 9bbab36e6ae..a989d385b61 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/GenericWindowStoreIteratorFacadeTest.java
@@ -84,7 +84,7 @@ public class GenericWindowStoreIteratorFacadeTest {
 
     @Test
     public void shouldDelegatePeekNextKey() {
-        when(mockedInnerIterator.peekNextKey()).thenReturn(100L, null);
+        when(mockedInnerIterator.peekNextKey()).thenReturn(100L, (Long) null);
 
         assertThat(facade.peekNextKey(), is(100L));
         assertNull(facade.peekNextKey());
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 85023473005..3db553a6c6f 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1318,6 +1318,12 @@ public class TopologyTestDriver implements Closeable {
             inner.init(stateStoreContext, root);
         }
 
+        @SuppressWarnings("deprecation")
+        @Override
+        public void flush() {
+            inner.flush();
+        }
+
         @Override
         public void put(final K key, final V value) {
             inner.put(key, ValueAndTimestamp.make(value, 
ConsumerRecord.NO_TIMESTAMP));
@@ -1345,6 +1351,17 @@ public class TopologyTestDriver implements Closeable {
             inner.commit(changelogOffsets);
         }
 
+        @Override
+        public Long committedOffset(final TopicPartition topicPartition) {
+            return inner.committedOffset(topicPartition);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public boolean managesOffsets() {
+            return inner.managesOffsets();
+        }
+
         @Override
         public void close() {
             inner.close();
@@ -1365,6 +1382,15 @@ public class TopologyTestDriver implements Closeable {
             return inner.isOpen();
         }
 
+        @Override
+        public <R> QueryResult<R> query(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
+            return inner.query(query, positionBound, config);
+        }
+
         @Override
         public Position getPosition() {
             return inner.getPosition();
@@ -1384,6 +1410,12 @@ public class TopologyTestDriver implements Closeable {
             inner.init(stateStoreContext, root);
         }
 
+        @SuppressWarnings("deprecation")
+        @Override
+        public void flush() {
+            inner.flush();
+        }
+
         @Override
         public WindowStoreIterator<V> fetch(final K key,
                                             final long timeFrom,
@@ -1438,6 +1470,17 @@ public class TopologyTestDriver implements Closeable {
             inner.commit(changelogOffsets);
         }
 
+        @Override
+        public Long committedOffset(final TopicPartition topicPartition) {
+            return inner.committedOffset(topicPartition);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public boolean managesOffsets() {
+            return inner.managesOffsets();
+        }
+
         @Override
         public void close() {
             inner.close();
@@ -1458,6 +1501,15 @@ public class TopologyTestDriver implements Closeable {
             return inner.isOpen();
         }
 
+        @Override
+        public <R> QueryResult<R> query(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
+            return inner.query(query, positionBound, config);
+        }
+
         @Override
         public Position getPosition() {
             return inner.getPosition();
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
index 3fb7aa7d99b..64be92dd744 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/KeyValueStoreFacadeTest.java
@@ -17,10 +17,15 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.TopologyTestDriver.KeyValueStoreFacade;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
@@ -32,7 +37,10 @@ import java.util.Map;
 import static java.util.Arrays.asList;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -59,6 +67,13 @@ public class KeyValueStoreFacadeTest {
         verify(mockedKeyValueTimestampStore).init(context, store);
     }
 
+    @Deprecated
+    @Test
+    public void shouldForwardFlush() {
+        keyValueStoreFacade.flush();
+        verify(mockedKeyValueTimestampStore).flush();
+    }
+
     @Test
     public void shouldPutWithUnknownTimestamp() {
         keyValueStoreFacade.put("key", "value");
@@ -106,6 +121,24 @@ public class KeyValueStoreFacadeTest {
         verify(mockedKeyValueTimestampStore).commit(Map.of());
     }
 
+    @Test
+    public void shouldReturnCommitOffsets() {
+        final TopicPartition topicPartition = new TopicPartition("topic", 0);
+        
when(mockedKeyValueTimestampStore.committedOffset(any())).thenReturn(42L);
+
+        assertEquals(42L, keyValueStoreFacade.committedOffset(topicPartition));
+        verify(mockedKeyValueTimestampStore).committedOffset(topicPartition);
+    }
+
+    @Deprecated
+    @Test
+    public void shouldReturnManagedOffsets() {
+        when(mockedKeyValueTimestampStore.managesOffsets()).thenReturn(true);
+
+        assertTrue(keyValueStoreFacade.managesOffsets());
+        verify(mockedKeyValueTimestampStore).managesOffsets();
+    }
+
     @Test
     public void shouldForwardClose() {
         keyValueStoreFacade.close();
@@ -148,4 +181,21 @@ public class KeyValueStoreFacadeTest {
         assertThat(keyValueStoreFacade.getPosition(), 
is(Position.emptyPosition()));
         verify(mockedKeyValueTimestampStore, times(1)).getPosition();
     }
+
+    @Test
+    public void shouldReturnQueryResult() {
+        final Query<Object> query = new Query<>() { };
+        final QueryConfig queryConfig = new QueryConfig(true);
+        final QueryResult<Integer> queryResult = QueryResult.forResult(42);
+        when(mockedKeyValueTimestampStore.<Integer>query(any(), any(), 
any())).thenReturn(queryResult);
+
+        assertThat(
+            keyValueStoreFacade.query(
+                query,
+                PositionBound.unbounded(),
+                queryConfig
+            ),
+            is(queryResult));
+        verify(mockedKeyValueTimestampStore).query(query, 
PositionBound.unbounded(), queryConfig);
+    }
 }
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
index 9964e5cf6bb..a2cc9560aa9 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
@@ -17,12 +17,17 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.TopologyTestDriver.WindowStoreFacade;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
@@ -36,6 +41,9 @@ import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -62,6 +70,13 @@ public class WindowStoreFacadeTest {
             .init(context, store);
     }
 
+    @Deprecated
+    @Test
+    public void shouldForwardFlush() {
+        windowStoreFacade.flush();
+        verify(mockedWindowTimestampStore).flush();
+    }
+
     @Test
     public void shouldPutWindowStartTimestampWithUnknownTimestamp() {
         windowStoreFacade.put("key", "value", 21L);
@@ -75,6 +90,24 @@ public class WindowStoreFacadeTest {
         verify(mockedWindowTimestampStore).commit(Map.of());
     }
 
+    @Test
+    public void shouldReturnCommitOffsets() {
+        final TopicPartition topicPartition = new TopicPartition("topic", 0);
+        
when(mockedWindowTimestampStore.committedOffset(any())).thenReturn(42L);
+
+        assertEquals(42L, windowStoreFacade.committedOffset(topicPartition));
+        verify(mockedWindowTimestampStore).committedOffset(topicPartition);
+    }
+
+    @Deprecated
+    @Test
+    public void shouldReturnManagedOffsets() {
+        when(mockedWindowTimestampStore.managesOffsets()).thenReturn(true);
+
+        assertTrue(windowStoreFacade.managesOffsets());
+        verify(mockedWindowTimestampStore).managesOffsets();
+    }
+
     @Test
     public void shouldForwardClose() {
         windowStoreFacade.close();
@@ -109,15 +142,6 @@ public class WindowStoreFacadeTest {
         verify(mockedWindowTimestampStore, times(2)).isOpen();
     }
 
-    @Test
-    public void shouldReturnPosition() {
-        when(mockedWindowTimestampStore.getPosition())
-            .thenReturn(Position.emptyPosition());
-
-        assertThat(windowStoreFacade.getPosition(), 
is(Position.emptyPosition()));
-        verify(mockedWindowTimestampStore, times(1)).getPosition();
-    }
-
     @Test
     public void shouldFetchTimeRangeAndConvertValues() {
         @SuppressWarnings("unchecked")
@@ -173,4 +197,29 @@ public class WindowStoreFacadeTest {
         }
     }
 
+    @Test
+    public void shouldReturnPosition() {
+        when(mockedWindowTimestampStore.getPosition())
+            .thenReturn(Position.emptyPosition());
+
+        assertThat(windowStoreFacade.getPosition(), 
is(Position.emptyPosition()));
+        verify(mockedWindowTimestampStore).getPosition();
+    }
+
+    @Test
+    public void shouldReturnQueryResult() {
+        final Query<Object> query = new Query<>() { };
+        final QueryConfig queryConfig = new QueryConfig(true);
+        final QueryResult<Integer> queryResult = QueryResult.forResult(42);
+        when(mockedWindowTimestampStore.<Integer>query(any(), any(), 
any())).thenReturn(queryResult);
+
+        assertThat(
+            windowStoreFacade.query(
+                query,
+                PositionBound.unbounded(),
+                queryConfig
+            ),
+            is(queryResult));
+        verify(mockedWindowTimestampStore).query(query, 
PositionBound.unbounded(), queryConfig);
+    }
 }

Reply via email to