This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5008c3b2018 KAFKA-20497: Add readOnly(IsolationLevel) to 
MeteredWindowStore (#22318)
5008c3b2018 is described below

commit 5008c3b20184b4ad0d2ba8de9dc959072131b639
Author: Nick Telford <[email protected]>
AuthorDate: Tue Jun 2 15:49:27 2026 +0100

    KAFKA-20497: Add readOnly(IsolationLevel) to MeteredWindowStore (#22318)
    
    The metered layer must wrap both isolation levels rather than delegating
    the inner store's readOnly view directly: it holds the serde context
    needed to deserialise keys and values, and it owns the sensor
    registration for iterator metrics. The ReadOnlyView applies the same
    key/value deserialisation and sensor instrumentation as the public
    window fetch methods so that IQ operations through a readOnly view are
    measured identically to direct queries.
    
    KAFKA-20497
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../state/internals/MeteredWindowStore.java        | 208 ++++++++++++---------
 .../state/internals/MeteredWindowStoreTest.java    | 172 +++++++++++++++++
 2 files changed, 296 insertions(+), 84 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index f4009b9f9ea..31c5dbfc0cf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -41,12 +42,14 @@ import org.apache.kafka.streams.query.WindowKeyQuery;
 import org.apache.kafka.streams.query.WindowRangeQuery;
 import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
+import java.time.Instant;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
@@ -253,112 +256,62 @@ public class MeteredWindowStore<K, V>
     @Override
     public WindowStoreIterator<V> fetch(final K key, final long timeFrom, 
final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
-        return new MeteredWindowStoreIterator<>(
-            wrapped().fetch(serializeKey(key), timeFrom, timeTo),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return meteredTimeRangeIterator(wrapped().fetch(serializeKey(key), 
timeFrom, timeTo));
     }
 
     @Override
     public WindowStoreIterator<V> backwardFetch(final K key, final long 
timeFrom, final long timeTo) {
         Objects.requireNonNull(key, "key cannot be null");
-        return new MeteredWindowStoreIterator<>(
-            wrapped().backwardFetch(serializeKey(key), timeFrom, timeTo),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return 
meteredTimeRangeIterator(wrapped().backwardFetch(serializeKey(key), timeFrom, 
timeTo));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(
-        final K keyFrom,
-        final K keyTo,
-        final long timeFrom,
-        final long timeTo
-    ) {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().fetch(
-                serializeKey(keyFrom),
-                serializeKey(keyTo),
-                timeFrom,
-                timeTo),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+    public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
+                                                  final K keyTo,
+                                                  final long timeFrom,
+                                                  final long timeTo) {
+        return meteredWindowedIterator(wrapped().fetch(serializeKey(keyFrom), 
serializeKey(keyTo), timeFrom, timeTo));
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> backwardFetch(
-        final K keyFrom,
-        final K keyTo,
-        final long timeFrom,
-        final long timeTo
-    ) {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardFetch(
-                serializeKey(keyFrom),
-                serializeKey(keyTo),
-                timeFrom,
-                timeTo),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+    public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
+                                                          final K keyTo,
+                                                          final long timeFrom,
+                                                          final long timeTo) {
+        return 
meteredWindowedIterator(wrapped().backwardFetch(serializeKey(keyFrom), 
serializeKey(keyTo), timeFrom, timeTo));
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, 
final long timeTo) {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().fetchAll(timeFrom, timeTo),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return meteredWindowedIterator(wrapped().fetchAll(timeFrom, timeTo));
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long 
timeFrom, final long timeTo) {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardFetchAll(timeFrom, timeTo),
-            fetchSensor,
-            iteratorDurationSensor,
-            this::deserializeKey,
-            this::deserializeValue,
-            time,
-            numOpenIterators,
-            openIterators
-        );
+        return meteredWindowedIterator(wrapped().backwardFetchAll(timeFrom, 
timeTo));
     }
 
     @Override
     public KeyValueIterator<Windowed<K>, V> all() {
-        return new MeteredWindowedKeyValueIterator<>(
-            wrapped().all(),
+        return meteredWindowedIterator(wrapped().all());
+    }
+
+    @Override
+    public KeyValueIterator<Windowed<K>, V> backwardAll() {
+        return meteredWindowedIterator(wrapped().backwardAll());
+    }
+
+    @Override
+    public ReadOnlyWindowStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel cannot be 
null");
+        return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+    }
+
+    private WindowStoreIterator<V> meteredTimeRangeIterator(final 
WindowStoreIterator<byte[]> iter) {
+        return new MeteredWindowStoreIterator<>(
+            iter,
             fetchSensor,
             iteratorDurationSensor,
-            this::deserializeKey,
             this::deserializeValue,
             time,
             numOpenIterators,
@@ -366,10 +319,9 @@ public class MeteredWindowStore<K, V>
         );
     }
 
-    @Override
-    public KeyValueIterator<Windowed<K>, V> backwardAll() {
+    private KeyValueIterator<Windowed<K>, V> meteredWindowedIterator(final 
KeyValueIterator<Windowed<Bytes>, byte[]> iter) {
         return new MeteredWindowedKeyValueIterator<>(
-            wrapped().backwardAll(),
+            iter,
             fetchSensor,
             iteratorDurationSensor,
             this::deserializeKey,
@@ -380,6 +332,94 @@ public class MeteredWindowStore<K, V>
         );
     }
 
+    private final class ReadOnlyView implements ReadOnlyWindowStore<K, V> {
+
+        private final ReadOnlyWindowStore<Bytes, byte[]> underlying;
+
+        ReadOnlyView(final ReadOnlyWindowStore<Bytes, byte[]> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public V fetch(final K key, final long windowStartTimestamp) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return maybeMeasureLatency(
+                () -> {
+                    final byte[] result = underlying.fetch(serializeKey(key), 
windowStartTimestamp);
+                    return result == null ? null : deserializeValue(result);
+                },
+                time,
+                fetchSensor
+            );
+        }
+
+        @Override
+        public WindowStoreIterator<V> fetch(
+            final K key,
+            final Instant timeFrom,
+            final Instant timeTo
+        ) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return 
meteredTimeRangeIterator(underlying.fetch(serializeKey(key), timeFrom, timeTo));
+        }
+
+        @Override
+        public WindowStoreIterator<V> backwardFetch(
+            final K key,
+            final Instant timeFrom,
+            final Instant timeTo
+        ) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return 
meteredTimeRangeIterator(underlying.backwardFetch(serializeKey(key), timeFrom, 
timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetch(
+            final K keyFrom,
+            final K keyTo,
+            final Instant timeFrom,
+            final Instant timeTo
+        ) {
+            return 
meteredWindowedIterator(underlying.fetch(serializeKey(keyFrom), 
serializeKey(keyTo), timeFrom, timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetch(
+            final K keyFrom,
+            final K keyTo,
+            final Instant timeFrom,
+            final Instant timeTo
+        ) {
+            return 
meteredWindowedIterator(underlying.backwardFetch(serializeKey(keyFrom), 
serializeKey(keyTo), timeFrom, timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> all() {
+            return meteredWindowedIterator(underlying.all());
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardAll() {
+            return meteredWindowedIterator(underlying.backwardAll());
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> fetchAll(
+            final Instant timeFrom,
+            final Instant timeTo
+        ) {
+            return meteredWindowedIterator(underlying.fetchAll(timeFrom, 
timeTo));
+        }
+
+        @Override
+        public KeyValueIterator<Windowed<K>, V> backwardFetchAll(
+            final Instant timeFrom,
+            final Instant timeTo
+        ) {
+            return 
meteredWindowedIterator(underlying.backwardFetchAll(timeFrom, timeTo));
+        }
+    }
+
     @Override
     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
         maybeMeasureLatency(() -> super.commit(changelogOffsets), time, 
commitSensor);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 1d15d483886..e1e497bce36 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -41,6 +42,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockRecordCollector;
@@ -583,6 +585,176 @@ public class MeteredWindowStoreTest {
         assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFetchPointInTimeApplySerdesAndRecordMetric() 
{
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_BYTES);
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertThat(view.fetch(KEY, TIMESTAMP), equalTo(VALUE));
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFetchSingleKeyApplySerdesAndRecordMetric() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.fetch(KEY_BYTES, ofEpochMilli(1), ofEpochMilli(1)))
+            .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.fetch(KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldReadOnlyViewBackwardFetchSingleKeyApplySerdesAndRecordMetric() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.backwardFetch(KEY_BYTES, ofEpochMilli(1), 
ofEpochMilli(1)))
+            .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.backwardFetch(KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFetchRangeApplySerdesAndRecordMetric() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.fetch(KEY_BYTES, KEY_BYTES, ofEpochMilli(1), 
ofEpochMilli(1)))
+            .thenReturn(KeyValueIterators.emptyIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.fetch(KEY, KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldReadOnlyViewBackwardFetchRangeApplySerdesAndRecordMetric() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.backwardFetch(KEY_BYTES, KEY_BYTES, ofEpochMilli(1), 
ofEpochMilli(1)))
+            .thenReturn(KeyValueIterators.emptyIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.backwardFetch(KEY, KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewAllApplySerdesAndRecordMetric() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.all()).thenReturn(KeyValueIterators.emptyIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.all().close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewBackwardAllApplySerdesAndRecordMetric() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        
when(innerView.backwardAll()).thenReturn(KeyValueIterators.emptyIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.backwardAll().close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewFetchAllApplySerdesAndRecordMetric() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.fetchAll(ofEpochMilli(1), 
ofEpochMilli(1))).thenReturn(KeyValueIterators.emptyIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.fetchAll(ofEpochMilli(1), ofEpochMilli(1)).close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewBackwardFetchAllApplySerdesAndRecordMetric() 
{
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.backwardFetchAll(ofEpochMilli(1), 
ofEpochMilli(1))).thenReturn(KeyValueIterators.emptyIterator());
+
+        store.init(context, store);
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        view.backwardFetchAll(ofEpochMilli(1), ofEpochMilli(1)).close();
+        assertThat((Double) metric("fetch-rate").metricValue(), 
greaterThan(0.0));
+    }
+
+    @SuppressWarnings({"unchecked", "unused"})
+    @Test
+    public void shouldRecordOpenIteratorMetricsOnReadOnlyViewIterators() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.all()).thenReturn(KeyValueIterators.emptyIterator());
+
+        store.init(context, store);
+
+        final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
+
+        final ReadOnlyWindowStore<String, String> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<Windowed<String>, String> unused = 
view.all()) {
+            assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
+        }
+
+        assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPassReadCommittedThroughToInner() {
+        final ReadOnlyWindowStore<Bytes, byte[]> innerView = 
mock(ReadOnlyWindowStore.class);
+        
when(innerStoreMock.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(innerView);
+
+        store.init(context, store);
+
+        store.readOnly(IsolationLevel.READ_COMMITTED);
+
+        verify(innerStoreMock).readOnly(IsolationLevel.READ_COMMITTED);
+    }
+
+    @Test
+    public void shouldThrowNpeOnNullIsolationLevel() {
+        store.init(context, store);
+
+        assertThrows(NullPointerException.class, () -> store.readOnly(null));
+    }
+
     private KafkaMetric metric(final String name) {
         return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "", 
tags));
     }

Reply via email to