This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 13df7ed5954 KAFKA-20308: Update TopologyTestDriver to support
headers-aware stores (#21745)
13df7ed5954 is described below
commit 13df7ed5954f058bdc4fee821e4bd127b469dc1b
Author: Alieh Saeedi <[email protected]>
AuthorDate: Sun Mar 15 18:54:53 2026 +0100
KAFKA-20308: Update TopologyTestDriver to support headers-aware stores
(#21745)
This PR adds full TopologyTestDriver support for headers-aware state
stores, enabling tests to both access headers and use the simple
non-headers API automatically.
Reviewers: Matthias J. Sax <[email protected]>
---
.../apache/kafka/streams/TopologyTestDriver.java | 430 +++++++++++++++++++++
1 file changed, 430 insertions(+)
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index beae91966fc..85023473005 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -76,6 +76,10 @@ import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.AggregationWithHeaders;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -1012,6 +1016,10 @@ public class TopologyTestDriver implements Closeable {
@SuppressWarnings("unchecked")
public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) {
final StateStore store = getStateStore(name, false);
+ if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+ log.warn("Method #getTimestampedKeyValueStore() should be used to
access a TimestampedKeyValueStoreWithHeaders.");
+ return new
KeyValueStoreFacadeForHeaders<>((TimestampedKeyValueStoreWithHeaders<K, V>)
store);
+ }
if (store instanceof TimestampedKeyValueStore) {
log.warn("Method #getTimestampedKeyValueStore() should be used to
access a TimestampedKeyValueStore.");
return new KeyValueStoreFacade<>((TimestampedKeyValueStore<K, V>)
store);
@@ -1041,6 +1049,9 @@ public class TopologyTestDriver implements Closeable {
@SuppressWarnings("unchecked")
public <K, V> KeyValueStore<K, ValueAndTimestamp<V>>
getTimestampedKeyValueStore(final String name) {
final StateStore store = getStateStore(name, false);
+ if (store instanceof TimestampedKeyValueStoreWithHeaders) {
+ return new
TimestampedKeyValueStoreFacadeForHeaders<>((TimestampedKeyValueStoreWithHeaders<K,
V>) store);
+ }
return store instanceof TimestampedKeyValueStore ?
(TimestampedKeyValueStore<K, V>) store : null;
}
@@ -1121,6 +1132,10 @@ public class TopologyTestDriver implements Closeable {
@SuppressWarnings("unchecked")
public <K, V> WindowStore<K, V> getWindowStore(final String name) {
final StateStore store = getStateStore(name, false);
+ if (store instanceof TimestampedWindowStoreWithHeaders) {
+ log.warn("Method #getTimestampedWindowStoreWithHeaders() should be
used to access a TimestampedWindowStoreWithHeaders.");
+ return new
WindowStoreFacadeForHeaders<>((TimestampedWindowStoreWithHeaders<K, V>) store);
+ }
if (store instanceof TimestampedWindowStore) {
log.warn("Method #getTimestampedWindowStore() should be used to
access a TimestampedWindowStore.");
return new WindowStoreFacade<>((TimestampedWindowStore<K, V>)
store);
@@ -1149,6 +1164,9 @@ public class TopologyTestDriver implements Closeable {
@SuppressWarnings("unchecked")
public <K, V> WindowStore<K, ValueAndTimestamp<V>>
getTimestampedWindowStore(final String name) {
final StateStore store = getStateStore(name, false);
+ if (store instanceof TimestampedWindowStoreWithHeaders) {
+ return new
TimestampedWindowStoreFacadeForHeaders<>((TimestampedWindowStoreWithHeaders<K,
V>) store);
+ }
return store instanceof TimestampedWindowStore ?
(TimestampedWindowStore<K, V>) store : null;
}
@@ -1446,6 +1464,216 @@ public class TopologyTestDriver implements Closeable {
}
}
+ /**
+  * Adapter that presents a {@link TimestampedWindowStoreWithHeaders} as a plain
+  * {@code WindowStore<K, V>}.
+  *
+  * <p>The read-side conversion is handed to the superclass as
+  * {@code ValueConverters.extractValueFromHeaders()}. Writes wrap the raw value with
+  * {@code ConsumerRecord.NO_TIMESTAMP} and {@code null} headers. Every other operation is
+  * forwarded to the wrapped store unchanged.
+  */
+ static class WindowStoreFacadeForHeaders<K, V>
+     extends GenericReadOnlyWindowStoreFacade<K, ValueTimestampHeaders<V>, V>
+     implements WindowStore<K, V> {
+
+     /** The headers-aware store all calls are forwarded to. */
+     private final TimestampedWindowStoreWithHeaders<K, V> headersStore;
+
+     public WindowStoreFacadeForHeaders(final TimestampedWindowStoreWithHeaders<K, V> store) {
+         super(store, ValueConverters.extractValueFromHeaders());
+         headersStore = store;
+     }
+
+     // ---- lifecycle and metadata: straight delegation ----
+
+     @Override
+     public void init(final StateStoreContext stateStoreContext, final StateStore root) {
+         headersStore.init(stateStoreContext, root);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public void flush() {
+         headersStore.flush();
+     }
+
+     @Override
+     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+         headersStore.commit(changelogOffsets);
+     }
+
+     @Override
+     public Long committedOffset(final TopicPartition topicPartition) {
+         return headersStore.committedOffset(topicPartition);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public boolean managesOffsets() {
+         return headersStore.managesOffsets();
+     }
+
+     @Override
+     public void close() {
+         headersStore.close();
+     }
+
+     @Override
+     public String name() {
+         return headersStore.name();
+     }
+
+     @Override
+     public boolean persistent() {
+         return headersStore.persistent();
+     }
+
+     @Override
+     public boolean isOpen() {
+         return headersStore.isOpen();
+     }
+
+     @Override
+     public <R> QueryResult<R> query(final Query<R> query,
+                                     final PositionBound positionBound,
+                                     final QueryConfig config) {
+         return headersStore.query(query, positionBound, config);
+     }
+
+     @Override
+     public Position getPosition() {
+         return headersStore.getPosition();
+     }
+
+     // ---- writes ----
+
+     /**
+      * Stores {@code value} for {@code key} in the window starting at {@code windowStartTimestamp},
+      * wrapped with {@code ConsumerRecord.NO_TIMESTAMP} and {@code null} headers.
+      */
+     @Override
+     public void put(final K key, final V value, final long windowStartTimestamp) {
+         final ValueTimestampHeaders<V> wrapped =
+             ValueTimestampHeaders.make(value, ConsumerRecord.NO_TIMESTAMP, null);
+         headersStore.put(key, wrapped, windowStartTimestamp);
+     }
+
+     // ---- epoch-millisecond reads: converted to Instant and routed to the Instant overloads ----
+
+     @Override
+     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+         final Instant from = Instant.ofEpochMilli(timeFrom);
+         final Instant to = Instant.ofEpochMilli(timeTo);
+         return fetch(key, from, to);
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
+                                                   final K keyTo,
+                                                   final long timeFrom,
+                                                   final long timeTo) {
+         final Instant from = Instant.ofEpochMilli(timeFrom);
+         final Instant to = Instant.ofEpochMilli(timeTo);
+         return fetch(keyFrom, keyTo, from, to);
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+         final Instant from = Instant.ofEpochMilli(timeFrom);
+         final Instant to = Instant.ofEpochMilli(timeTo);
+         return fetchAll(from, to);
+     }
+
+     @Override
+     public WindowStoreIterator<V> backwardFetch(final K key, final long timeFrom, final long timeTo) {
+         final Instant from = Instant.ofEpochMilli(timeFrom);
+         final Instant to = Instant.ofEpochMilli(timeTo);
+         return backwardFetch(key, from, to);
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
+                                                           final K keyTo,
+                                                           final long timeFrom,
+                                                           final long timeTo) {
+         final Instant from = Instant.ofEpochMilli(timeFrom);
+         final Instant to = Instant.ofEpochMilli(timeTo);
+         return backwardFetch(keyFrom, keyTo, from, to);
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long timeFrom, final long timeTo) {
+         final Instant from = Instant.ofEpochMilli(timeFrom);
+         final Instant to = Instant.ofEpochMilli(timeTo);
+         return backwardFetchAll(from, to);
+     }
+ }
+
+ /**
+  * Adapter that presents a {@link TimestampedWindowStoreWithHeaders} as a
+  * {@code WindowStore<K, ValueAndTimestamp<V>>}.
+  *
+  * <p>The read-side conversion is handed to the superclass as
+  * {@code ValueConverters.headersToValueAndTimestamp()}. Writes keep the caller's value and
+  * timestamp and attach {@code null} headers. Every other operation is forwarded to the
+  * wrapped store unchanged.
+  */
+ static class TimestampedWindowStoreFacadeForHeaders<K, V>
+     extends GenericReadOnlyWindowStoreFacade<K, ValueTimestampHeaders<V>, ValueAndTimestamp<V>>
+     implements WindowStore<K, ValueAndTimestamp<V>> {
+
+     /** The headers-aware store all calls are forwarded to. */
+     private final TimestampedWindowStoreWithHeaders<K, V> inner;
+
+     public TimestampedWindowStoreFacadeForHeaders(final TimestampedWindowStoreWithHeaders<K, V> store) {
+         super(store, ValueConverters.headersToValueAndTimestamp());
+         this.inner = store;
+     }
+
+     @Override
+     public void init(final StateStoreContext stateStoreContext, final StateStore root) {
+         inner.init(stateStoreContext, root);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public void flush() {
+         inner.flush();
+     }
+
+     @Override
+     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+         inner.commit(changelogOffsets);
+     }
+
+     @Override
+     public Long committedOffset(final TopicPartition topicPartition) {
+         return inner.committedOffset(topicPartition);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public boolean managesOffsets() {
+         return inner.managesOffsets();
+     }
+
+     @Override
+     public void close() {
+         inner.close();
+     }
+
+     @Override
+     public String name() {
+         return inner.name();
+     }
+
+     @Override
+     public boolean persistent() {
+         return inner.persistent();
+     }
+
+     @Override
+     public boolean isOpen() {
+         return inner.isOpen();
+     }
+
+     @Override
+     public <R> QueryResult<R> query(
+         final Query<R> query,
+         final PositionBound positionBound,
+         final QueryConfig config
+     ) {
+         return inner.query(query, positionBound, config);
+     }
+
+     @Override
+     public Position getPosition() {
+         return inner.getPosition();
+     }
+
+     /**
+      * Stores {@code value} for {@code key} in the window starting at {@code windowStartTimestamp}.
+      *
+      * <p>A {@code null} {@code value} is forwarded as {@code null} rather than dereferenced, so
+      * callers can issue a delete without hitting a {@code NullPointerException}.
+      * NOTE(review): relies on the wrapped store accepting a {@code null} value as a delete, per
+      * the {@code WindowStore#put} contract; confirm.
+      */
+     @Override
+     public void put(final K key, final ValueAndTimestamp<V> value, final long windowStartTimestamp) {
+         final ValueTimestampHeaders<V> wrapped = value == null
+             ? null
+             : ValueTimestampHeaders.make(value.value(), value.timestamp(), null);
+         inner.put(key, wrapped, windowStartTimestamp);
+     }
+
+     // Epoch-millisecond overloads convert to Instant and route to the Instant-based overloads.
+
+     @Override
+     public WindowStoreIterator<ValueAndTimestamp<V>> fetch(final K key, final long timeFrom, final long timeTo) {
+         return fetch(key, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetch(final K keyFrom,
+                                                                      final K keyTo,
+                                                                      final long timeFrom,
+                                                                      final long timeTo) {
+         return fetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> fetchAll(final long timeFrom, final long timeTo) {
+         return fetchAll(Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+     }
+
+     // The long-based backward overloads mirror WindowStoreFacadeForHeaders so both facades
+     // answer the same set of queries.
+     // NOTE(review): without these overrides the WindowStore interface defaults apply, which
+     // presumably reject the call; confirm against the interface.
+
+     @Override
+     public WindowStoreIterator<ValueAndTimestamp<V>> backwardFetch(final K key,
+                                                                    final long timeFrom,
+                                                                    final long timeTo) {
+         return backwardFetch(key, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> backwardFetch(final K keyFrom,
+                                                                              final K keyTo,
+                                                                              final long timeFrom,
+                                                                              final long timeTo) {
+         return backwardFetch(keyFrom, keyTo, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+     }
+
+     @Override
+     public KeyValueIterator<Windowed<K>, ValueAndTimestamp<V>> backwardFetchAll(final long timeFrom,
+                                                                                 final long timeTo) {
+         return backwardFetchAll(Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo));
+     }
+ }
+
+
static class SessionStoreFacade<K, V> implements SessionStore<K, V> {
private final SessionStoreWithHeaders<K, V> inner;
@@ -1557,4 +1785,206 @@ public class TopologyTestDriver implements Closeable {
}
}
+ /**
+  * Adapter that presents a {@link TimestampedKeyValueStoreWithHeaders} as a plain
+  * {@code KeyValueStore<K, V>}.
+  *
+  * <p>The read-side conversion is handed to the superclass as
+  * {@code ValueConverters.extractValueFromHeaders()}. Writes wrap the raw value with
+  * {@code ConsumerRecord.NO_TIMESTAMP} and {@code null} headers. Every other operation is
+  * forwarded to the wrapped store unchanged.
+  */
+ static class KeyValueStoreFacadeForHeaders<K, V>
+     extends GenericReadOnlyKeyValueStoreFacade<K, ValueTimestampHeaders<V>, V>
+     implements KeyValueStore<K, V> {
+
+     /** The headers-aware store all calls are forwarded to. */
+     private final TimestampedKeyValueStoreWithHeaders<K, V> headersStore;
+
+     public KeyValueStoreFacadeForHeaders(final TimestampedKeyValueStoreWithHeaders<K, V> store) {
+         super(store, ValueConverters.extractValueFromHeaders());
+         headersStore = store;
+     }
+
+     /** Wraps a raw value with {@code ConsumerRecord.NO_TIMESTAMP} and {@code null} headers. */
+     private ValueTimestampHeaders<V> wrap(final V value) {
+         return ValueTimestampHeaders.make(value, ConsumerRecord.NO_TIMESTAMP, null);
+     }
+
+     // ---- lifecycle and metadata: straight delegation ----
+
+     @Override
+     public void init(final StateStoreContext stateStoreContext, final StateStore root) {
+         headersStore.init(stateStoreContext, root);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public void flush() {
+         headersStore.flush();
+     }
+
+     @Override
+     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+         headersStore.commit(changelogOffsets);
+     }
+
+     @Override
+     public Long committedOffset(final TopicPartition topicPartition) {
+         return headersStore.committedOffset(topicPartition);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public boolean managesOffsets() {
+         return headersStore.managesOffsets();
+     }
+
+     @Override
+     public void close() {
+         headersStore.close();
+     }
+
+     @Override
+     public String name() {
+         return headersStore.name();
+     }
+
+     @Override
+     public boolean persistent() {
+         return headersStore.persistent();
+     }
+
+     @Override
+     public boolean isOpen() {
+         return headersStore.isOpen();
+     }
+
+     @Override
+     public <R> QueryResult<R> query(final Query<R> query,
+                                     final PositionBound positionBound,
+                                     final QueryConfig config) {
+         return headersStore.query(query, positionBound, config);
+     }
+
+     @Override
+     public Position getPosition() {
+         return headersStore.getPosition();
+     }
+
+     @Override
+     public long approximateNumEntries() {
+         return headersStore.approximateNumEntries();
+     }
+
+     // ---- reads and writes: values are unwrapped via ValueTimestampHeaders.getValueOrNull ----
+
+     @Override
+     public V get(final K key) {
+         final ValueTimestampHeaders<V> stored = headersStore.get(key);
+         return ValueTimestampHeaders.getValueOrNull(stored);
+     }
+
+     @Override
+     public void put(final K key, final V value) {
+         headersStore.put(key, wrap(value));
+     }
+
+     @Override
+     public V putIfAbsent(final K key, final V value) {
+         final ValueTimestampHeaders<V> previous = headersStore.putIfAbsent(key, wrap(value));
+         return ValueTimestampHeaders.getValueOrNull(previous);
+     }
+
+     /** Writes each entry individually through the wrapped store's {@code put}. */
+     @Override
+     public void putAll(final List<KeyValue<K, V>> entries) {
+         for (final KeyValue<K, V> kv : entries) {
+             headersStore.put(kv.key, wrap(kv.value));
+         }
+     }
+
+     @Override
+     public V delete(final K key) {
+         final ValueTimestampHeaders<V> removed = headersStore.delete(key);
+         return ValueTimestampHeaders.getValueOrNull(removed);
+     }
+ }
+
+ /**
+  * Adapter that presents a {@link TimestampedKeyValueStoreWithHeaders} as a
+  * {@code KeyValueStore<K, ValueAndTimestamp<V>>}.
+  *
+  * <p>Values written through this facade keep the caller's value and timestamp and are stored
+  * with {@code null} headers. Values read back are reduced to value and timestamp. A
+  * {@code null} value is forwarded to the wrapped store as {@code null} instead of being
+  * dereferenced, so the delete form of {@code put} no longer throws
+  * {@code NullPointerException}.
+  */
+ static class TimestampedKeyValueStoreFacadeForHeaders<K, V>
+     extends GenericReadOnlyKeyValueStoreFacade<K, ValueTimestampHeaders<V>, ValueAndTimestamp<V>>
+     implements KeyValueStore<K, ValueAndTimestamp<V>> {
+
+     /** The headers-aware store all calls are forwarded to. */
+     private final TimestampedKeyValueStoreWithHeaders<K, V> inner;
+
+     public TimestampedKeyValueStoreFacadeForHeaders(final TimestampedKeyValueStoreWithHeaders<K, V> store) {
+         super(store, ValueConverters.headersToValueAndTimestamp());
+         this.inner = store;
+     }
+
+     /** Wraps a value/timestamp pair with {@code null} headers; {@code null} stays {@code null}. */
+     private static <T> ValueTimestampHeaders<T> withNullHeaders(final ValueAndTimestamp<T> value) {
+         return value == null
+             ? null
+             : ValueTimestampHeaders.make(value.value(), value.timestamp(), null);
+     }
+
+     /** Drops the headers from a stored entry; {@code null} stays {@code null}. */
+     private static <T> ValueAndTimestamp<T> withoutHeaders(final ValueTimestampHeaders<T> stored) {
+         return stored == null
+             ? null
+             : ValueAndTimestamp.make(stored.value(), stored.timestamp());
+     }
+
+     @Override
+     public void init(final StateStoreContext stateStoreContext, final StateStore root) {
+         inner.init(stateStoreContext, root);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public void flush() {
+         inner.flush();
+     }
+
+     /**
+      * Stores {@code value} under {@code key}.
+      * NOTE(review): a {@code null} value is passed through as {@code null}; this assumes the
+      * wrapped store treats it as a delete, per the {@code KeyValueStore#put} contract. Confirm.
+      */
+     @Override
+     public void put(final K key, final ValueAndTimestamp<V> value) {
+         inner.put(key, withNullHeaders(value));
+     }
+
+     /** Returns the stored value and timestamp for {@code key}, or {@code null} if the wrapped store returns {@code null}. */
+     @Override
+     public ValueAndTimestamp<V> get(final K key) {
+         return withoutHeaders(inner.get(key));
+     }
+
+     /** Delegates to the wrapped store's {@code putIfAbsent} and returns its result without headers. */
+     @Override
+     public ValueAndTimestamp<V> putIfAbsent(final K key, final ValueAndTimestamp<V> value) {
+         return withoutHeaders(inner.putIfAbsent(key, withNullHeaders(value)));
+     }
+
+     /** Writes each entry individually through the wrapped store's {@code put}; {@code null} entry values are forwarded as {@code null}. */
+     @Override
+     public void putAll(final List<KeyValue<K, ValueAndTimestamp<V>>> entries) {
+         for (final KeyValue<K, ValueAndTimestamp<V>> entry : entries) {
+             inner.put(entry.key, withNullHeaders(entry.value));
+         }
+     }
+
+     /** Deletes {@code key} from the wrapped store and returns its result without headers. */
+     @Override
+     public ValueAndTimestamp<V> delete(final K key) {
+         return withoutHeaders(inner.delete(key));
+     }
+
+     @Override
+     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
+         inner.commit(changelogOffsets);
+     }
+
+     @Override
+     public Long committedOffset(final TopicPartition topicPartition) {
+         return inner.committedOffset(topicPartition);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public boolean managesOffsets() {
+         return inner.managesOffsets();
+     }
+
+     @Override
+     public void close() {
+         inner.close();
+     }
+
+     @Override
+     public String name() {
+         return inner.name();
+     }
+
+     @Override
+     public boolean persistent() {
+         return inner.persistent();
+     }
+
+     @Override
+     public boolean isOpen() {
+         return inner.isOpen();
+     }
+
+     @Override
+     public <R> QueryResult<R> query(
+         final Query<R> query,
+         final PositionBound positionBound,
+         final QueryConfig config
+     ) {
+         return inner.query(query, positionBound, config);
+     }
+
+     @Override
+     public Position getPosition() {
+         return inner.getPosition();
+     }
+ }
+
}