This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7891a4efeb0 KAFKA-20497: Add readOnly(IsolationLevel) to 
CachingKeyValueStore (#22312)
7891a4efeb0 is described below

commit 7891a4efeb0f8164ea0039d5995f43264d3a6b9e
Author: Nick Telford <[email protected]>
AuthorDate: Wed Jun 3 14:00:00 2026 +0100

    KAFKA-20497: Add readOnly(IsolationLevel) to CachingKeyValueStore (#22312)
    
    The cache holds uncommitted writes that must not be visible under
    READ_COMMITTED, so readOnly(READ_COMMITTED) bypasses the cache entirely
    and delegates straight to the inner store's readOnly view.
    READ_UNCOMMITTED requires a merged view of both cache and store, so a
    ReadOnlyView inner class is introduced that reuses the rangeInternal/
    allInternal/prefixScanInternal helpers extracted from the public methods.
    
    getInternal is refactored to accept an explicit underlying store so that
    the ReadOnlyView can share the same cache-then-store point-lookup logic
    without duplicating it, keeping the view consistent with the store's own
    point-lookup semantics as the implementation evolves. The null-check,
    store-open guard, and locking are consolidated into getInternal itself
    so that both the public get() and the ReadOnlyView's get() share the
    same precondition and synchronisation behaviour.
    
    KAFKA-20497
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../state/internals/CachingKeyValueStore.java      | 169 ++++++++++++++-------
 .../CachingInMemoryKeyValueStoreTest.java          | 150 ++++++++++++++++++
 2 files changed, 263 insertions(+), 56 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 8ef68e0d06c..b6d85590091 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -296,7 +298,7 @@ public class CachingKeyValueStore
         lock.writeLock().lock();
         try {
             validateStoreOpen();
-            final byte[] v = getInternal(key);
+            final byte[] v = getInternal(key, wrapped());
             if (v == null) {
                 putInternal(key, value);
             }
@@ -335,97 +337,107 @@ public class CachingKeyValueStore
     }
 
     private byte[] deleteInternal(final Bytes key) {
-        final byte[] v = getInternal(key);
+        final byte[] v = getInternal(key, wrapped());
         putInternal(key, null);
         return v;
     }
 
     @Override
     public byte[] get(final Bytes key) {
+        return getInternal(key, wrapped());
+    }
+
+    private byte[] getInternal(final Bytes key, final 
ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
         Objects.requireNonNull(key, "key cannot be null");
         validateStoreOpen();
-        final Lock theLock;
-        if (Thread.currentThread().equals(streamThread)) {
-            theLock = lock.writeLock();
-        } else {
-            theLock = lock.readLock();
-        }
+        final Lock theLock = Thread.currentThread().equals(streamThread) ? 
lock.writeLock() : lock.readLock();
         theLock.lock();
         try {
             validateStoreOpen();
-            return getInternal(key);
-        } finally {
-            theLock.unlock();
-        }
-    }
-
-    private byte[] getInternal(final Bytes key) {
-        LRUCacheEntry entry = null;
-        if (internalContext.cache() != null) {
-            entry = internalContext.cache().get(cacheName, key);
-        }
-        if (entry == null) {
-            final byte[] rawValue = wrapped().get(key);
-            if (rawValue == null) {
-                return null;
+            LRUCacheEntry entry = null;
+            if (internalContext.cache() != null) {
+                entry = internalContext.cache().get(cacheName, key);
             }
-            // only update the cache if this call is on the streamThread
-            // as we don't want other threads to trigger an eviction/flush
-            if (Thread.currentThread().equals(streamThread)) {
-                internalContext.cache().put(cacheName, key, new 
LRUCacheEntry(rawValue));
+            if (entry == null) {
+                final byte[] rawValue = underlying.get(key);
+                if (rawValue == null) {
+                    return null;
+                }
+                // only update the cache if this call is on the streamThread
+                // as we don't want other threads to trigger an eviction/flush
+                if (Thread.currentThread().equals(streamThread)) {
+                    internalContext.cache().put(cacheName, key, new 
LRUCacheEntry(rawValue));
+                }
+                return rawValue;
+            } else {
+                return entry.value();
             }
-            return rawValue;
-        } else {
-            return entry.value();
+        } finally {
+            theLock.unlock();
         }
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                  final Bytes to) {
-        if (Objects.nonNull(from) && Objects.nonNull(to) && from.compareTo(to) 
> 0) {
-            LOG.warn("Returning empty iterator for fetch with invalid key 
range: from > to. " +
-                "This may be due to range arguments set in the wrong order, " +
-                "or serdes that don't preserve ordering when lexicographically 
comparing the serialized bytes. " +
-                "Note that the built-in numerical serdes do not follow this 
for negative numbers");
-            return KeyValueIterators.emptyIterator();
-        }
-
-        validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> storeIterator = 
wrapped().range(from, to);
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().range(cacheName, from, to);
-        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, true);
+        return rangeInternal(from, to, wrapped(), true);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
                                                         final Bytes to) {
-        if (Objects.nonNull(from) && Objects.nonNull(to) && from.compareTo(to) 
> 0) {
+        return rangeInternal(from, to, wrapped(), false);
+    }
+
+    private KeyValueIterator<Bytes, byte[]> rangeInternal(final Bytes from,
+                                                          final Bytes to,
+                                                          final 
ReadOnlyKeyValueStore<Bytes, byte[]> underlying,
+                                                          final boolean 
forward) {
+        if (from != null && to != null && from.compareTo(to) > 0) {
             LOG.warn("Returning empty iterator for fetch with invalid key 
range: from > to. " +
                 "This may be due to range arguments set in the wrong order, " +
                 "or serdes that don't preserve ordering when lexicographically 
comparing the serialized bytes. " +
                 "Note that the built-in numerical serdes do not follow this 
for negative numbers");
             return KeyValueIterators.emptyIterator();
         }
-
         validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> storeIterator = 
wrapped().reverseRange(from, to);
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().reverseRange(cacheName, from, to);
-        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, false);
+        final KeyValueIterator<Bytes, byte[]> storeIterator = forward ? 
underlying.range(from, to) : underlying.reverseRange(from, to);
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = forward ?
+            internalContext.cache().range(cacheName, from, to) :
+            internalContext.cache().reverseRange(cacheName, from, to);
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, forward);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
+        return allInternal(wrapped(), true);
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        return allInternal(wrapped(), false);
+    }
+
+    private KeyValueIterator<Bytes, byte[]> allInternal(final 
ReadOnlyKeyValueStore<Bytes, byte[]> underlying,
+                                                        final boolean forward) 
{
         validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().all();
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().all(cacheName);
-        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, true);
+        final KeyValueIterator<Bytes, byte[]> storeIterator = forward ? 
underlying.all() : underlying.reverseAll();
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = forward ?
+            internalContext.cache().all(cacheName) :
+            internalContext.cache().reverseAll(cacheName);
+        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, forward);
     }
 
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+        return prefixScanInternal(prefix, prefixKeySerializer, wrapped());
+    }
+
+    private <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScanInternal(final P prefix,
+                                                                               
               final PS prefixKeySerializer,
+                                                                               
               final ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
         validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> storeIterator = 
wrapped().prefixScan(prefix, prefixKeySerializer);
+        final KeyValueIterator<Bytes, byte[]> storeIterator = 
underlying.prefixScan(prefix, prefixKeySerializer);
         final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
         final Bytes to = ByteUtils.increment(from);
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().range(cacheName, from, to, false);
@@ -433,11 +445,56 @@ public class CachingKeyValueStore
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> reverseAll() {
-        validateStoreOpen();
-        final KeyValueIterator<Bytes, byte[]> storeIterator = 
wrapped().reverseAll();
-        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = 
internalContext.cache().reverseAll(cacheName);
-        return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator, 
storeIterator, false);
+    public ReadOnlyKeyValueStore<Bytes, byte[]> readOnly(final IsolationLevel 
isolationLevel) {
+        Objects.requireNonNull(isolationLevel, "isolationLevel cannot be 
null");
+        if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+            return wrapped().readOnly(isolationLevel);
+        }
+        return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+    }
+
+    private final class ReadOnlyView implements ReadOnlyKeyValueStore<Bytes, 
byte[]> {
+
+        private final ReadOnlyKeyValueStore<Bytes, byte[]> underlying;
+
+        ReadOnlyView(final ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            return getInternal(key, underlying);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final 
Bytes to) {
+            return rangeInternal(from, to, underlying, true);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, 
final Bytes to) {
+            return rangeInternal(from, to, underlying, false);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> all() {
+            return allInternal(underlying, true);
+        }
+
+        @Override
+        public KeyValueIterator<Bytes, byte[]> reverseAll() {
+            return allInternal(underlying, false);
+        }
+
+        @Override
+        public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+            return prefixScanInternal(prefix, prefixKeySerializer, underlying);
+        }
+
+        @Override
+        public long approximateNumEntries() {
+            return underlying.approximateNumEntries();
+        }
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 60ba5e0c593..5cd430555f0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.internals.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -31,6 +33,7 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -55,6 +58,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
 import static 
org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -629,6 +633,152 @@ public class CachingInMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest
         });
     }
 
+    // readOnly(IsolationLevel) tests
+
+    @Test
+    public void shouldReadCommittedBypassCacheForGet() {
+        // cache-only entry should be invisible under READ_COMMITTED
+        store.put(bytesKey("cache-only"), bytesValue("v"));
+        assertEquals(0, underlyingStore.approximateNumEntries());
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_COMMITTED);
+        assertNull(view.get(bytesKey("cache-only")));
+
+        // after flush the entry is in the underlying store and visible
+        store.commit(Map.of());
+        assertNotNull(view.get(bytesKey("cache-only")));
+    }
+
+    @Test
+    public void shouldReadUncommittedViewGetFromCacheOnly() {
+        store.put(bytesKey("c"), bytesValue("cache-val"));
+        assertEquals(0, underlyingStore.approximateNumEntries());
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertThat(view.get(bytesKey("c")), equalTo(bytesValue("cache-val")));
+    }
+
+    @Test
+    public void shouldReadUncommittedViewGetFromStoreOnly() {
+        underlyingStore.put(bytesKey("s"), bytesValue("store-val"));
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertThat(view.get(bytesKey("s")), equalTo(bytesValue("store-val")));
+    }
+
+    @Test
+    public void shouldReadUncommittedViewGetCacheShadowsStore() {
+        underlyingStore.put(bytesKey("k"), bytesValue("store-val"));
+        store.put(bytesKey("k"), bytesValue("cache-val"));
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertThat(view.get(bytesKey("k")), equalTo(bytesValue("cache-val")));
+    }
+
+    @Test
+    public void shouldReadUncommittedViewRange() {
+        underlyingStore.put(bytesKey("a"), bytesValue("1"));
+        store.put(bytesKey("b"), bytesValue("2"));
+        underlyingStore.put(bytesKey("c"), bytesValue("3"));
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<Bytes> keys = new ArrayList<>();
+        try (final KeyValueIterator<Bytes, byte[]> it = 
view.range(bytesKey("a"), bytesKey("c"))) {
+            while (it.hasNext()) {
+                keys.add(it.next().key);
+            }
+        }
+        assertEquals(Arrays.asList(bytesKey("a"), bytesKey("b"), 
bytesKey("c")), keys);
+    }
+
+    @Test
+    public void shouldReadUncommittedViewReverseRange() {
+        underlyingStore.put(bytesKey("a"), bytesValue("1"));
+        store.put(bytesKey("b"), bytesValue("2"));
+        underlyingStore.put(bytesKey("c"), bytesValue("3"));
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<Bytes> keys = new ArrayList<>();
+        try (final KeyValueIterator<Bytes, byte[]> it = 
view.reverseRange(bytesKey("a"), bytesKey("c"))) {
+            while (it.hasNext()) {
+                keys.add(it.next().key);
+            }
+        }
+        assertEquals(Arrays.asList(bytesKey("c"), bytesKey("b"), 
bytesKey("a")), keys);
+    }
+
+    @Test
+    public void shouldReadUncommittedViewAll() {
+        underlyingStore.put(bytesKey("a"), bytesValue("1"));
+        store.put(bytesKey("b"), bytesValue("2"));
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<Bytes> keys = new ArrayList<>();
+        try (final KeyValueIterator<Bytes, byte[]> it = view.all()) {
+            while (it.hasNext()) {
+                keys.add(it.next().key);
+            }
+        }
+        assertEquals(Arrays.asList(bytesKey("a"), bytesKey("b")), keys);
+    }
+
+    @Test
+    public void shouldReadUncommittedViewReverseAll() {
+        underlyingStore.put(bytesKey("a"), bytesValue("1"));
+        store.put(bytesKey("b"), bytesValue("2"));
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<Bytes> keys = new ArrayList<>();
+        try (final KeyValueIterator<Bytes, byte[]> it = view.reverseAll()) {
+            while (it.hasNext()) {
+                keys.add(it.next().key);
+            }
+        }
+        assertEquals(Arrays.asList(bytesKey("b"), bytesKey("a")), keys);
+    }
+
+    @Test
+    public void shouldReadUncommittedViewPrefixScan() {
+        underlyingStore.put(bytesKey("foo1"), bytesValue("1"));
+        store.put(bytesKey("foo2"), bytesValue("2"));
+        underlyingStore.put(bytesKey("bar1"), bytesValue("3"));
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        final List<Bytes> keys = new ArrayList<>();
+        try (final KeyValueIterator<Bytes, byte[]> it = view.prefixScan("foo", 
new StringSerializer())) {
+            while (it.hasNext()) {
+                keys.add(it.next().key);
+            }
+        }
+        assertEquals(Arrays.asList(bytesKey("foo1"), bytesKey("foo2")), keys);
+    }
+
+    @Test
+    public void 
shouldReadUncommittedViewApproximateNumEntriesDelegatesToUnderlying() {
+        underlyingStore.put(bytesKey("a"), bytesValue("1"));
+        store.put(bytesKey("b"), bytesValue("2")); // cache only, not in 
underlying
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        assertEquals(underlyingStore.approximateNumEntries(), 
view.approximateNumEntries());
+    }
+
+    @Test
+    public void shouldReturnEmptyAndWarnOnInvertedRangeOnOuter() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(CachingKeyValueStore.class);
+             final KeyValueIterator<Bytes, byte[]> it = 
store.range(bytesKey("z"), bytesKey("a"))) {
+            assertFalse(it.hasNext());
+            assertThat(appender.getMessages(), hasItem(
+                "Returning empty iterator for fetch with invalid key range: 
from > to. " +
+                "This may be due to range arguments set in the wrong order, " +
+                "or serdes that don't preserve ordering when lexicographically 
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this 
for negative numbers"));
+        }
+    }
+
+    @Test
+    public void shouldReturnEmptyAndWarnOnInvertedRangeViaView() {
+        final ReadOnlyKeyValueStore<Bytes, byte[]> view = 
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(CachingKeyValueStore.class);
+             final KeyValueIterator<Bytes, byte[]> it = 
view.range(bytesKey("z"), bytesKey("a"))) {
+            assertFalse(it.hasNext());
+            assertThat(appender.getMessages(), hasItem(
+                "Returning empty iterator for fetch with invalid key range: 
from > to. " +
+                "This may be due to range arguments set in the wrong order, " +
+                "or serdes that don't preserve ordering when lexicographically 
comparing the serialized bytes. " +
+                "Note that the built-in numerical serdes do not follow this 
for negative numbers"));
+        }
+    }
+
     private int addItemsToCache() {
         long cachedSize = 0;
         int i = 0;

Reply via email to