This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7891a4efeb0 KAFKA-20497: Add readOnly(IsolationLevel) to
CachingKeyValueStore (#22312)
7891a4efeb0 is described below
commit 7891a4efeb0f8164ea0039d5995f43264d3a6b9e
Author: Nick Telford <[email protected]>
AuthorDate: Wed Jun 3 14:00:00 2026 +0100
KAFKA-20497: Add readOnly(IsolationLevel) to CachingKeyValueStore (#22312)
The cache holds uncommitted writes that must not be visible under
READ_COMMITTED, so readOnly(READ_COMMITTED) bypasses the cache entirely
and delegates straight to the inner store's readOnly view.
READ_UNCOMMITTED requires a merged view of both cache and store, so a
ReadOnlyView inner class is introduced that reuses the rangeInternal,
allInternal and prefixScanInternal helpers extracted from the public
methods.
getInternal is refactored to accept an explicit underlying store so that
the ReadOnlyView can share the same point-lookup logic (check the cache,
then fall back to the underlying store) without duplicating it, keeping
the view consistent with the store's own point-lookup semantics as the
implementation evolves. The null-check, store-open guard, and locking
are consolidated into getInternal itself so that both the public get()
and the ReadOnlyView's get() share the same precondition and
synchronisation behaviour.
KAFKA-20497
Reviewers: Bill Bejeck <[email protected]>
---
.../state/internals/CachingKeyValueStore.java | 169 ++++++++++++++-------
.../CachingInMemoryKeyValueStoreTest.java | 150 ++++++++++++++++++
2 files changed, 263 insertions(+), 56 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 8ef68e0d06c..b6d85590091 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
@@ -35,6 +36,7 @@ import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -296,7 +298,7 @@ public class CachingKeyValueStore
lock.writeLock().lock();
try {
validateStoreOpen();
- final byte[] v = getInternal(key);
+ final byte[] v = getInternal(key, wrapped());
if (v == null) {
putInternal(key, value);
}
@@ -335,97 +337,107 @@ public class CachingKeyValueStore
}
private byte[] deleteInternal(final Bytes key) {
- final byte[] v = getInternal(key);
+ final byte[] v = getInternal(key, wrapped());
putInternal(key, null);
return v;
}
@Override
public byte[] get(final Bytes key) {
+ return getInternal(key, wrapped());
+ }
+
+ private byte[] getInternal(final Bytes key, final
ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
Objects.requireNonNull(key, "key cannot be null");
validateStoreOpen();
- final Lock theLock;
- if (Thread.currentThread().equals(streamThread)) {
- theLock = lock.writeLock();
- } else {
- theLock = lock.readLock();
- }
+ final Lock theLock = Thread.currentThread().equals(streamThread) ?
lock.writeLock() : lock.readLock();
theLock.lock();
try {
validateStoreOpen();
- return getInternal(key);
- } finally {
- theLock.unlock();
- }
- }
-
- private byte[] getInternal(final Bytes key) {
- LRUCacheEntry entry = null;
- if (internalContext.cache() != null) {
- entry = internalContext.cache().get(cacheName, key);
- }
- if (entry == null) {
- final byte[] rawValue = wrapped().get(key);
- if (rawValue == null) {
- return null;
+ LRUCacheEntry entry = null;
+ if (internalContext.cache() != null) {
+ entry = internalContext.cache().get(cacheName, key);
}
- // only update the cache if this call is on the streamThread
- // as we don't want other threads to trigger an eviction/flush
- if (Thread.currentThread().equals(streamThread)) {
- internalContext.cache().put(cacheName, key, new
LRUCacheEntry(rawValue));
+ if (entry == null) {
+ final byte[] rawValue = underlying.get(key);
+ if (rawValue == null) {
+ return null;
+ }
+ // only update the cache if this call is on the streamThread
+ // as we don't want other threads to trigger an eviction/flush
+ if (Thread.currentThread().equals(streamThread)) {
+ internalContext.cache().put(cacheName, key, new
LRUCacheEntry(rawValue));
+ }
+ return rawValue;
+ } else {
+ return entry.value();
}
- return rawValue;
- } else {
- return entry.value();
+ } finally {
+ theLock.unlock();
}
}
@Override
public KeyValueIterator<Bytes, byte[]> range(final Bytes from,
final Bytes to) {
- if (Objects.nonNull(from) && Objects.nonNull(to) && from.compareTo(to)
> 0) {
- LOG.warn("Returning empty iterator for fetch with invalid key
range: from > to. " +
- "This may be due to range arguments set in the wrong order, " +
- "or serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
- "Note that the built-in numerical serdes do not follow this
for negative numbers");
- return KeyValueIterators.emptyIterator();
- }
-
- validateStoreOpen();
- final KeyValueIterator<Bytes, byte[]> storeIterator =
wrapped().range(from, to);
- final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().range(cacheName, from, to);
- return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
storeIterator, true);
+ return rangeInternal(from, to, wrapped(), true);
}
@Override
public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
- if (Objects.nonNull(from) && Objects.nonNull(to) && from.compareTo(to)
> 0) {
+ return rangeInternal(from, to, wrapped(), false);
+ }
+
+ private KeyValueIterator<Bytes, byte[]> rangeInternal(final Bytes from,
+ final Bytes to,
+ final
ReadOnlyKeyValueStore<Bytes, byte[]> underlying,
+ final boolean
forward) {
+ if (from != null && to != null && from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key
range: from > to. " +
"This may be due to range arguments set in the wrong order, " +
"or serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
"Note that the built-in numerical serdes do not follow this
for negative numbers");
return KeyValueIterators.emptyIterator();
}
-
validateStoreOpen();
- final KeyValueIterator<Bytes, byte[]> storeIterator =
wrapped().reverseRange(from, to);
- final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().reverseRange(cacheName, from, to);
- return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
storeIterator, false);
+ final KeyValueIterator<Bytes, byte[]> storeIterator = forward ?
underlying.range(from, to) : underlying.reverseRange(from, to);
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = forward ?
+ internalContext.cache().range(cacheName, from, to) :
+ internalContext.cache().reverseRange(cacheName, from, to);
+ return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
storeIterator, forward);
}
@Override
public KeyValueIterator<Bytes, byte[]> all() {
+ return allInternal(wrapped(), true);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseAll() {
+ return allInternal(wrapped(), false);
+ }
+
+ private KeyValueIterator<Bytes, byte[]> allInternal(final
ReadOnlyKeyValueStore<Bytes, byte[]> underlying,
+ final boolean forward)
{
validateStoreOpen();
- final KeyValueIterator<Bytes, byte[]> storeIterator = wrapped().all();
- final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().all(cacheName);
- return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
storeIterator, true);
+ final KeyValueIterator<Bytes, byte[]> storeIterator = forward ?
underlying.all() : underlying.reverseAll();
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = forward ?
+ internalContext.cache().all(cacheName) :
+ internalContext.cache().reverseAll(cacheName);
+ return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
storeIterator, forward);
}
@Override
public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix, final PS prefixKeySerializer) {
+ return prefixScanInternal(prefix, prefixKeySerializer, wrapped());
+ }
+
+ private <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScanInternal(final P prefix,
+
final PS prefixKeySerializer,
+
final ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
validateStoreOpen();
- final KeyValueIterator<Bytes, byte[]> storeIterator =
wrapped().prefixScan(prefix, prefixKeySerializer);
+ final KeyValueIterator<Bytes, byte[]> storeIterator =
underlying.prefixScan(prefix, prefixKeySerializer);
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null,
prefix));
final Bytes to = ByteUtils.increment(from);
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().range(cacheName, from, to, false);
@@ -433,11 +445,56 @@ public class CachingKeyValueStore
}
@Override
- public KeyValueIterator<Bytes, byte[]> reverseAll() {
- validateStoreOpen();
- final KeyValueIterator<Bytes, byte[]> storeIterator =
wrapped().reverseAll();
- final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator =
internalContext.cache().reverseAll(cacheName);
- return new MergedSortedCacheKeyValueBytesStoreIterator(cacheIterator,
storeIterator, false);
+ public ReadOnlyKeyValueStore<Bytes, byte[]> readOnly(final IsolationLevel
isolationLevel) {
+ Objects.requireNonNull(isolationLevel, "isolationLevel cannot be
null");
+ if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+ return wrapped().readOnly(isolationLevel);
+ }
+ return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+ }
+
+ private final class ReadOnlyView implements ReadOnlyKeyValueStore<Bytes,
byte[]> {
+
+ private final ReadOnlyKeyValueStore<Bytes, byte[]> underlying;
+
+ ReadOnlyView(final ReadOnlyKeyValueStore<Bytes, byte[]> underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ return getInternal(key, underlying);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final
Bytes to) {
+ return rangeInternal(from, to, underlying, true);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
+ return rangeInternal(from, to, underlying, false);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+ return allInternal(underlying, true);
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseAll() {
+ return allInternal(underlying, false);
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix, final PS prefixKeySerializer) {
+ return prefixScanInternal(prefix, prefixKeySerializer, underlying);
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ return underlying.approximateNumEntries();
+ }
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 60ba5e0c593..5cd430555f0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -16,12 +16,14 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.internals.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -31,6 +33,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -55,6 +58,7 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -629,6 +633,152 @@ public class CachingInMemoryKeyValueStoreTest extends
AbstractKeyValueStoreTest
});
}
+ // readOnly(IsolationLevel) tests
+
+ @Test
+ public void shouldReadCommittedBypassCacheForGet() {
+ // cache-only entry should be invisible under READ_COMMITTED
+ store.put(bytesKey("cache-only"), bytesValue("v"));
+ assertEquals(0, underlyingStore.approximateNumEntries());
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_COMMITTED);
+ assertNull(view.get(bytesKey("cache-only")));
+
+ // after flush the entry is in the underlying store and visible
+ store.commit(Map.of());
+ assertNotNull(view.get(bytesKey("cache-only")));
+ }
+
+ @Test
+ public void shouldReadUncommittedViewGetFromCacheOnly() {
+ store.put(bytesKey("c"), bytesValue("cache-val"));
+ assertEquals(0, underlyingStore.approximateNumEntries());
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertThat(view.get(bytesKey("c")), equalTo(bytesValue("cache-val")));
+ }
+
+ @Test
+ public void shouldReadUncommittedViewGetFromStoreOnly() {
+ underlyingStore.put(bytesKey("s"), bytesValue("store-val"));
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertThat(view.get(bytesKey("s")), equalTo(bytesValue("store-val")));
+ }
+
+ @Test
+ public void shouldReadUncommittedViewGetCacheShadowsStore() {
+ underlyingStore.put(bytesKey("k"), bytesValue("store-val"));
+ store.put(bytesKey("k"), bytesValue("cache-val"));
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertThat(view.get(bytesKey("k")), equalTo(bytesValue("cache-val")));
+ }
+
+ @Test
+ public void shouldReadUncommittedViewRange() {
+ underlyingStore.put(bytesKey("a"), bytesValue("1"));
+ store.put(bytesKey("b"), bytesValue("2"));
+ underlyingStore.put(bytesKey("c"), bytesValue("3"));
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<Bytes> keys = new ArrayList<>();
+ try (final KeyValueIterator<Bytes, byte[]> it =
view.range(bytesKey("a"), bytesKey("c"))) {
+ while (it.hasNext()) {
+ keys.add(it.next().key);
+ }
+ }
+ assertEquals(Arrays.asList(bytesKey("a"), bytesKey("b"),
bytesKey("c")), keys);
+ }
+
+ @Test
+ public void shouldReadUncommittedViewReverseRange() {
+ underlyingStore.put(bytesKey("a"), bytesValue("1"));
+ store.put(bytesKey("b"), bytesValue("2"));
+ underlyingStore.put(bytesKey("c"), bytesValue("3"));
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<Bytes> keys = new ArrayList<>();
+ try (final KeyValueIterator<Bytes, byte[]> it =
view.reverseRange(bytesKey("a"), bytesKey("c"))) {
+ while (it.hasNext()) {
+ keys.add(it.next().key);
+ }
+ }
+ assertEquals(Arrays.asList(bytesKey("c"), bytesKey("b"),
bytesKey("a")), keys);
+ }
+
+ @Test
+ public void shouldReadUncommittedViewAll() {
+ underlyingStore.put(bytesKey("a"), bytesValue("1"));
+ store.put(bytesKey("b"), bytesValue("2"));
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<Bytes> keys = new ArrayList<>();
+ try (final KeyValueIterator<Bytes, byte[]> it = view.all()) {
+ while (it.hasNext()) {
+ keys.add(it.next().key);
+ }
+ }
+ assertEquals(Arrays.asList(bytesKey("a"), bytesKey("b")), keys);
+ }
+
+ @Test
+ public void shouldReadUncommittedViewReverseAll() {
+ underlyingStore.put(bytesKey("a"), bytesValue("1"));
+ store.put(bytesKey("b"), bytesValue("2"));
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<Bytes> keys = new ArrayList<>();
+ try (final KeyValueIterator<Bytes, byte[]> it = view.reverseAll()) {
+ while (it.hasNext()) {
+ keys.add(it.next().key);
+ }
+ }
+ assertEquals(Arrays.asList(bytesKey("b"), bytesKey("a")), keys);
+ }
+
+ @Test
+ public void shouldReadUncommittedViewPrefixScan() {
+ underlyingStore.put(bytesKey("foo1"), bytesValue("1"));
+ store.put(bytesKey("foo2"), bytesValue("2"));
+ underlyingStore.put(bytesKey("bar1"), bytesValue("3"));
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ final List<Bytes> keys = new ArrayList<>();
+ try (final KeyValueIterator<Bytes, byte[]> it = view.prefixScan("foo",
new StringSerializer())) {
+ while (it.hasNext()) {
+ keys.add(it.next().key);
+ }
+ }
+ assertEquals(Arrays.asList(bytesKey("foo1"), bytesKey("foo2")), keys);
+ }
+
+ @Test
+ public void
shouldReadUncommittedViewApproximateNumEntriesDelegatesToUnderlying() {
+ underlyingStore.put(bytesKey("a"), bytesValue("1"));
+ store.put(bytesKey("b"), bytesValue("2")); // cache only, not in
underlying
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertEquals(underlyingStore.approximateNumEntries(),
view.approximateNumEntries());
+ }
+
+ @Test
+ public void shouldReturnEmptyAndWarnOnInvertedRangeOnOuter() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(CachingKeyValueStore.class);
+ final KeyValueIterator<Bytes, byte[]> it =
store.range(bytesKey("z"), bytesKey("a"))) {
+ assertFalse(it.hasNext());
+ assertThat(appender.getMessages(), hasItem(
+ "Returning empty iterator for fetch with invalid key range:
from > to. " +
+ "This may be due to range arguments set in the wrong order, " +
+ "or serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+ "Note that the built-in numerical serdes do not follow this
for negative numbers"));
+ }
+ }
+
+ @Test
+ public void shouldReturnEmptyAndWarnOnInvertedRangeViaView() {
+ final ReadOnlyKeyValueStore<Bytes, byte[]> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(CachingKeyValueStore.class);
+ final KeyValueIterator<Bytes, byte[]> it =
view.range(bytesKey("z"), bytesKey("a"))) {
+ assertFalse(it.hasNext());
+ assertThat(appender.getMessages(), hasItem(
+ "Returning empty iterator for fetch with invalid key range:
from > to. " +
+ "This may be due to range arguments set in the wrong order, " +
+ "or serdes that don't preserve ordering when lexicographically
comparing the serialized bytes. " +
+ "Note that the built-in numerical serdes do not follow this
for negative numbers"));
+ }
+ }
+
private int addItemsToCache() {
long cachedSize = 0;
int i = 0;