This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1a05f99e8d KAFKA-20571: Added store unwrapping for publishing 
num-keys metric. (#22267)
d1a05f99e8d is described below

commit d1a05f99e8dda1e0cd26a1bcf6a399a32b35d716
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Tue May 12 23:39:53 2026 -0700

    KAFKA-20571: Added store unwrapping for publishing num-keys metric. (#22267)
    
    Added store unwrapping to bypass the check in CachingKeyValueStore(if
    it's used) following the same pattern as MeteredSessionStore
    
    Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
     <[email protected]>, Evan Zhou <[email protected]>
---
 .../InMemoryStoreMetricsIntegrationTest.java       | 268 +++++++++++++++++++++
 .../state/internals/MeteredKeyValueStore.java      |   6 +-
 .../state/internals/MeteredSessionStore.java       |  12 +-
 .../state/internals/MeteredWindowStore.java        |  12 +-
 .../streams/state/internals/WrappedStateStore.java |  14 ++
 .../state/internals/MeteredKeyValueStoreTest.java  |   4 +-
 6 files changed, 291 insertions(+), 25 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InMemoryStoreMetricsIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InMemoryStoreMetricsIntegrationTest.java
new file mode 100644
index 00000000000..baf90e91f19
--- /dev/null
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InMemoryStoreMetricsIntegrationTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
+import org.apache.kafka.streams.state.internals.InMemorySessionStore;
+import org.apache.kafka.streams.state.internals.InMemoryWindowStore;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Timeout(120)
+@Tag("integration")
+public class InMemoryStoreMetricsIntegrationTest {
+    private static final int NUM_BROKERS = 1;
+    private static final String INPUT_TOPIC = "in-memory-num-keys-input";
+
+    private final EmbeddedKafkaCluster cluster = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+    private String safeTestName;
+
+    @BeforeEach
+    public void startCluster(final TestInfo testInfo) throws 
InterruptedException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC, 1, 1);
+        safeTestName = safeUniqueTestName(testInfo);
+    }
+
+    @AfterEach
+    public void closeCluster() {
+        cluster.stop();
+    }
+
+    @Test
+    public void 
keyValueStoreMetricValueShouldNotThrowIfStoreIsNotInitialized() throws 
Exception {
+        final CountDownLatch initLatch = new CountDownLatch(1);
+        final CountDownLatch finishLatch = new CountDownLatch(1);
+
+        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
Stores.keyValueStoreBuilder(
+                new KeyValueBytesStoreSupplier() {
+                    @Override
+                    public String name() {
+                        return "store";
+                    }
+
+                    @Override
+                    public KeyValueStore<Bytes, byte[]> get() {
+                        return new InMemoryKeyValueStore(name()) {
+                            @Override
+                            public void init(final StateStoreContext 
stateStoreContext, final StateStore root) {
+                                initLatch.countDown();
+                                try {
+                                    finishLatch.await();
+                                } catch (final InterruptedException e) {
+                                    throw new RuntimeException(e);
+                                }
+                                super.init(stateStoreContext, root);
+                            }
+                        };
+                    }
+
+                    @Override
+                    public String metricsScope() {
+                        return "in-memory";
+                    }
+                },
+                Serdes.String(),
+                Serdes.String())
+            .withCachingEnabled()
+            .withLoggingEnabled(Collections.emptyMap());
+
+        test(storeBuilder, initLatch, finishLatch);
+    }
+
+    @Test
+    public void sessionStoreMetricValueShouldNotThrowIfStoreIsNotInitialized() 
throws Exception {
+        final CountDownLatch initLatch = new CountDownLatch(1);
+        final CountDownLatch finishLatch = new CountDownLatch(1);
+
+        final long retentionMs = 60_000L;
+
+        final StoreBuilder<SessionStore<String, String>> storeBuilder = 
Stores.sessionStoreBuilder(
+                new SessionBytesStoreSupplier() {
+                    @Override
+                    public String name() {
+                        return "store";
+                    }
+
+                    @Override
+                    public SessionStore<Bytes, byte[]> get() {
+                        return new InMemorySessionStore(name(), retentionMs, 
metricsScope()) {
+                            @Override
+                            public void init(final StateStoreContext 
stateStoreContext, final StateStore root) {
+                                initLatch.countDown();
+                                try {
+                                    finishLatch.await();
+                                } catch (final InterruptedException e) {
+                                    throw new RuntimeException(e);
+                                }
+                                super.init(stateStoreContext, root);
+                            }
+                        };
+                    }
+
+                    @Override
+                    public String metricsScope() {
+                        return "in-memory-session";
+                    }
+
+                    @Override
+                    public long segmentIntervalMs() {
+                        return 1L;
+                    }
+
+                    @Override
+                    public long retentionPeriod() {
+                        return retentionMs;
+                    }
+                },
+                Serdes.String(),
+                Serdes.String())
+            .withCachingEnabled()
+            .withLoggingEnabled(Collections.emptyMap());
+
+        test(storeBuilder, initLatch, finishLatch);
+    }
+
+    @Test
+    public void windowStoreMetricValueShouldNotThrowIfStoreIsNotInitialized() 
throws Exception {
+        final CountDownLatch initLatch = new CountDownLatch(1);
+        final CountDownLatch finishLatch = new CountDownLatch(1);
+
+        final long retentionMs = 60_000L;
+        final long windowMs = 1_000L;
+
+        final StoreBuilder<WindowStore<String, String>> storeBuilder = 
Stores.windowStoreBuilder(
+                new WindowBytesStoreSupplier() {
+                    @Override
+                    public String name() {
+                        return "store";
+                    }
+
+                    @Override
+                    public WindowStore<Bytes, byte[]> get() {
+                        return new InMemoryWindowStore(name(), retentionMs, 
windowMs, false, metricsScope()) {
+                            @Override
+                            public void init(final StateStoreContext 
stateStoreContext, final StateStore root) {
+                                initLatch.countDown();
+                                try {
+                                    finishLatch.await();
+                                } catch (final InterruptedException e) {
+                                    throw new RuntimeException(e);
+                                }
+                                super.init(stateStoreContext, root);
+                            }
+                        };
+                    }
+
+                    @Override
+                    public String metricsScope() {
+                        return "in-memory-window";
+                    }
+
+                    @Override
+                    public long segmentIntervalMs() {
+                        return 1L;
+                    }
+
+                    @Override
+                    public long windowSize() {
+                        return windowMs;
+                    }
+
+                    @Override
+                    public boolean retainDuplicates() {
+                        return false;
+                    }
+
+                    @Override
+                    public long retentionPeriod() {
+                        return retentionMs;
+                    }
+                },
+                Serdes.String(),
+                Serdes.String())
+            .withCachingEnabled()
+            .withLoggingEnabled(Collections.emptyMap());
+
+        test(storeBuilder, initLatch, finishLatch);
+    }
+
+    private void test(final StoreBuilder<?> storeBuilder,
+                      final CountDownLatch initLatch,
+                      final CountDownLatch finishLatch) throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.addStateStore(storeBuilder);
+        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(new MockApiProcessorSupplier<>(), "store");
+
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, safeTestName);
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        props.put(StreamsConfig.STATE_DIR_CONFIG, 
org.apache.kafka.test.TestUtils.tempDirectory().getAbsolutePath());
+
+        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
+            streams.start();
+
+            initLatch.await();
+
+            try {
+                for (final Map.Entry<MetricName, ? extends Metric> entry : 
streams.metrics().entrySet()) {
+                    entry.getValue().metricValue();
+                }
+            } catch (final Exception e) {
+                fail("Getting metric values on an uninitialized store 
shouldn't throw exceptions", e);
+            } finally {
+                finishLatch.countDown();
+            }
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 0fa2c50b88f..673f887779c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -182,7 +182,11 @@ public class MeteredKeyValueStore<K, V>
         );
         if (!persistent()) {
             StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, 
name(), streamsMetrics,
-                    (config, now) -> wrapped().approximateNumEntries());
+                    (config, now) -> {
+                        final InMemoryKeyValueStore inMemoryStore = 
findInner(InMemoryKeyValueStore.class);
+                        return inMemoryStore != null ? 
inMemoryStore.approximateNumEntries() : -1L;
+                    }
+            );
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 38e06649144..a8ae70068a4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -146,23 +146,13 @@ public class MeteredSessionStore<K, V>
         if (!persistent()) {
             StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, 
name(), streamsMetrics,
                     (config, now) -> {
-                        final InMemorySessionStore inMemoryStore = 
findInMemorySessionStore(wrapped());
+                        final InMemorySessionStore inMemoryStore = 
findInner(InMemorySessionStore.class);
                         return inMemoryStore != null ? 
inMemoryStore.numEntries() : -1L;
                     }
             );
         }
     }
 
-    private static InMemorySessionStore findInMemorySessionStore(final 
StateStore store) {
-        if (store instanceof InMemorySessionStore) {
-            return (InMemorySessionStore) store;
-        } else if (store instanceof WrappedStateStore) {
-            return findInMemorySessionStore(((WrappedStateStore<?, ?, ?>) 
store).wrapped());
-        } else {
-            return null;
-        }
-    }
-
     @Override
     public void recordRestoreTime(final long restoreTimeNs) {
         restoreSensor.record(restoreTimeNs);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 44e7d1b4510..f4009b9f9ea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -163,23 +163,13 @@ public class MeteredWindowStore<K, V>
         if (!persistent()) {
             StateStoreMetrics.addNumKeysGauge(taskId.toString(), metricsScope, 
name(), streamsMetrics,
                 (config, now) -> {
-                    final InMemoryWindowStore inMemoryStore = 
findInMemoryWindowStore(wrapped());
+                    final InMemoryWindowStore inMemoryStore = 
findInner(InMemoryWindowStore.class);
                     return inMemoryStore != null ? inMemoryStore.numEntries() 
: -1L;
                 }
             );
         }
     }
 
-    private static InMemoryWindowStore findInMemoryWindowStore(final 
StateStore store) {
-        if (store instanceof InMemoryWindowStore) {
-            return (InMemoryWindowStore) store;
-        } else if (store instanceof WrappedStateStore) {
-            return findInMemoryWindowStore(((WrappedStateStore<?, ?, ?>) 
store).wrapped());
-        } else {
-            return null;
-        }
-    }
-
     @Override
     public void recordRestoreTime(final long restoreTimeNs) {
         restoreSensor.record(restoreTimeNs);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index e6e6c92e559..73786170a27 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -72,6 +72,20 @@ public abstract class WrappedStateStore<S extends 
StateStore, K, V> implements S
         this.wrapped = wrapped;
     }
 
+    /**
+     * Walks the wrapped-store chain rooted at this store's {@code wrapped()} 
until it
+     * finds an instance of {@code innerType}, or returns {@code null} if none 
is present.
+     */
+    public <T extends StateStore> T findInner(final Class<T> innerType) {
+        if (innerType.isInstance(wrapped)) {
+            return innerType.cast(wrapped);
+        } else if (wrapped instanceof WrappedStateStore) {
+            return ((WrappedStateStore<?, ?, ?>) wrapped).findInner(innerType);
+        } else {
+            return null;
+        }
+    }
+
     @Override
     public void init(final StateStoreContext stateStoreContext, final 
StateStore root) {
         wrapped.init(stateStoreContext, root);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 98bbb5e0be1..2a2a445502f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -538,12 +538,12 @@ public class MeteredKeyValueStoreTest {
     @Test
     public void shouldTrackNumKeysMetric() {
         setUp();
-        when(inner.approximateNumEntries()).thenReturn(42L);
         init();
 
         final KafkaMetric numKeysMetric = metric("num-keys");
         assertThat(numKeysMetric, not(nullValue()));
-        assertThat((Long) numKeysMetric.metricValue(), equalTo(42L));
+        // inner store is a mock (not InMemoryKeyValueStore), so returns -1
+        assertThat((Long) numKeysMetric.metricValue(), equalTo(-1L));
     }
 
     @SuppressWarnings("unused")

Reply via email to