This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 258183c725b KAFKA-20327: Add overrides to AbstractDecorator classes
(#21791)
258183c725b is described below
commit 258183c725be01ed0c9583a2d435d308b3d5bdff
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Mar 17 14:47:50 2026 -0400
KAFKA-20327: Add overrides to AbstractDecorator classes (#21791)
This PR adds all overrides for `Session` stores to the
`AbstractReadWriteDecorator` and `AbstractReadOnlyDecorator`
Reviewers: Christo Lolov <[email protected]>
---
.../internals/AbstractReadOnlyDecorator.java | 70 ++++++++++++++++++++++
.../internals/AbstractReadWriteDecorator.java | 65 ++++++++++++++++++++
2 files changed, 135 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 252864e78cd..5fa6f256bd0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
@@ -327,6 +328,75 @@ abstract class AbstractReadOnlyDecorator<T extends
StateStore, K, V> extends Wra
return wrapped().fetchSession(key, earliestSessionEndTime,
latestSessionStartTime);
}
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final long
earliestSessionEndTime,
+ final long
latestSessionEndTime) {
+ return wrapped().findSessions(earliestSessionEndTime,
latestSessionEndTime);
+ }
+
+ @Override
+ public AGG fetchSession(final K key,
+ final Instant sessionStartTime,
+ final Instant sessionEndTime) {
+ return wrapped().fetchSession(key, sessionStartTime,
sessionEndTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
keyFrom,
+ final K
keyTo,
+ final
Instant earliestSessionEndTime,
+ final
Instant latestSessionStartTime) {
+ return wrapped().backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+ final K keyTo,
+ final Instant
earliestSessionEndTime,
+ final Instant
latestSessionStartTime) {
+ return wrapped().findSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
+ return wrapped().backwardFetch(key);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K
keyFrom,
+ final K keyTo)
{
+ return wrapped().backwardFetch(keyFrom, keyTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
key,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return wrapped().backwardFindSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
keyFrom,
+ final K
keyTo,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return wrapped().backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
key,
+ final
Instant earliestSessionEndTime,
+ final
Instant latestSessionStartTime) {
+ return wrapped().backwardFindSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+ final Instant
earliestSessionEndTime,
+ final Instant
latestSessionStartTime) {
+ return wrapped().findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
@Override
public KeyValueIterator<Windowed<K>, AGG> fetch(final K key) {
return wrapped().fetch(key);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index 99a53656fa2..dc7cdd8f346 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
+import java.time.Instant;
import java.util.List;
import java.util.Map;
@@ -326,6 +327,70 @@ abstract class AbstractReadWriteDecorator<T extends
StateStore, K, V> extends Wr
return wrapped().findSessions(earliestSessionEndTime,
latestSessionEndTime);
}
+ @Override
+ public AGG fetchSession(final K key,
+ final Instant sessionStartTime,
+ final Instant sessionEndTime) {
+ return wrapped().fetchSession(key, sessionStartTime,
sessionEndTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
keyFrom,
+ final K
keyTo,
+ final
Instant earliestSessionEndTime,
+ final
Instant latestSessionStartTime) {
+
+ return wrapped().backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K keyFrom,
+ final K keyTo,
+ final Instant
earliestSessionEndTime,
+ final Instant
latestSessionStartTime) {
+ return wrapped().findSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K key) {
+ return wrapped().backwardFetch(key);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFetch(final K
keyFrom,
+ final K keyTo)
{
+ return wrapped().backwardFetch(keyFrom, keyTo);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
key,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return wrapped().backwardFindSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
keyFrom,
+ final K
keyTo,
+ final
long earliestSessionEndTime,
+ final
long latestSessionStartTime) {
+ return wrapped().backwardFindSessions(keyFrom, keyTo,
earliestSessionEndTime, latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> backwardFindSessions(final K
key,
+ final
Instant earliestSessionEndTime,
+ final
Instant latestSessionStartTime) {
+ return wrapped().backwardFindSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, AGG> findSessions(final K key,
+ final Instant
earliestSessionEndTime,
+ final Instant
latestSessionStartTime) {
+ return wrapped().findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
+ }
+
@Override
public void remove(final Windowed<K> sessionKey) {
wrapped().remove(sessionKey);