This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5008c3b2018 KAFKA-20497: Add readOnly(IsolationLevel) to
MeteredWindowStore (#22318)
5008c3b2018 is described below
commit 5008c3b20184b4ad0d2ba8de9dc959072131b639
Author: Nick Telford <[email protected]>
AuthorDate: Tue Jun 2 15:49:27 2026 +0100
KAFKA-20497: Add readOnly(IsolationLevel) to MeteredWindowStore (#22318)
The metered layer must wrap the readOnly view for both isolation levels rather
than returning the inner store's readOnly view directly: it holds the serde context
needed to deserialise keys and values, and it owns the sensor
registration for iterator metrics. The ReadOnlyView applies the same
key/value deserialisation and sensor instrumentation as the public
window fetch methods so that Interactive Query (IQ) operations through a
readOnly view are measured identically to direct queries.
KAFKA-20497
Reviewers: Bill Bejeck <[email protected]>
---
.../state/internals/MeteredWindowStore.java | 208 ++++++++++++---------
.../state/internals/MeteredWindowStoreTest.java | 172 +++++++++++++++++
2 files changed, 296 insertions(+), 84 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index f4009b9f9ea..31c5dbfc0cf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
@@ -41,12 +42,14 @@ import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
+import java.time.Instant;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
@@ -253,112 +256,62 @@ public class MeteredWindowStore<K, V>
@Override
public WindowStoreIterator<V> fetch(final K key, final long timeFrom,
final long timeTo) {
Objects.requireNonNull(key, "key cannot be null");
- return new MeteredWindowStoreIterator<>(
- wrapped().fetch(serializeKey(key), timeFrom, timeTo),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return meteredTimeRangeIterator(wrapped().fetch(serializeKey(key),
timeFrom, timeTo));
}
@Override
public WindowStoreIterator<V> backwardFetch(final K key, final long
timeFrom, final long timeTo) {
Objects.requireNonNull(key, "key cannot be null");
- return new MeteredWindowStoreIterator<>(
- wrapped().backwardFetch(serializeKey(key), timeFrom, timeTo),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return
meteredTimeRangeIterator(wrapped().backwardFetch(serializeKey(key), timeFrom,
timeTo));
}
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(
- final K keyFrom,
- final K keyTo,
- final long timeFrom,
- final long timeTo
- ) {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().fetch(
- serializeKey(keyFrom),
- serializeKey(keyTo),
- timeFrom,
- timeTo),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ public KeyValueIterator<Windowed<K>, V> fetch(final K keyFrom,
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo) {
+ return meteredWindowedIterator(wrapped().fetch(serializeKey(keyFrom),
serializeKey(keyTo), timeFrom, timeTo));
}
@Override
- public KeyValueIterator<Windowed<K>, V> backwardFetch(
- final K keyFrom,
- final K keyTo,
- final long timeFrom,
- final long timeTo
- ) {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardFetch(
- serializeKey(keyFrom),
- serializeKey(keyTo),
- timeFrom,
- timeTo),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(final K keyFrom,
+ final K keyTo,
+ final long timeFrom,
+ final long timeTo) {
+ return
meteredWindowedIterator(wrapped().backwardFetch(serializeKey(keyFrom),
serializeKey(keyTo), timeFrom, timeTo));
}
@Override
public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom,
final long timeTo) {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().fetchAll(timeFrom, timeTo),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return meteredWindowedIterator(wrapped().fetchAll(timeFrom, timeTo));
}
@Override
public KeyValueIterator<Windowed<K>, V> backwardFetchAll(final long
timeFrom, final long timeTo) {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardFetchAll(timeFrom, timeTo),
- fetchSensor,
- iteratorDurationSensor,
- this::deserializeKey,
- this::deserializeValue,
- time,
- numOpenIterators,
- openIterators
- );
+ return meteredWindowedIterator(wrapped().backwardFetchAll(timeFrom,
timeTo));
}
@Override
public KeyValueIterator<Windowed<K>, V> all() {
- return new MeteredWindowedKeyValueIterator<>(
- wrapped().all(),
+ return meteredWindowedIterator(wrapped().all());
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardAll() {
+ return meteredWindowedIterator(wrapped().backwardAll());
+ }
+
+ @Override
+ public ReadOnlyWindowStore<K, V> readOnly(final IsolationLevel
isolationLevel) {
+ Objects.requireNonNull(isolationLevel, "isolationLevel cannot be
null");
+ return new ReadOnlyView(wrapped().readOnly(isolationLevel));
+ }
+
+ private WindowStoreIterator<V> meteredTimeRangeIterator(final
WindowStoreIterator<byte[]> iter) {
+ return new MeteredWindowStoreIterator<>(
+ iter,
fetchSensor,
iteratorDurationSensor,
- this::deserializeKey,
this::deserializeValue,
time,
numOpenIterators,
@@ -366,10 +319,9 @@ public class MeteredWindowStore<K, V>
);
}
- @Override
- public KeyValueIterator<Windowed<K>, V> backwardAll() {
+ private KeyValueIterator<Windowed<K>, V> meteredWindowedIterator(final
KeyValueIterator<Windowed<Bytes>, byte[]> iter) {
return new MeteredWindowedKeyValueIterator<>(
- wrapped().backwardAll(),
+ iter,
fetchSensor,
iteratorDurationSensor,
this::deserializeKey,
@@ -380,6 +332,94 @@ public class MeteredWindowStore<K, V>
);
}
+ private final class ReadOnlyView implements ReadOnlyWindowStore<K, V> {
+
+ private final ReadOnlyWindowStore<Bytes, byte[]> underlying;
+
+ ReadOnlyView(final ReadOnlyWindowStore<Bytes, byte[]> underlying) {
+ this.underlying = underlying;
+ }
+
+ @Override
+ public V fetch(final K key, final long windowStartTimestamp) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return maybeMeasureLatency(
+ () -> {
+ final byte[] result = underlying.fetch(serializeKey(key),
windowStartTimestamp);
+ return result == null ? null : deserializeValue(result);
+ },
+ time,
+ fetchSensor
+ );
+ }
+
+ @Override
+ public WindowStoreIterator<V> fetch(
+ final K key,
+ final Instant timeFrom,
+ final Instant timeTo
+ ) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return
meteredTimeRangeIterator(underlying.fetch(serializeKey(key), timeFrom, timeTo));
+ }
+
+ @Override
+ public WindowStoreIterator<V> backwardFetch(
+ final K key,
+ final Instant timeFrom,
+ final Instant timeTo
+ ) {
+ Objects.requireNonNull(key, "key cannot be null");
+ return
meteredTimeRangeIterator(underlying.backwardFetch(serializeKey(key), timeFrom,
timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetch(
+ final K keyFrom,
+ final K keyTo,
+ final Instant timeFrom,
+ final Instant timeTo
+ ) {
+ return
meteredWindowedIterator(underlying.fetch(serializeKey(keyFrom),
serializeKey(keyTo), timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetch(
+ final K keyFrom,
+ final K keyTo,
+ final Instant timeFrom,
+ final Instant timeTo
+ ) {
+ return
meteredWindowedIterator(underlying.backwardFetch(serializeKey(keyFrom),
serializeKey(keyTo), timeFrom, timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> all() {
+ return meteredWindowedIterator(underlying.all());
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardAll() {
+ return meteredWindowedIterator(underlying.backwardAll());
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> fetchAll(
+ final Instant timeFrom,
+ final Instant timeTo
+ ) {
+ return meteredWindowedIterator(underlying.fetchAll(timeFrom,
timeTo));
+ }
+
+ @Override
+ public KeyValueIterator<Windowed<K>, V> backwardFetchAll(
+ final Instant timeFrom,
+ final Instant timeTo
+ ) {
+ return
meteredWindowedIterator(underlying.backwardFetchAll(timeFrom, timeTo));
+ }
+ }
+
@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
maybeMeasureLatency(() -> super.commit(changelogOffsets), time,
commitSensor);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 1d15d483886..e1e497bce36 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -41,6 +42,7 @@ import
org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
@@ -583,6 +585,176 @@ public class MeteredWindowStoreTest {
assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFetchPointInTimeApplySerdesAndRecordMetric()
{
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_BYTES);
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ assertThat(view.fetch(KEY, TIMESTAMP), equalTo(VALUE));
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFetchSingleKeyApplySerdesAndRecordMetric() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.fetch(KEY_BYTES, ofEpochMilli(1), ofEpochMilli(1)))
+ .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.fetch(KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
shouldReadOnlyViewBackwardFetchSingleKeyApplySerdesAndRecordMetric() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.backwardFetch(KEY_BYTES, ofEpochMilli(1),
ofEpochMilli(1)))
+ .thenReturn(KeyValueIterators.emptyWindowStoreIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.backwardFetch(KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFetchRangeApplySerdesAndRecordMetric() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.fetch(KEY_BYTES, KEY_BYTES, ofEpochMilli(1),
ofEpochMilli(1)))
+ .thenReturn(KeyValueIterators.emptyIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.fetch(KEY, KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void
shouldReadOnlyViewBackwardFetchRangeApplySerdesAndRecordMetric() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.backwardFetch(KEY_BYTES, KEY_BYTES, ofEpochMilli(1),
ofEpochMilli(1)))
+ .thenReturn(KeyValueIterators.emptyIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.backwardFetch(KEY, KEY, ofEpochMilli(1), ofEpochMilli(1)).close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewAllApplySerdesAndRecordMetric() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.all()).thenReturn(KeyValueIterators.emptyIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.all().close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewBackwardAllApplySerdesAndRecordMetric() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+
when(innerView.backwardAll()).thenReturn(KeyValueIterators.emptyIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.backwardAll().close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewFetchAllApplySerdesAndRecordMetric() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.fetchAll(ofEpochMilli(1),
ofEpochMilli(1))).thenReturn(KeyValueIterators.emptyIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.fetchAll(ofEpochMilli(1), ofEpochMilli(1)).close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldReadOnlyViewBackwardFetchAllApplySerdesAndRecordMetric()
{
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.backwardFetchAll(ofEpochMilli(1),
ofEpochMilli(1))).thenReturn(KeyValueIterators.emptyIterator());
+
+ store.init(context, store);
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ view.backwardFetchAll(ofEpochMilli(1), ofEpochMilli(1)).close();
+ assertThat((Double) metric("fetch-rate").metricValue(),
greaterThan(0.0));
+ }
+
+ @SuppressWarnings({"unchecked", "unused"})
+ @Test
+ public void shouldRecordOpenIteratorMetricsOnReadOnlyViewIterators() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_UNCOMMITTED)).thenReturn(innerView);
+ when(innerView.all()).thenReturn(KeyValueIterators.emptyIterator());
+
+ store.init(context, store);
+
+ final KafkaMetric openIteratorsMetric = metric("num-open-iterators");
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
+
+ final ReadOnlyWindowStore<String, String> view =
store.readOnly(IsolationLevel.READ_UNCOMMITTED);
+ try (final KeyValueIterator<Windowed<String>, String> unused =
view.all()) {
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(1L));
+ }
+
+ assertThat((Long) openIteratorsMetric.metricValue(), equalTo(0L));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldPassReadCommittedThroughToInner() {
+ final ReadOnlyWindowStore<Bytes, byte[]> innerView =
mock(ReadOnlyWindowStore.class);
+
when(innerStoreMock.readOnly(IsolationLevel.READ_COMMITTED)).thenReturn(innerView);
+
+ store.init(context, store);
+
+ store.readOnly(IsolationLevel.READ_COMMITTED);
+
+ verify(innerStoreMock).readOnly(IsolationLevel.READ_COMMITTED);
+ }
+
+ @Test
+ public void shouldThrowNpeOnNullIsolationLevel() {
+ store.init(context, store);
+
+ assertThrows(NullPointerException.class, () -> store.readOnly(null));
+ }
+
private KafkaMetric metric(final String name) {
return metrics.metric(new MetricName(name, STORE_LEVEL_GROUP, "",
tags));
}