This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d766d554aee KAFKA-20497: Add readOnly(IsolationLevel) to
MeteredSessionStore (#22317)
d766d554aee is described below
commit d766d554aee1674ca7ea528e3f6f8b8b0f1335c9
Author: Nick Telford <[email protected]>
AuthorDate: Tue Jun 2 15:50:18 2026 +0100
KAFKA-20497: Add readOnly(IsolationLevel) to MeteredSessionStore (#22317)
The metered layer must wrap the inner store's readOnly view for both
isolation levels rather than returning that view directly: the metered
layer holds the serde context needed to deserialise keys and values, and
it owns the sensor registration for iterator metrics. The ReadOnlyView
applies the same key/value deserialisation and the same iterator wrapping
and sensor instrumentation (via meteredWindowedIterator) as the public
session fetch methods, so that Interactive Query (IQ) operations through
a readOnly view are measured identically to direct queries.
KAFKA-20497
Reviewers: Bill Bejeck <[email protected]>
---
.../state/internals/MeteredSessionStore.java | 262 ++++++++++-----------
.../state/internals/MeteredSessionStoreTest.java | 145 ++++++++++++
2 files changed, 270 insertions(+), 137 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index a8ae70068a4..a56fd4c9642 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
@@ -40,6 +41,7 @@ import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
@@ -254,172 +256,158 @@ public class MeteredSessionStore<K, V>
@Override
public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
Objects.requireNonNull(key, "key cannot be null");
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().fetch(serializeKey(key)),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return meteredWindowedIterator(wrapped().fetch(serializeKey(key)));
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
Objects.requireNonNull(key, "key cannot be null");
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardFetch(serializeKey(key)),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return
meteredWindowedIterator(wrapped().backwardFetch(serializeKey(key)));
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(
- final K keyFrom,
- final K keyTo
- ) {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().fetch(serializeKey(keyFrom), serializeKey(keyTo)),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K
keyTo) {
+ return meteredWindowedIterator(wrapped().fetch(serializeKey(keyFrom),
serializeKey(keyTo)));
}
@Override
- public KeyValueIterator<Windowed<K>, V> backwardFetch(
- final K keyFrom,
- final K keyTo
- ) {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardFetch(serializeKey(keyFrom),
serializeKey(keyTo)),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo) {
+ return
meteredWindowedIterator(wrapped().backwardFetch(serializeKey(keyFrom),
serializeKey(keyTo)));
}
@Override
- public KeyValueIterator<Windowed<K>, V> findSessions(
- final K key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime
- ) {
+ public KeyValueIterator<Windowed<K>, V> findSessions(final K key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
- final Bytes bytesKey = serializeKey(key);
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().findSessions(
- bytesKey,
- earliestSessionEndTime,
- latestSessionStartTime),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return
meteredWindowedIterator(wrapped().findSessions(serializeKey(key),
earliestSessionEndTime, latestSessionStartTime));
}
@Override
- public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
- final K key,
- final long earliestSessionEndTime,
- final long latestSessionStartTime
- ) {
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K key,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
Objects.requireNonNull(key, "key cannot be null");
- final Bytes bytesKey = serializeKey(key);
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardFindSessions(
- bytesKey,
- earliestSessionEndTime,
- latestSessionStartTime
- ),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return
meteredWindowedIterator(wrapped().backwardFindSessions(serializeKey(key),
earliestSessionEndTime, latestSessionStartTime));
}
@Override
- public KeyValueIterator<Windowed<K>, V> findSessions(
- final K keyFrom,
- final K keyTo,
- final long earliestSessionEndTime,
- final long latestSessionStartTime
- ) {
- final Bytes bytesKeyFrom = serializeKey(keyFrom);
- final Bytes bytesKeyTo = serializeKey(keyTo);
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().findSessions(
- bytesKeyFrom,
- bytesKeyTo,
- earliestSessionEndTime,
- latestSessionStartTime),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ public KeyValueIterator<Windowed<K>, V> findSessions(final K keyFrom,
+ final K keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return
meteredWindowedIterator(wrapped().findSessions(serializeKey(keyFrom),
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
}
@Override
- public KeyValueIterator<Windowed<K>, V> findSessions(
- final long earliestSessionEndTime,
- final long latestSessionEndTime
- ) {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().findSessions(earliestSessionEndTime,
latestSessionEndTime),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ public KeyValueIterator<Windowed<K>, V> findSessions(final long
earliestSessionEndTime,
+ final long
latestSessionEndTime) {
+ return
meteredWindowedIterator(wrapped().findSessions(earliestSessionEndTime,
latestSessionEndTime));
}
@Override
- public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
- final K keyFrom,
- final K keyTo,
- final long earliestSessionEndTime,
- final long latestSessionStartTime
- ) {
- final Bytes bytesKeyFrom = serializeKey(keyFrom);
- final Bytes bytesKeyTo = serializeKey(keyTo);
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(final K
keyFrom,
+ final K keyTo,
+ final long
earliestSessionEndTime,
+ final long
latestSessionStartTime) {
+ return
meteredWindowedIterator(wrapped().backwardFindSessions(serializeKey(keyFrom),
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public ReadOnlySessionStore<K, V> readOnly(final IsolationLevel
isolationLevel) {
+ Objects.requireNonNull(isolationLevel, "isolationLevel cannot be
null");
+ return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+ }
+
+ private final class ReadOnlyView implements ReadOnlySessionStore<K, V> {
+
+ private final ReadOnlySessionStore<Bytes, byte[]> underlying;
+
+ ReadOnlyView(final ReadOnlySessionStore<Bytes, byte[]> underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public V fetchSession(
+ final K key,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime
+ ) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return maybeMeasureLatency(
+ () ->
deserializeValue(underlying.fetchSession(serializeKey(key),
earliestSessionEndTime, latestSessionStartTime)),
+ time,
+ fetchSensor
+ );
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return
meteredWindowedIterator(underlying.fetch(serializeKey(key)));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K key) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return
meteredWindowedIterator(underlying.backwardFetch(serializeKey(key)));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom, final K
keyTo) {
+ return
meteredWindowedIterator(underlying.fetch(serializeKey(keyFrom),
serializeKey(keyTo)));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
final K keyTo) {
+ return
meteredWindowedIterator(underlying.backwardFetch(serializeKey(keyFrom),
serializeKey(keyTo)));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> findSessions(
+ final K key,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime
+ ) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return
meteredWindowedIterator(underlying.findSessions(serializeKey(key),
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
+ final K key,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime
+ ) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return
meteredWindowedIterator(underlying.backwardFindSessions(serializeKey(key),
earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> findSessions(
+ final K keyFrom,
+ final K keyTo,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime
+ ) {
+ return
meteredWindowedIterator(underlying.findSessions(serializeKey(keyFrom),
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFindSessions(
+ final K keyFrom,
+ final K keyTo,
+ final long earliestSessionEndTime,
+ final long latestSessionStartTime
+ ) {
+ return
meteredWindowedIterator(underlying.backwardFindSessions(serializeKey(keyFrom),
serializeKey(keyTo), earliestSessionEndTime, latestSessionStartTime));
+ }
+
+ }
+
+ private KeyValueIterator<Windowed<K>, V> meteredWindowedIterator(final
KeyValueIterator<Windowed<Bytes>, byte[]> iter) {
return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardFindSessions(
- bytesKeyFrom,
- bytesKeyTo,
- earliestSessionEndTime,
- latestSessionStartTime
- ),
+ iter,
fetchSensor,
iteratorDurationSensor,
this::deserializeKey,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 28982f9e229..11735047dfb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -45,6 +46,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.KeyValueIteratorStub;
@@ -820,6 +822,149 @@ public class MeteredSessionStoreTest {
assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFetchSessionApplySerdesAndRecordMetric() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.fetchSession(KEY_BYTES, 0, 0)).thenReturn(VALUE_BYTES);
+ init();
+
+ final ReadOnlySessionStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertThat(view.fetchSession(KEY, 0, 0), equalTo(VALUE));
+ assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFindSessionsApplySerdesAndRecordMetric() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.findSessions(KEY_BYTES, 0, 0))
+ .thenReturn(new KeyValueIteratorStub<>(
+ Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
+ init();
+
+ final ReadOnlySessionStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<Windowed<String>, String> it =
view.findSessions(KEY, 0, 0)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ }
+ assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
shouldReadOnlyViewBackwardFindSessionsApplySerdesAndRecordMetric() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.backwardFindSessions(KEY_BYTES, 0, 0))
+ .thenReturn(new KeyValueIteratorStub<>(
+ Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
+ init();
+
+ final ReadOnlySessionStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<Windowed<String>, String> it =
view.backwardFindSessions(KEY, 0, 0)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ }
+ assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
shouldReadOnlyViewFindSessionsRangeApplySerdesAndRecordMetric() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.findSessions(KEY_BYTES, KEY_BYTES, 0, 0))
+ .thenReturn(new KeyValueIteratorStub<>(
+ Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
+ init();
+
+ final ReadOnlySessionStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<Windowed<String>, String> it =
view.findSessions(KEY, KEY, 0, 0)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ }
+ assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFetchApplySerdesAndRecordMetric() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.fetch(KEY_BYTES))
+ .thenReturn(new KeyValueIteratorStub<>(
+ Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
+ init();
+
+ final ReadOnlySessionStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<Windowed<String>, String> it =
view.fetch(KEY)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ }
+ assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewBackwardFetchApplySerdesAndRecordMetric() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.backwardFetch(KEY_BYTES))
+ .thenReturn(new KeyValueIteratorStub<>(
+ Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
+ init();
+
+ final ReadOnlySessionStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<Windowed<String>, String> it =
view.backwardFetch(KEY)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ }
+ assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFetchRangeApplySerdesAndRecordMetric() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.fetch(KEY_BYTES, KEY_BYTES))
+ .thenReturn(new KeyValueIteratorStub<>(
+ Collections.singleton(KeyValue.pair(WINDOWED_KEY_BYTES,
VALUE_BYTES)).iterator()));
+ init();
+
+ final ReadOnlySessionStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<Windowed<String>, String> it =
view.fetch(KEY, KEY)) {
+ assertThat(it.next().value, equalTo(VALUE));
+ }
+ assertTrue((Double) metric("fetch-rate").metricValue() > 0);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldPassReadCommittedThroughToInner() {
+ setUp();
+ final ReadOnlySessionStore<Bytes, byte[]> innerView =
mock(ReadOnlySessionStore.class);
+
when(innerStore.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(innerView);
+ init();
+
+ store.readOnly(IsolationLevel.READ_COMMITTED);
+
+ verify(innerStore).readOnly(IsolationLevel.READ_COMMITTED);
+ }
+
+ @Test
+ public void shouldThrowNpeOnNullIsolationLevel() {
+ setUp();
+ init();
+
+ assertThrows(NullPointerException.class, () -> store.readOnly(null));
+ }
+
private KafkaMetric metric(final String name) {
return this.metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "",
this.tags));
}