This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 88a955d8354 KAFKA-20497: Add readOnly(IsolationLevel) to ChangeLogging
stores (#22311)
88a955d8354 is described below
commit 88a955d8354182399e599fc34d36d9267408c9d1
Author: Nick Telford <[email protected]>
AuthorDate: Fri May 29 19:30:01 2026 +0100
KAFKA-20497: Add readOnly(IsolationLevel) to ChangeLogging stores (#22311)
The ChangeLogging wrappers have no cache of their own — they simply
intercept writes to emit changelog records and delegate reads straight
through. There is no uncommitted state held at this layer, so the
isolation semantics belong entirely to the inner store. Pure delegation
to wrapped().readOnly(level) is therefore the only sensible
implementation.
KAFKA-20497
Reviewers: Murali Basani <[email protected]>, Bill Bejeck
<[email protected]>
---
.../internals/ChangeLoggingKeyValueBytesStore.java | 7 +++++++
.../internals/ChangeLoggingSessionBytesStore.java | 7 +++++++
.../state/internals/ChangeLoggingWindowBytesStore.java | 7 +++++++
.../internals/ChangeLoggingKeyValueBytesStoreTest.java | 14 ++++++++++++++
.../internals/ChangeLoggingSessionBytesStoreTest.java | 18 ++++++++++++++++++
.../internals/ChangeLoggingWindowBytesStoreTest.java | 18 ++++++++++++++++++
6 files changed, 71 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index d6f31629c50..1654ef6380a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serializer;
@@ -26,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import java.util.List;
@@ -64,6 +66,11 @@ public class ChangeLoggingKeyValueBytesStore
return wrapped().approximateNumEntries();
}
+ @Override
+ public ReadOnlyKeyValueStore<Bytes, byte[]> readOnly(final IsolationLevel
isolationLevel) {
+ return wrapped().readOnly(isolationLevel);
+ }
+
@Override
public void put(final Bytes key,
final byte[] value) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index dcd85eab489..7ffa1d5c22f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
@@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
@@ -47,6 +49,11 @@ public class ChangeLoggingSessionBytesStore
super.init(stateStoreContext, root);
}
+ @Override
+ public ReadOnlySessionStore<Bytes, byte[]> readOnly(final IsolationLevel
isolationLevel) {
+ return wrapped().readOnly(isolationLevel);
+ }
+
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes
key, final long earliestSessionEndTime, final long latestSessionStartTime) {
return wrapped().findSessions(key, earliestSessionEndTime,
latestSessionStartTime);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 74a213f2ec1..d8f273a9c26 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
@@ -23,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -67,6 +69,11 @@ class ChangeLoggingWindowBytesStore
return wrapped().fetch(key, timestamp);
}
+ @Override
+ public ReadOnlyWindowStore<Bytes, byte[]> readOnly(final IsolationLevel
isolationLevel) {
+ return wrapped().readOnly(isolationLevel);
+ }
+
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key,
final long from,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 0c288a0ddd6..65ffd0cffba 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
@@ -61,6 +62,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.mockito.Mockito.mock;
@@ -264,6 +266,18 @@ public class ChangeLoggingKeyValueBytesStoreTest {
}
+ @Test
+ public void shouldDelegateReadOnlyUncommittedToInner() {
+ assertThat(store.readOnly(IsolationLevel.READ_UNCOMMITTED),
+ sameInstance(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)));
+ }
+
+ @Test
+ public void shouldDelegateReadOnlyCommittedToInner() {
+ assertThat(store.readOnly(IsolationLevel.READ_COMMITTED),
+ sameInstance(inner.readOnly(IsolationLevel.READ_COMMITTED)));
+ }
+
private StreamsConfig streamsConfigMock() {
final StreamsConfig streamsConfig = mock(StreamsConfig.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index b2695f766dc..a92d59d2743 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
@@ -24,6 +25,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import org.junit.jupiter.api.AfterEach;
@@ -39,6 +41,8 @@ import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -51,6 +55,8 @@ public class ChangeLoggingSessionBytesStoreTest {
private SessionStore<Bytes, byte[]> inner;
@Mock
private ProcessorContextImpl context;
+ @Mock
+ private ReadOnlySessionStore<Bytes, byte[]> view;
private ChangeLoggingSessionBytesStore store;
private final byte[] value1 = {0};
@@ -189,4 +195,16 @@ public class ChangeLoggingSessionBytesStoreTest {
verify(inner).close();
}
+
+ @Test
+ public void shouldDelegateReadOnlyUncommittedToInner() {
+ when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(view);
+ assertThat(store.readOnly(IsolationLevel.READ_UNCOMMITTED),
sameInstance(view));
+ }
+
+ @Test
+ public void shouldDelegateReadOnlyCommittedToInner() {
+ when(inner.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(view);
+ assertThat(store.readOnly(IsolationLevel.READ_COMMITTED),
sameInstance(view));
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index fabca5e3bdb..f993c4dfce2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
@@ -24,6 +25,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -38,6 +40,8 @@ import org.mockito.quality.Strictness;
import static java.time.Instant.ofEpochMilli;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.hamcrest.CoreMatchers.sameInstance;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -53,6 +57,8 @@ public class ChangeLoggingWindowBytesStoreTest {
private WindowStore<Bytes, byte[]> inner;
@Mock
private ProcessorContextImpl context;
+ @Mock
+ private ReadOnlyWindowStore<Bytes, byte[]> view;
private ChangeLoggingWindowBytesStore store;
private static final Position POSITION =
Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 1L)))));
@@ -124,6 +130,18 @@ public class ChangeLoggingWindowBytesStoreTest {
}
}
+ @Test
+ public void shouldDelegateReadOnlyUncommittedToInner() {
+ when(inner.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(view);
+ assertThat(store.readOnly(IsolationLevel.READ_UNCOMMITTED),
sameInstance(view));
+ }
+
+ @Test
+ public void shouldDelegateReadOnlyCommittedToInner() {
+ when(inner.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(view);
+ assertThat(store.readOnly(IsolationLevel.READ_COMMITTED),
sameInstance(view));
+ }
+
@Test
public void shouldRetainDuplicatesWhenSet() {
store = new ChangeLoggingWindowBytesStore(inner, true,
WindowKeySchema::toStoreKeyBinary);