This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0fe902bed23 KAFKA-20497: Add readOnly(IsolationLevel) to MeteredKeyValueStore (#22316)
0fe902bed23 is described below

commit 0fe902bed238eb9b11d6ef36724cd57ccb69dbd0
Author: Nick Telford <[email protected]>
AuthorDate: Tue Jun 2 23:38:13 2026 +0100

    KAFKA-20497: Add readOnly(IsolationLevel) to MeteredKeyValueStore (#22316)
    
    The metered layer must wrap the inner store's readOnly view for both
    isolation levels rather than returning that view directly: the metered
    store holds the serde context needed to deserialise keys and values,
    and it owns the sensor registration for iterator metrics. The
    ReadOnlyView inner class applies the same key/value deserialisation and
    sensor instrumentation as the store's public read methods (get, range,
    reverseRange, all, reverseAll, prefixScan) so that interactive-query
    (IQ) operations through a readOnly view are measured identically to
    direct queries. The same wrapping is added to
    MeteredTimestampedKeyValueStoreWithHeaders via its ReadOnlyHeadersView
    inner class.
    
    KAFKA-20497
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../state/internals/MeteredKeyValueStore.java      | 113 +++++++++++++---
 ...MeteredTimestampedKeyValueStoreWithHeaders.java | 125 +++++++++++++++---
 .../state/internals/MeteredKeyValueStoreTest.java  | 147 +++++++++++++++++++++
 3 files changed, 350 insertions(+), 35 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 673f887779c..327ba3ff69a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -44,6 +45,7 @@ import org.apache.kafka.streams.query.ResultOrder;
 import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
@@ -337,8 +339,12 @@ public class MeteredKeyValueStore<K, V>
     @Override
     public V get(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
+        return getInternal(wrapped(), key);
+    }
+
+    private V getInternal(final ReadOnlyKeyValueStore<Bytes, byte[]> store, 
final K key) {
         try {
-            return maybeMeasureLatency(() -> 
deserializeValue(wrapped().get(serializeKey(key))), time, getSensor);
+            return maybeMeasureLatency(() -> 
deserializeValue(store.get(serializeKey(key))), time, getSensor);
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key);
             throw new ProcessorStateException(message, e);
@@ -392,35 +398,57 @@ public class MeteredKeyValueStore<K, V>
     public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
         Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
-        return new MeteredKeyValueStoreIterator(wrapped().prefixScan(prefix, 
prefixKeySerializer), prefixScanSensor);
+        return prefixScanInternal(wrapped(), prefix, prefixKeySerializer);
+    }
+
+    private <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScanInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store, final P prefix, 
final PS prefixKeySerializer
+    ) {
+        return meteredKeyValueIterator(store.prefixScan(prefix, 
prefixKeySerializer), prefixScanSensor);
     }
 
     @Override
-    public KeyValueIterator<K, V> range(final K from,
-                                        final K to) {
-        return new MeteredKeyValueStoreIterator(
-            wrapped().range(serializeKey(from), serializeKey(to)),
-            rangeSensor
-        );
+    public KeyValueIterator<K, V> range(final K from, final K to) {
+        return rangeInternal(wrapped(), from, to);
+    }
+
+    private KeyValueIterator<K, V> rangeInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final 
K to
+    ) {
+        return meteredKeyValueIterator(store.range(serializeKey(from), 
serializeKey(to)), rangeSensor);
     }
 
     @Override
-    public KeyValueIterator<K, V> reverseRange(final K from,
-                                               final K to) {
-        return new MeteredKeyValueStoreIterator(
-            wrapped().reverseRange(serializeKey(from), serializeKey(to)),
-            rangeSensor
-        );
+    public KeyValueIterator<K, V> reverseRange(final K from, final K to) {
+        return reverseRangeInternal(wrapped(), from, to);
+    }
+
+    private KeyValueIterator<K, V> reverseRangeInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final 
K to
+    ) {
+        return meteredKeyValueIterator(store.reverseRange(serializeKey(from), 
serializeKey(to)), rangeSensor);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueStoreIterator(wrapped().all(), allSensor);
+        return allInternal(wrapped());
+    }
+
+    private KeyValueIterator<K, V> allInternal(final 
ReadOnlyKeyValueStore<Bytes, byte[]> store) {
+        return meteredKeyValueIterator(store.all(), allSensor);
     }
 
     @Override
     public KeyValueIterator<K, V> reverseAll() {
-        return new MeteredKeyValueStoreIterator(wrapped().reverseAll(), 
allSensor);
+        return reverseAllInternal(wrapped());
+    }
+
+    private KeyValueIterator<K, V> reverseAllInternal(final 
ReadOnlyKeyValueStore<Bytes, byte[]> store) {
+        return meteredKeyValueIterator(store.reverseAll(), allSensor);
+    }
+
+    private KeyValueIterator<K, V> meteredKeyValueIterator(final 
KeyValueIterator<Bytes, byte[]> iter, final Sensor sensor) {
+        return new MeteredKeyValueStoreIterator(iter, sensor);
     }
 
     @Override
@@ -433,6 +461,59 @@ public class MeteredKeyValueStore<K, V>
         return wrapped().approximateNumEntries();
     }
 
+    @Override
+    public ReadOnlyKeyValueStore<K, V> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel cannot be 
null");
+        return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+    }
+
+    private final class ReadOnlyView implements ReadOnlyKeyValueStore<K, V> {
+
+        private final ReadOnlyKeyValueStore<Bytes, byte[]> underlying;
+
+        ReadOnlyView(final ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public V get(final K key) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return getInternal(underlying, key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(final K from, final K to) {
+            return rangeInternal(underlying, from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> reverseRange(final K from, final K to) {
+            return reverseRangeInternal(underlying, from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return allInternal(underlying);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> reverseAll() {
+            return reverseAllInternal(underlying);
+        }
+
+        @Override
+        public <PS extends Serializer<P>, P> KeyValueIterator<K, V> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+            Objects.requireNonNull(prefix, "prefix cannot be null");
+            Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
+            return prefixScanInternal(underlying, prefix, prefixKeySerializer);
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return underlying.approximateNumEntries();
+        }
+    }
+
     @Override
     public void close() {
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
index ebdcc8f73c5..8d10c6ccd25 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
@@ -39,6 +40,7 @@ import org.apache.kafka.streams.query.TimestampedRangeQuery;
 import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -113,8 +115,16 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
     @Override
     public ValueTimestampHeaders<V> get(final K key) {
         Objects.requireNonNull(key, "key cannot be null");
+        return getInternal(wrapped(), key);
+    }
+
+    private ValueTimestampHeaders<V> getInternal(final 
ReadOnlyKeyValueStore<Bytes, byte[]> store, final K key) {
         try {
-            return maybeMeasureLatency(() -> 
deserializeValue(wrapped().get(serializeKey(key, internalContext.headers()))), 
time, getSensor);
+            return maybeMeasureLatency(
+                () -> deserializeValue(store.get(serializeKey(key, 
internalContext.headers()))),
+                time,
+                getSensor
+            );
         } catch (final ProcessorStateException e) {
             final String message = String.format(e.getMessage(), key);
             throw new ProcessorStateException(message, e);
@@ -463,21 +473,32 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     }
 
     @Override
-    public <PS extends Serializer<P>, P> KeyValueIterator<K, 
ValueTimestampHeaders<V>> prefixScan(final P prefix,
-                                                                               
                   final PS prefixKeySerializer) {
+    public <PS extends Serializer<P>, P> KeyValueIterator<K, 
ValueTimestampHeaders<V>> prefixScan(
+        final P prefix, final PS prefixKeySerializer
+    ) {
         Objects.requireNonNull(prefix, "prefix cannot be null");
         Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
+        return prefixScanInternal(wrapped(), prefix, prefixKeySerializer);
+    }
+
+    private <PS extends Serializer<P>, P> KeyValueIterator<K, 
ValueTimestampHeaders<V>> prefixScanInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store, final P prefix, 
final PS prefixKeySerializer
+    ) {
         return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
-            wrapped().prefixScan(prefix, prefixKeySerializer),
-            prefixScanSensor
+            store.prefixScan(prefix, prefixKeySerializer), prefixScanSensor
         );
     }
 
     @Override
-    public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from,
-                                                               final K to) {
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K from, 
final K to) {
+        return rangeInternal(wrapped(), from, to);
+    }
+
+    private KeyValueIterator<K, ValueTimestampHeaders<V>> rangeInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final 
K to
+    ) {
         return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
-            wrapped().range(
+            store.range(
                 serializeKey(from, internalContext.headers()),
                 serializeKey(to, internalContext.headers())
             ),
@@ -486,10 +507,15 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
     }
 
     @Override
-    public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K 
from,
-                                                                      final K 
to) {
+    public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRange(final K 
from, final K to) {
+        return reverseRangeInternal(wrapped(), from, to);
+    }
+
+    private KeyValueIterator<K, ValueTimestampHeaders<V>> reverseRangeInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store, final K from, final 
K to
+    ) {
         return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
-            wrapped().reverseRange(
+            store.reverseRange(
                 serializeKey(from, internalContext.headers()),
                 serializeKey(to, internalContext.headers())
             ),
@@ -499,18 +525,24 @@ public class 
MeteredTimestampedKeyValueStoreWithHeaders<K, V>
 
     @Override
     public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
-        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
-            wrapped().all(),
-            allSensor
-        );
+        return allInternal(wrapped());
+    }
+
+    private KeyValueIterator<K, ValueTimestampHeaders<V>> allInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store
+    ) {
+        return new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(store.all(), allSensor);
     }
 
     @Override
     public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
-        return new MeteredTimestampedKeyValueStoreWithHeadersIterator(
-            wrapped().reverseAll(),
-            allSensor
-        );
+        return reverseAllInternal(wrapped());
+    }
+
+    private KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAllInternal(
+        final ReadOnlyKeyValueStore<Bytes, byte[]> store
+    ) {
+        return new 
MeteredTimestampedKeyValueStoreWithHeadersIterator(store.reverseAll(), 
allSensor);
     }
 
     @SuppressWarnings("unchecked")
@@ -659,6 +691,61 @@ public class MeteredTimestampedKeyValueStoreWithHeaders<K, 
V>
         }
     }
 
+    @Override
+    public ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> readOnly(final 
IsolationLevel isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel cannot be 
null");
+        return new ReadOnlyHeadersView(wrapped().readOnly(isolationLevel));
+    }
+
+    private final class ReadOnlyHeadersView implements 
ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> {
+
+        private final ReadOnlyKeyValueStore<Bytes, byte[]> underlying;
+
+        ReadOnlyHeadersView(final ReadOnlyKeyValueStore<Bytes, byte[]> 
underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public ValueTimestampHeaders<V> get(final K key) {
+            Objects.requireNonNull(key, "key cannot be null");
+            return getInternal(underlying, key);
+        }
+
+        @Override
+        public KeyValueIterator<K, ValueTimestampHeaders<V>> range(final K 
from, final K to) {
+            return rangeInternal(underlying, from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, ValueTimestampHeaders<V>> 
reverseRange(final K from, final K to) {
+            return reverseRangeInternal(underlying, from, to);
+        }
+
+        @Override
+        public KeyValueIterator<K, ValueTimestampHeaders<V>> all() {
+            return allInternal(underlying);
+        }
+
+        @Override
+        public KeyValueIterator<K, ValueTimestampHeaders<V>> reverseAll() {
+            return reverseAllInternal(underlying);
+        }
+
+        @Override
+        public <PS extends Serializer<P>, P> KeyValueIterator<K, 
ValueTimestampHeaders<V>> prefixScan(
+            final P prefix, final PS prefixKeySerializer
+        ) {
+            Objects.requireNonNull(prefix, "prefix cannot be null");
+            Objects.requireNonNull(prefixKeySerializer, "prefixKeySerializer 
cannot be null");
+            return prefixScanInternal(underlying, prefix, prefixKeySerializer);
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return underlying.approximateNumEntries();
+        }
+    }
+
     @Override
     protected Bytes serializeKey(final K key) {
         throw new 
UnsupportedOperationException("MeteredTimestampedKeyValueStoreWithHeaders 
required to pass in Headers when serializing a key.");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 2a2a445502f..d5d65e56be5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -44,6 +45,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.KeyValueIteratorStub;
 import org.apache.kafka.test.MockRecordCollector;
@@ -636,6 +638,151 @@ public class MeteredKeyValueStoreTest {
         assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewGetApplySerdesAndRecordGetMetric() {
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.get(KEY_BYTES)).thenReturn(VALUE_BYTES);
+        init();
+
+        final ReadOnlyKeyValueStore<String, String> view = 
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertThat(view.get(KEY), equalTo(VALUE));
+
+        assertTrue((Double) metric("get-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewRangeApplySerdesAndRecordRangeMetric() {
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.range(KEY_BYTES, KEY_BYTES))
+            .thenReturn(new 
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final ReadOnlyKeyValueStore<String, String> view = 
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<String, String> it = view.range(KEY, KEY)) 
{
+            assertThat(it.next().value, equalTo(VALUE));
+            assertFalse(it.hasNext());
+        }
+
+        assertTrue((Double) metric("range-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldReadOnlyViewReverseRangeApplySerdesAndRecordRangeMetric() {
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.reverseRange(KEY_BYTES, KEY_BYTES))
+            .thenReturn(new 
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final ReadOnlyKeyValueStore<String, String> view = 
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<String, String> it = 
view.reverseRange(KEY, KEY)) {
+            assertThat(it.next().value, equalTo(VALUE));
+            assertFalse(it.hasNext());
+        }
+
+        assertTrue((Double) metric("range-rate").metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewAllApplySerdesAndRecordAllMetric() {
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.all())
+            .thenReturn(new 
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final ReadOnlyKeyValueStore<String, String> view = 
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<String, String> it = view.all()) {
+            assertThat(it.next().value, equalTo(VALUE));
+            assertFalse(it.hasNext());
+        }
+
+        assertTrue((Double) metric(new MetricName("all-rate", 
STORE_LEVEL_GROUP, "", tags)).metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewReverseAllApplySerdesAndRecordAllMetric() {
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.reverseAll())
+            .thenReturn(new 
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final ReadOnlyKeyValueStore<String, String> view = 
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<String, String> it = view.reverseAll()) {
+            assertThat(it.next().value, equalTo(VALUE));
+            assertFalse(it.hasNext());
+        }
+
+        assertTrue((Double) metric(new MetricName("all-rate", 
STORE_LEVEL_GROUP, "", tags)).metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void 
shouldReadOnlyViewPrefixScanApplySerdesAndRecordPrefixScanMetric() {
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        final StringSerializer stringSerializer = new StringSerializer();
+        
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.prefixScan(KEY, stringSerializer))
+            .thenReturn(new 
KeyValueIteratorStub<>(Collections.singletonList(BYTE_KEY_VALUE_PAIR).iterator()));
+        init();
+
+        final ReadOnlyKeyValueStore<String, String> view = 
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final KeyValueIterator<String, String> it = view.prefixScan(KEY, 
stringSerializer)) {
+            assertThat(it.next().value, equalTo(VALUE));
+            assertFalse(it.hasNext());
+        }
+
+        assertTrue((Double) metrics.metric(new MetricName("prefix-scan-rate", 
STORE_LEVEL_GROUP, "", tags)).metricValue() > 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReadOnlyViewApproximateNumEntriesDelegatesToUnderlying() 
{
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        
when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+        when(innerView.approximateNumEntries()).thenReturn(42L);
+        init();
+
+        final ReadOnlyKeyValueStore<String, String> view = 
metered.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertThat(view.approximateNumEntries(), equalTo(42L));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldPassReadCommittedThroughToInner() {
+        setUp();
+        final ReadOnlyKeyValueStore<Bytes, byte[]> innerView = 
mock(ReadOnlyKeyValueStore.class);
+        
when(inner.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(innerView);
+        init();
+
+        metered.readOnly(IsolationLevel.READ_COMMITTED);
+
+        verify(inner).readOnly(IsolationLevel.READ_COMMITTED);
+    }
+
+    @Test
+    public void shouldThrowNpeOnNullIsolationLevel() {
+        setUp();
+        init();
+
+        assertThrows(NullPointerException.class, () -> metered.readOnly(null));
+    }
+
     private KafkaMetric metric(final MetricName metricName) {
         return this.metrics.metric(metricName);
     }

Reply via email to