This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 153a458d3ca KAFKA-19711: Add commit-rate metric to metered state 
stores Add commit metric (#21853)
153a458d3ca is described below

commit 153a458d3ca6db389a3b945cb95a2148af4d152d
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Mar 25 09:49:27 2026 -0400

    KAFKA-19711: Add commit-rate metric to metered state stores Add commit 
metric (#21853)
    
    Add `commit-rate`, `commit-latency-avg`, and `commit-latency-max`
    metrics to replace the `flush-`* metrics which are now deprecated.  The
    flush-* metrics will be removed in the next major  release.
    
    Reviewers: Eduwer Camacaro <[email protected]>, Chriso Lolov
     <[email protected]>, Alieh Saeedii <[email protected]>
---
 docs/operations/monitoring.md                      | 71 +++++++++++++++++-----
 .../state/internals/MeteredKeyValueStore.java      | 10 ++-
 .../state/internals/MeteredSessionStore.java       | 10 ++-
 .../state/internals/MeteredWindowStore.java        | 10 ++-
 .../state/internals/metrics/StateStoreMetrics.java | 28 +++++++++
 .../state/internals/MeteredKeyValueStoreTest.java  |  4 +-
 .../MeteredTimestampedKeyValueStoreTest.java       |  4 +-
 ...redTimestampedKeyValueStoreWithHeadersTest.java |  4 +-
 .../MeteredVersionedKeyValueStoreTest.java         |  2 +-
 .../state/internals/MeteredWindowStoreTest.java    |  6 +-
 .../internals/metrics/StateStoreMetricsTest.java   | 17 ++++++
 11 files changed, 130 insertions(+), 36 deletions(-)

diff --git a/docs/operations/monitoring.md b/docs/operations/monitoring.md
index dee195d8dca..9ecaa2ed51c 100644
--- a/docs/operations/monitoring.md
+++ b/docs/operations/monitoring.md
@@ -5316,25 +5316,51 @@ 
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
 <tr>  
 <td>
 
-flush-latency-avg
-</td>  
+flush-latency-avg (deprecated)
+</td>
 <td>
 
-The average flush execution time in ns.
-</td>  
+The average flush execution time in ns. Deprecated: use commit-latency-avg 
instead.
+</td>
 <td>
 
 
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
-</td> </tr>  
-<tr>  
+</td> </tr>
+<tr>
 <td>
 
-flush-latency-max
-</td>  
+flush-latency-max (deprecated)
+</td>
 <td>
 
-The maximum flush execution time in ns.
-</td>  
+The maximum flush execution time in ns. Deprecated: use commit-latency-max 
instead.
+</td>
+<td>
+
+kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+</td> </tr>
+<tr>
+<td>
+
+commit-latency-avg
+</td>
+<td>
+
+The average commit execution time in ns.
+</td>
+<td>
+
+kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+</td> </tr>
+<tr>
+<td>
+
+commit-latency-max
+</td>
+<td>
+
+The maximum commit execution time in ns.
+</td>
 <td>
 
 
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
@@ -5472,17 +5498,30 @@ 
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
 <tr>  
 <td>
 
-flush-rate
-</td>  
+flush-rate (deprecated)
+</td>
 <td>
 
-The average flush rate for this store.
-</td>  
+The average flush rate for this store. Deprecated: use commit-rate instead.
+</td>
 <td>
 
 
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
-</td> </tr>  
-<tr>  
+</td> </tr>
+<tr>
+<td>
+
+commit-rate
+</td>
+<td>
+
+The average commit rate for this store.
+</td>
+<td>
+
+kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+</td> </tr>
+<tr>
 <td>
 
 restore-rate
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 5586b71d41b..7c43e484652 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -91,7 +91,7 @@ public class MeteredKeyValueStore<K, V>
     protected Sensor allSensor;
     protected Sensor rangeSensor;
     protected Sensor prefixScanSensor;
-    private Sensor flushSensor;
+    private Sensor commitSensor;
     private Sensor e2eLatencySensor;
     protected Sensor iteratorDurationSensor;
     protected InternalProcessorContext<?, ?> internalContext;
@@ -143,6 +143,7 @@ public class MeteredKeyValueStore<K, V>
         super.init(stateStoreContext, root);
     }
 
+    @SuppressWarnings("deprecation")
     private void registerMetrics() {
         putSensor = StateStoreMetrics.putSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         putIfAbsentSensor = 
StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
@@ -151,7 +152,10 @@ public class MeteredKeyValueStore<K, V>
         allSensor = StateStoreMetrics.allSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         rangeSensor = StateStoreMetrics.rangeSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         prefixScanSensor = 
StateStoreMetrics.prefixScanSensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
+        // flush metrics ar deprecated per KIP-1035 and will be removed in the 
next major release.
+        // Here we just register the sensor without recording
+        StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
+        commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
@@ -416,7 +420,7 @@ public class MeteredKeyValueStore<K, V>
 
     @Override
     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
-        maybeMeasureLatency(() -> super.commit(changelogOffsets), time, 
flushSensor);
+        maybeMeasureLatency(() -> super.commit(changelogOffsets), time, 
commitSensor);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 5d678cbfd25..16fe87227af 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -73,7 +73,7 @@ public class MeteredSessionStore<K, V>
     protected StreamsMetricsImpl streamsMetrics;
     protected Sensor putSensor;
     protected Sensor fetchSensor;
-    protected Sensor flushSensor;
+    protected Sensor commitSensor;
     protected Sensor removeSensor;
     protected Sensor e2eLatencySensor;
     protected Sensor iteratorDurationSensor;
@@ -119,10 +119,14 @@ public class MeteredSessionStore<K, V>
         super.init(stateStoreContext, root);
     }
 
+    @SuppressWarnings("deprecation")
     private void registerMetrics() {
         putSensor = StateStoreMetrics.putSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
+        // flushSensor is deprecated per KIP-1035 and will be removed in the 
next major release.
+        // Here we just register the sensor without recording
+        StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
+        commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         removeSensor = StateStoreMetrics.removeSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
@@ -430,7 +434,7 @@ public class MeteredSessionStore<K, V>
 
     @Override
     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
-        maybeMeasureLatency(() -> super.commit(changelogOffsets), time, 
flushSensor);
+        maybeMeasureLatency(() -> super.commit(changelogOffsets), time, 
commitSensor);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2929c2b654a..5455f635a74 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -76,7 +76,7 @@ public class MeteredWindowStore<K, V>
     protected StreamsMetricsImpl streamsMetrics;
     protected Sensor putSensor;
     protected Sensor fetchSensor;
-    private Sensor flushSensor;
+    private Sensor commitSensor;
     private Sensor e2eLatencySensor;
     protected Sensor iteratorDurationSensor;
     protected InternalProcessorContext<?, ?> internalContext;
@@ -141,10 +141,14 @@ public class MeteredWindowStore<K, V>
         return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
     }
 
+    @SuppressWarnings("deprecation")
     private void registerMetrics() {
         putSensor = StateStoreMetrics.putSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
-        flushSensor = StateStoreMetrics.flushSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
+        // flushSensor is deprecated per KIP-1035 and will be removed in the 
next major release.
+        // Here we just register the sensor without recording
+        StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
+        commitSensor = StateStoreMetrics.commitSensor(taskId.toString(), 
metricsScope, name(), streamsMetrics);
         e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
@@ -384,7 +388,7 @@ public class MeteredWindowStore<K, V>
 
     @Override
     public void commit(final Map<TopicPartition, Long> changelogOffsets) {
-        maybeMeasureLatency(() -> super.commit(changelogOffsets), time, 
flushSensor);
+        maybeMeasureLatency(() -> super.commit(changelogOffsets), time, 
commitSensor);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index 356d26bbe97..551ee1d0983 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -111,6 +111,13 @@ public class StateStoreMetrics {
     private static final String FLUSH_AVG_LATENCY_DESCRIPTION = 
AVG_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
     private static final String FLUSH_MAX_LATENCY_DESCRIPTION = 
MAX_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
 
+    private static final String COMMIT = "commit";
+    private static final String COMMIT_DESCRIPTION = "calls to commit";
+    private static final String COMMIT_RATE_DESCRIPTION =
+        RATE_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+    private static final String COMMIT_AVG_LATENCY_DESCRIPTION = 
AVG_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;
+    private static final String COMMIT_MAX_LATENCY_DESCRIPTION = 
MAX_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;
+
     private static final String DELETE = "delete";
     private static final String DELETE_DESCRIPTION = "calls to delete";
     private static final String DELETE_RATE_DESCRIPTION =
@@ -309,6 +316,10 @@ public class StateStoreMetrics {
         return sensor;
     }
 
+    /**
+     * @deprecated since 4.3. Use {@link #commitSensor(String, String, String, 
StreamsMetricsImpl)} instead.
+     */
+    @Deprecated
     public static Sensor flushSensor(final String taskId,
                                      final String storeType,
                                      final String storeName,
@@ -326,6 +337,23 @@ public class StateStoreMetrics {
         );
     }
 
+    public static Sensor commitSensor(final String taskId,
+                                      final String storeType,
+                                      final String storeName,
+                                      final StreamsMetricsImpl streamsMetrics) 
{
+        return throughputAndLatencySensor(
+            taskId,
+            storeType,
+            storeName,
+            COMMIT,
+            COMMIT_RATE_DESCRIPTION,
+            COMMIT_AVG_LATENCY_DESCRIPTION,
+            COMMIT_MAX_LATENCY_DESCRIPTION,
+            RecordingLevel.DEBUG,
+            streamsMetrics
+        );
+    }
+
     public static Sensor deleteSensor(final String taskId,
                                       final String storeType,
                                       final String storeName,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 69f9f710039..42fee48f61d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -326,8 +326,8 @@ public class MeteredKeyValueStoreTest {
 
         metered.commit(Map.of());
 
-        final KafkaMetric metric = metric("flush-rate");
-        assertTrue((Double) metric.metricValue() > 0);
+        final KafkaMetric commitMetric = metric("commit-rate");
+        assertTrue((Double) commitMetric.metricValue() > 0);
     }
 
     private interface CachedKeyValueStore extends KeyValueStore<Bytes, 
byte[]>, CachedStateStore<byte[], byte[]> { }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 602be7a9e6f..6e05513ec9c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -351,8 +351,8 @@ public class MeteredTimestampedKeyValueStoreTest {
 
         metered.commit(Map.of());
 
-        final KafkaMetric metric = metric("flush-rate");
-        assertTrue((Double) metric.metricValue() > 0);
+        final KafkaMetric commitMetric = metric("commit-rate");
+        assertTrue((Double) commitMetric.metricValue() > 0);
     }
 
     private interface CachedKeyValueStore extends KeyValueStore<Bytes, 
byte[]>, CachedStateStore<byte[], byte[]> { }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index 58e6ee45e9e..f6355e7bd6a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -292,8 +292,8 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest 
{
 
         metered.commit(Map.of());
 
-        final KafkaMetric metric = metric("flush-rate");
-        assertTrue((Double) metric.metricValue() > 0);
+        final KafkaMetric commitMetric = metric("commit-rate");
+        assertTrue((Double) commitMetric.metricValue() > 0);
     }
 
     private interface CachedKeyValueStore extends KeyValueStore<Bytes, 
byte[]>, CachedStateStore<byte[], byte[]> { }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 4fcf0010d21..53fa00d1938 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -233,7 +233,7 @@ public class MeteredVersionedKeyValueStoreTest {
         store.commit(Map.of());
 
         verify(inner).commit(Map.of());
-        assertThat((Double) getMetric("flush-rate").metricValue(), 
greaterThan(0.0));
+        assertThat((Double) getMetric("commit-rate").metricValue(), 
greaterThan(0.0));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 4af34c9bd8d..6b60bacb6bc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -356,10 +356,8 @@ public class MeteredWindowStoreTest {
         store.init(context, store);
         store.commit(Map.of());
 
-        // it suffices to verify one flush metric since all flush metrics are 
recorded by the same sensor
-        // and the sensor is tested elsewhere
-        final KafkaMetric metric = metric("flush-rate");
-        assertTrue((Double) metric.metricValue() > 0);
+        final KafkaMetric commitMetric = metric("commit-rate");
+        assertTrue((Double) commitMetric.metricValue() > 0);
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
index 151d78da06c..740a87669d8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
@@ -183,6 +183,23 @@ public class StateStoreMetricsTest {
         );
     }
 
+    @Test
+    public void shouldGetCommitSensor() {
+        final String metricName = "commit";
+        final String descriptionOfRate = "The average number of calls to 
commit per second";
+        final String descriptionOfAvg = "The average latency of calls to 
commit";
+        final String descriptionOfMax = "The maximum latency of calls to 
commit";
+        setupStreamsMetrics(metricName);
+
+        getAndVerifySensor(
+            () -> StateStoreMetrics.commitSensor(TASK_ID, STORE_TYPE, 
STORE_NAME, streamsMetrics),
+            metricName,
+            descriptionOfAvg,
+            descriptionOfMax,
+            descriptionOfRate
+        );
+    }
+
     @Test
     public void shouldGetRemoveSensor() {
         final String metricName = "remove";

Reply via email to