This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 13df7ed5954 KAFKA-20308: Update TopologyTestDriver to support for headers-aware stores (#21745)
13df7ed5954 is described below

commit 13df7ed5954f058bdc4fee821e4bd127b469dc1b
Author: Alieh Saeedi <[email protected]>
AuthorDate: Sun Mar 15 18:54:53 2026 +0100

    KAFKA-20308: Update TopologyTestDriver to support for headers-aware stores (#21745)
    
    This PR adds full TopologyTestDriver support for headers-aware state
    stores, enabling tests to both access headers and use the simple
    non-headers API automatically.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../apache/kafka/streams/TopologyTestDriver.java   | 430 +++++++++++++++++++++
 1 file changed, 430 insertions(+)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index beae91966fc..85023473005 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -76,6 +76,10 @@ import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.AggregationWithHeaders;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -1012,6 +1016,10 @@ public class TopologyTestDriver implements Closeable {
     @SuppressWarnings("unchecked")
     public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
         final StateStore store = getStateStore(name, false);
+        if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+            log.warn("Method #getTimestampedKeyValueStore() should be used to 
access a TimestampedKeyValueStoreWithHeaders.");
+            return new 
KeyValueStoreFacadeForHeaders<>((TimestampedKeyValueStoreWithHeaders<K, V>) 
store);
+        }
         if (store instanceof TimestampedKeyValueStore) {
             log.warn("Method #getTimestampedKeyValueStore() should be used to 
access a TimestampedKeyValueStore.");
             return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>) 
store);
@@ -1041,6 +1049,9 @@ public class TopologyTestDriver implements Closeable {
     @SuppressWarnings("unchecked")
     public <K, V> KeyValueStore<K, ValueAndTimestamp<V>> 
getTimestampedKeyValueStore(final String name) {
         final StateStore store = getStateStore(name, false);
+        if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+            return new 
TimestampedKeyValueStoreFacadeForHeaders<>((TimestampedKeyValueStoreWithHeaders<K,
 V>) store);
+        }
         return store instanceof TimestampedKeyValueStore ? 
(TimestampedKeyValueStore<K, V>) store : null;
     }
 
@@ -1121,6 +1132,10 @@ public class TopologyTestDriver implements Closeable {
     @SuppressWarnings("unchecked")
     public <K, V> WindowStore<K, V> getWindowStore(final String name) {
         final StateStore store = getStateStore(name, false);
+        if (store instanceof TimestampedWindowStoreWithHeaders) {
+            log.warn("Method #getTimestampedWindowStoreWithHeaders() should be 
used to access a TimestampedWindowStoreWithHeaders.");
+            return new 
WindowStoreFacadeForHeaders<>((TimestampedWindowStoreWithHeaders<K, V>) store);
+        }
         if (store instanceof TimestampedWindowStore) {
             log.warn("Method #getTimestampedWindowStore() should be used to 
access a TimestampedWindowStore.");
             return new WindowStoreFacade<>((TimestampedWindowStore<K, V>) 
store);
@@ -1149,6 +1164,9 @@ public class TopologyTestDriver implements Closeable {
     @SuppressWarnings("unchecked")
     public <K, V> WindowStore<K, ValueAndTimestamp<V>> 
getTimestampedWindowStore(final String name) {
         final StateStore store = getStateStore(name, false);
+        if (store instanceof TimestampedWindowStoreWithHeaders) {
+            return new 
TimestampedWindowStoreFacadeForHeaders<>((TimestampedWindowStoreWithHeaders<K, 
V>) store);
+        }
         return store instanceof TimestampedWindowStore ? 
(TimestampedWindowStore<K, V>) store : null;
     }
 
@@ -1446,6 +1464,216 @@ public class TopologyTestDriver implements Closeable {
         }
     }
 
+    static class WindowStoreFacadeForHeaders<K, V> extends 
GenericReadOnlyWindowStoreFacade<K, ValueTimestampHeaders<V>, V> implements 
WindowStore<K, V> {
+        private final TimestampedWindowStoreWithHeaders<K, V> inner;
+
+        public WindowStoreFacadeForHeaders(final 
TimestampedWindowStoreWithHeaders<K, V> store) {
+            super(store, ValueConverters.extractValueFromHeaders());
+            this.inner = store;
+        }
+
+        @Override
+        public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+            inner.init(stateStoreContext, root);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public void flush() {
+            inner.flush();
+        }
+
+        @Override
+        public WindowStoreIterator<V> fetch(final K key,
+                                            final long timeFrom,
+                                            final long timeTo) {
+            return fetch(key, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
+                                                         final long timeTo) {
+            return fetchAll(Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
+                                                      final K keyTo,
+                                                      final long timeFrom,
+                                                      final long timeTo) {
+            return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public WindowStoreIterator<V> backwardFetch(final K key,
+                                                    final long timeFrom,
+                                                    final long timeTo) {
+            return backwardFetch(key, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long 
timeFrom,
+                                                                 final long 
timeTo) {
+            return backwardFetchAll(Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
+                                                              final K keyTo,
+                                                              final long 
timeFrom,
+                                                              final long 
timeTo) {
+            return backwardFetch(keyFrom, keyTo, 
Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public void put(final K key,
+                        final V value,
+                        final long windowStartTimestamp) {
+            inner.put(key, ValueTimestampHeaders.make(value, 
ConsumerRecord.NO_TIMESTAMP, null), windowStartTimestamp);
+        }
+
+        @Override
+        public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+            inner.commit(changelogOffsets);
+        }
+
+        @Override
+        public Long committedOffset(final TopicPartition topicPartition) {
+            return inner.committedOffset(topicPartition);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public boolean managesOffsets() {
+            return inner.managesOffsets();
+        }
+
+        @Override
+        public void close() {
+            inner.close();
+        }
+
+        @Override
+        public String name() {
+            return inner.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return inner.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return inner.isOpen();
+        }
+
+        @Override
+        public <R> QueryResult<R> query(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
+            return inner.query(query, positionBound, config);
+        }
+
+        @Override
+        public Position getPosition() {
+            return inner.getPosition();
+        }
+    }
+
+    static class TimestampedWindowStoreFacadeForHeaders<K, V> extends 
GenericReadOnlyWindowStoreFacade<K, ValueTimestampHeaders<V>, 
ValueAndTimestamp<V>> implements WindowStore<K, ValueAndTimestamp<V>> {
+        private final TimestampedWindowStoreWithHeaders<K, V> inner;
+
+        public TimestampedWindowStoreFacadeForHeaders(final 
TimestampedWindowStoreWithHeaders<K, V> store) {
+            super(store, ValueConverters.headersToValueAndTimestamp());
+            this.inner = store;
+        }
+
+        @Override
+        public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+            inner.init(stateStoreContext, root);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public void flush() {
+            inner.flush();
+        }
+
+        @Override
+        public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+            inner.commit(changelogOffsets);
+        }
+
+        @Override
+        public Long committedOffset(final TopicPartition topicPartition) {
+            return inner.committedOffset(topicPartition);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public boolean managesOffsets() {
+            return inner.managesOffsets();
+        }
+
+        @Override
+        public void close() {
+            inner.close();
+        }
+
+        @Override
+        public String name() {
+            return inner.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return inner.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return inner.isOpen();
+        }
+
+        @Override
+        public <R> QueryResult<R> query(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
+            return inner.query(query, positionBound, config);
+        }
+
+        @Override
+        public Position getPosition() {
+            return inner.getPosition();
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value, final 
long windowStartTimestamp) {
+            inner.put(key, ValueTimestampHeaders.make(value.value(), 
value.timestamp(), null), windowStartTimestamp);
+        }
+
+        @Override
+        public WindowStoreIterator<ValueAndTimestamp<V>> fetch(final K key, 
final long timeFrom, final long timeTo) {
+            return fetch(key, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetch(final 
K keyFrom, final K keyTo, final long timeFrom, final long timeTo) {
+            return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> 
fetchAll(final long timeFrom, final long timeTo) {
+            return fetchAll(Instant.ofEpochMilli(timeFrom), 
Instant.ofEpochMilli(timeTo));
+        }
+    }
+
+
     static class SessionStoreFacade<K, V> implements SessionStore<K, V> {
         private final SessionStoreWithHeaders<K, V> inner;
 
@@ -1557,4 +1785,206 @@ public class TopologyTestDriver implements Closeable {
         }
     }
 
+    static class KeyValueStoreFacadeForHeaders<K, V> extends 
GenericReadOnlyKeyValueStoreFacade<K, ValueTimestampHeaders<V>, V> implements 
KeyValueStore<K, V> {
+        private final TimestampedKeyValueStoreWithHeaders<K, V> inner;
+
+        public KeyValueStoreFacadeForHeaders(final 
TimestampedKeyValueStoreWithHeaders<K, V> store) {
+            super(store, ValueConverters.extractValueFromHeaders());
+            this.inner = store;
+        }
+
+        @Override
+        public V get(final K key) {
+            return ValueTimestampHeaders.getValueOrNull(inner.get(key));
+        }
+
+        @Override
+        public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+            inner.init(stateStoreContext, root);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public void flush() {
+            inner.flush();
+        }
+
+        @Override
+        public void put(final K key, final V value) {
+            inner.put(key, ValueTimestampHeaders.make(value, 
ConsumerRecord.NO_TIMESTAMP, null));
+        }
+
+        @Override
+        public V putIfAbsent(final K key, final V value) {
+            return ValueTimestampHeaders.getValueOrNull(inner.putIfAbsent(key, 
ValueTimestampHeaders.make(value, ConsumerRecord.NO_TIMESTAMP, null)));
+        }
+
+        @Override
+        public void putAll(final List<KeyValue<K, V>> entries) {
+            for (final KeyValue<K, V> entry : entries) {
+                inner.put(entry.key, ValueTimestampHeaders.make(entry.value, 
ConsumerRecord.NO_TIMESTAMP, null));
+            }
+        }
+
+        @Override
+        public V delete(final K key) {
+            return ValueTimestampHeaders.getValueOrNull(inner.delete(key));
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return inner.approximateNumEntries();
+        }
+
+        @Override
+        public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+            inner.commit(changelogOffsets);
+        }
+
+        @Override
+        public Long committedOffset(final TopicPartition topicPartition) {
+            return inner.committedOffset(topicPartition);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public boolean managesOffsets() {
+            return inner.managesOffsets();
+        }
+
+        @Override
+        public void close() {
+            inner.close();
+        }
+
+        @Override
+        public String name() {
+            return inner.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return inner.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return inner.isOpen();
+        }
+
+        @Override
+        public <R> QueryResult<R> query(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
+            return inner.query(query, positionBound, config);
+        }
+
+        @Override
+        public Position getPosition() {
+            return inner.getPosition();
+        }
+    }
+
+    static class TimestampedKeyValueStoreFacadeForHeaders<K, V> extends 
GenericReadOnlyKeyValueStoreFacade<K, ValueTimestampHeaders<V>, 
ValueAndTimestamp<V>> implements KeyValueStore<K, ValueAndTimestamp<V>> {
+        private final TimestampedKeyValueStoreWithHeaders<K, V> inner;
+
+        public TimestampedKeyValueStoreFacadeForHeaders(final 
TimestampedKeyValueStoreWithHeaders<K, V> store) {
+            super(store, ValueConverters.headersToValueAndTimestamp());
+            this.inner = store;
+        }
+
+        @Override
+        public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
+            inner.init(stateStoreContext, root);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public void flush() {
+            inner.flush();
+        }
+
+        @Override
+        public void put(final K key, final ValueAndTimestamp<V> value) {
+            inner.put(key, ValueTimestampHeaders.make(value.value(), 
value.timestamp(), null));
+        }
+
+        @Override
+        public ValueAndTimestamp<V> get(final K key) {
+            final ValueTimestampHeaders<V> vth = inner.get(key);
+            return vth == null ? null : ValueAndTimestamp.make(vth.value(), 
vth.timestamp());
+        }
+
+        @Override
+        public ValueAndTimestamp<V> putIfAbsent(final K key, final 
ValueAndTimestamp<V> value) {
+            final ValueTimestampHeaders<V> result = inner.putIfAbsent(key, 
ValueTimestampHeaders.make(value.value(), value.timestamp(), null));
+            return result == null ? null : 
ValueAndTimestamp.make(result.value(), result.timestamp());
+        }
+
+        @Override
+        public void putAll(final List<KeyValue<K, ValueAndTimestamp<V>>> 
entries) {
+            for (final KeyValue<K, ValueAndTimestamp<V>> entry : entries) {
+                inner.put(entry.key, 
ValueTimestampHeaders.make(entry.value.value(), entry.value.timestamp(), null));
+            }
+        }
+
+        @Override
+        public ValueAndTimestamp<V> delete(final K key) {
+            final ValueTimestampHeaders<V> result = inner.delete(key);
+            return result == null ? null : 
ValueAndTimestamp.make(result.value(), result.timestamp());
+        }
+
+        @Override
+        public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+            inner.commit(changelogOffsets);
+        }
+
+        @Override
+        public Long committedOffset(final TopicPartition topicPartition) {
+            return inner.committedOffset(topicPartition);
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public boolean managesOffsets() {
+            return inner.managesOffsets();
+        }
+
+        @Override
+        public void close() {
+            inner.close();
+        }
+
+        @Override
+        public String name() {
+            return inner.name();
+        }
+
+        @Override
+        public boolean persistent() {
+            return inner.persistent();
+        }
+
+        @Override
+        public boolean isOpen() {
+            return inner.isOpen();
+        }
+
+        @Override
+        public <R> QueryResult<R> query(
+            final Query<R> query,
+            final PositionBound positionBound,
+            final QueryConfig config
+        ) {
+            return inner.query(query, positionBound, config);
+        }
+
+        @Override
+        public Position getPosition() {
+            return inner.getPosition();
+        }
+    }
+
 }

Reply via email to