This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 153a458d3ca KAFKA-19711: Add commit-rate metric to metered state
stores Add commit metric (#21853)
153a458d3ca is described below
commit 153a458d3ca6db389a3b945cb95a2148af4d152d
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Mar 25 09:49:27 2026 -0400
KAFKA-19711: Add commit-rate metric to metered state stores Add commit
metric (#21853)
Add `commit-rate`, `commit-latency-avg`, and `commit-latency-max`
metrics to replace the `flush-*` metrics, which are now deprecated. The
`flush-*` metrics will be removed in the next major release.
Reviewers: Eduwer Camacaro <[email protected]>, Chriso Lolov
<[email protected]>, Alieh Saeedii <[email protected]>
---
docs/operations/monitoring.md | 71 +++++++++++++++++-----
.../state/internals/MeteredKeyValueStore.java | 10 ++-
.../state/internals/MeteredSessionStore.java | 10 ++-
.../state/internals/MeteredWindowStore.java | 10 ++-
.../state/internals/metrics/StateStoreMetrics.java | 28 +++++++++
.../state/internals/MeteredKeyValueStoreTest.java | 4 +-
.../MeteredTimestampedKeyValueStoreTest.java | 4 +-
...redTimestampedKeyValueStoreWithHeadersTest.java | 4 +-
.../MeteredVersionedKeyValueStoreTest.java | 2 +-
.../state/internals/MeteredWindowStoreTest.java | 6 +-
.../internals/metrics/StateStoreMetricsTest.java | 17 ++++++
11 files changed, 130 insertions(+), 36 deletions(-)
diff --git a/docs/operations/monitoring.md b/docs/operations/monitoring.md
index dee195d8dca..9ecaa2ed51c 100644
--- a/docs/operations/monitoring.md
+++ b/docs/operations/monitoring.md
@@ -5316,25 +5316,51 @@
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
<tr>
<td>
-flush-latency-avg
-</td>
+flush-latency-avg (deprecated)
+</td>
<td>
-The average flush execution time in ns.
-</td>
+The average flush execution time in ns. Deprecated: use commit-latency-avg
instead.
+</td>
<td>
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
-</td> </tr>
-<tr>
+</td> </tr>
+<tr>
<td>
-flush-latency-max
-</td>
+flush-latency-max (deprecated)
+</td>
<td>
-The maximum flush execution time in ns.
-</td>
+The maximum flush execution time in ns. Deprecated: use commit-latency-max
instead.
+</td>
+<td>
+
+kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+</td> </tr>
+<tr>
+<td>
+
+commit-latency-avg
+</td>
+<td>
+
+The average commit execution time in ns.
+</td>
+<td>
+
+kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+</td> </tr>
+<tr>
+<td>
+
+commit-latency-max
+</td>
+<td>
+
+The maximum commit execution time in ns.
+</td>
<td>
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
@@ -5472,17 +5498,30 @@
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[s
<tr>
<td>
-flush-rate
-</td>
+flush-rate (deprecated)
+</td>
<td>
-The average flush rate for this store.
-</td>
+The average flush rate for this store. Deprecated: use commit-rate instead.
+</td>
<td>
kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
-</td> </tr>
-<tr>
+</td> </tr>
+<tr>
+<td>
+
+commit-rate
+</td>
+<td>
+
+The average commit rate for this store.
+</td>
+<td>
+
+kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)
+</td> </tr>
+<tr>
<td>
restore-rate
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 5586b71d41b..7c43e484652 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -91,7 +91,7 @@ public class MeteredKeyValueStore<K, V>
protected Sensor allSensor;
protected Sensor rangeSensor;
protected Sensor prefixScanSensor;
- private Sensor flushSensor;
+ private Sensor commitSensor;
private Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
protected InternalProcessorContext<?, ?> internalContext;
@@ -143,6 +143,7 @@ public class MeteredKeyValueStore<K, V>
super.init(stateStoreContext, root);
}
+ @SuppressWarnings("deprecation")
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
putIfAbsentSensor =
StateStoreMetrics.putIfAbsentSensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
@@ -151,7 +152,10 @@ public class MeteredKeyValueStore<K, V>
allSensor = StateStoreMetrics.allSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
rangeSensor = StateStoreMetrics.rangeSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
prefixScanSensor =
StateStoreMetrics.prefixScanSensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
- flushSensor = StateStoreMetrics.flushSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
+ // flush metrics are deprecated per KIP-1035 and will be removed in the
next major release.
+ // Here we just register the sensor without recording
+ StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
+ commitSensor = StateStoreMetrics.commitSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
deleteSensor = StateStoreMetrics.deleteSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
e2eLatencySensor =
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
iteratorDurationSensor =
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope,
name(), streamsMetrics);
@@ -416,7 +420,7 @@ public class MeteredKeyValueStore<K, V>
@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
- maybeMeasureLatency(() -> super.commit(changelogOffsets), time,
flushSensor);
+ maybeMeasureLatency(() -> super.commit(changelogOffsets), time,
commitSensor);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 5d678cbfd25..16fe87227af 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -73,7 +73,7 @@ public class MeteredSessionStore<K, V>
protected StreamsMetricsImpl streamsMetrics;
protected Sensor putSensor;
protected Sensor fetchSensor;
- protected Sensor flushSensor;
+ protected Sensor commitSensor;
protected Sensor removeSensor;
protected Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
@@ -119,10 +119,14 @@ public class MeteredSessionStore<K, V>
super.init(stateStoreContext, root);
}
+ @SuppressWarnings("deprecation")
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
- flushSensor = StateStoreMetrics.flushSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
+ // flushSensor is deprecated per KIP-1035 and will be removed in the
next major release.
+ // Here we just register the sensor without recording
+ StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
+ commitSensor = StateStoreMetrics.commitSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
removeSensor = StateStoreMetrics.removeSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
e2eLatencySensor =
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
iteratorDurationSensor =
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope,
name(), streamsMetrics);
@@ -430,7 +434,7 @@ public class MeteredSessionStore<K, V>
@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
- maybeMeasureLatency(() -> super.commit(changelogOffsets), time,
flushSensor);
+ maybeMeasureLatency(() -> super.commit(changelogOffsets), time,
commitSensor);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2929c2b654a..5455f635a74 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -76,7 +76,7 @@ public class MeteredWindowStore<K, V>
protected StreamsMetricsImpl streamsMetrics;
protected Sensor putSensor;
protected Sensor fetchSensor;
- private Sensor flushSensor;
+ private Sensor commitSensor;
private Sensor e2eLatencySensor;
protected Sensor iteratorDurationSensor;
protected InternalProcessorContext<?, ?> internalContext;
@@ -141,10 +141,14 @@ public class MeteredWindowStore<K, V>
return WrappingNullableUtils.prepareValueSerde(valueSerde, getter);
}
+ @SuppressWarnings("deprecation")
private void registerMetrics() {
putSensor = StateStoreMetrics.putSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
fetchSensor = StateStoreMetrics.fetchSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
- flushSensor = StateStoreMetrics.flushSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
+ // flushSensor is deprecated per KIP-1035 and will be removed in the
next major release.
+ // Here we just register the sensor without recording
+ StateStoreMetrics.flushSensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
+ commitSensor = StateStoreMetrics.commitSensor(taskId.toString(),
metricsScope, name(), streamsMetrics);
e2eLatencySensor =
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(),
streamsMetrics);
iteratorDurationSensor =
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope,
name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
@@ -384,7 +388,7 @@ public class MeteredWindowStore<K, V>
@Override
public void commit(final Map<TopicPartition, Long> changelogOffsets) {
- maybeMeasureLatency(() -> super.commit(changelogOffsets), time,
flushSensor);
+ maybeMeasureLatency(() -> super.commit(changelogOffsets), time,
commitSensor);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index 356d26bbe97..551ee1d0983 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -111,6 +111,13 @@ public class StateStoreMetrics {
private static final String FLUSH_AVG_LATENCY_DESCRIPTION =
AVG_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
private static final String FLUSH_MAX_LATENCY_DESCRIPTION =
MAX_LATENCY_DESCRIPTION_PREFIX + FLUSH_DESCRIPTION;
+ private static final String COMMIT = "commit";
+ private static final String COMMIT_DESCRIPTION = "calls to commit";
+ private static final String COMMIT_RATE_DESCRIPTION =
+ RATE_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;
+ private static final String COMMIT_AVG_LATENCY_DESCRIPTION =
AVG_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;
+ private static final String COMMIT_MAX_LATENCY_DESCRIPTION =
MAX_LATENCY_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION;
+
private static final String DELETE = "delete";
private static final String DELETE_DESCRIPTION = "calls to delete";
private static final String DELETE_RATE_DESCRIPTION =
@@ -309,6 +316,10 @@ public class StateStoreMetrics {
return sensor;
}
+ /**
+ * @deprecated since 4.3. Use {@link #commitSensor(String, String, String,
StreamsMetricsImpl)} instead.
+ */
+ @Deprecated
public static Sensor flushSensor(final String taskId,
final String storeType,
final String storeName,
@@ -326,6 +337,23 @@ public class StateStoreMetrics {
);
}
+ public static Sensor commitSensor(final String taskId,
+ final String storeType,
+ final String storeName,
+ final StreamsMetricsImpl streamsMetrics)
{
+ return throughputAndLatencySensor(
+ taskId,
+ storeType,
+ storeName,
+ COMMIT,
+ COMMIT_RATE_DESCRIPTION,
+ COMMIT_AVG_LATENCY_DESCRIPTION,
+ COMMIT_MAX_LATENCY_DESCRIPTION,
+ RecordingLevel.DEBUG,
+ streamsMetrics
+ );
+ }
+
public static Sensor deleteSensor(final String taskId,
final String storeType,
final String storeName,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 69f9f710039..42fee48f61d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -326,8 +326,8 @@ public class MeteredKeyValueStoreTest {
metered.commit(Map.of());
- final KafkaMetric metric = metric("flush-rate");
- assertTrue((Double) metric.metricValue() > 0);
+ final KafkaMetric commitMetric = metric("commit-rate");
+ assertTrue((Double) commitMetric.metricValue() > 0);
}
private interface CachedKeyValueStore extends KeyValueStore<Bytes,
byte[]>, CachedStateStore<byte[], byte[]> { }
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 602be7a9e6f..6e05513ec9c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -351,8 +351,8 @@ public class MeteredTimestampedKeyValueStoreTest {
metered.commit(Map.of());
- final KafkaMetric metric = metric("flush-rate");
- assertTrue((Double) metric.metricValue() > 0);
+ final KafkaMetric commitMetric = metric("commit-rate");
+ assertTrue((Double) commitMetric.metricValue() > 0);
}
private interface CachedKeyValueStore extends KeyValueStore<Bytes,
byte[]>, CachedStateStore<byte[], byte[]> { }
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
index 58e6ee45e9e..f6355e7bd6a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java
@@ -292,8 +292,8 @@ public class MeteredTimestampedKeyValueStoreWithHeadersTest
{
metered.commit(Map.of());
- final KafkaMetric metric = metric("flush-rate");
- assertTrue((Double) metric.metricValue() > 0);
+ final KafkaMetric commitMetric = metric("commit-rate");
+ assertTrue((Double) commitMetric.metricValue() > 0);
}
private interface CachedKeyValueStore extends KeyValueStore<Bytes,
byte[]>, CachedStateStore<byte[], byte[]> { }
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index 4fcf0010d21..53fa00d1938 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -233,7 +233,7 @@ public class MeteredVersionedKeyValueStoreTest {
store.commit(Map.of());
verify(inner).commit(Map.of());
- assertThat((Double) getMetric("flush-rate").metricValue(),
greaterThan(0.0));
+ assertThat((Double) getMetric("commit-rate").metricValue(),
greaterThan(0.0));
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 4af34c9bd8d..6b60bacb6bc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -356,10 +356,8 @@ public class MeteredWindowStoreTest {
store.init(context, store);
store.commit(Map.of());
- // it suffices to verify one flush metric since all flush metrics are
recorded by the same sensor
- // and the sensor is tested elsewhere
- final KafkaMetric metric = metric("flush-rate");
- assertTrue((Double) metric.metricValue() > 0);
+ final KafkaMetric commitMetric = metric("commit-rate");
+ assertTrue((Double) commitMetric.metricValue() > 0);
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
index 151d78da06c..740a87669d8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetricsTest.java
@@ -183,6 +183,23 @@ public class StateStoreMetricsTest {
);
}
+ @Test
+ public void shouldGetCommitSensor() {
+ final String metricName = "commit";
+ final String descriptionOfRate = "The average number of calls to
commit per second";
+ final String descriptionOfAvg = "The average latency of calls to
commit";
+ final String descriptionOfMax = "The maximum latency of calls to
commit";
+ setupStreamsMetrics(metricName);
+
+ getAndVerifySensor(
+ () -> StateStoreMetrics.commitSensor(TASK_ID, STORE_TYPE,
STORE_NAME, streamsMetrics),
+ metricName,
+ descriptionOfAvg,
+ descriptionOfMax,
+ descriptionOfRate
+ );
+ }
+
@Test
public void shouldGetRemoveSensor() {
final String metricName = "remove";