mjsax commented on code in PR #16041:
URL: https://github.com/apache/kafka/pull/16041#discussion_r1612395523
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java:
##########
@@ -451,6 +455,23 @@ public static void addNumOpenIteratorsGauge(final String
taskId,
}
+ public static void addOldestOpenIteratorGauge(final String taskId,
+ final String storeType,
+ final String storeName,
+ final StreamsMetricsImpl
streamsMetrics,
+ final Gauge<Long>
oldestOpenIteratorGauge) {
+ streamsMetrics.addStoreLevelMutableMetric(
+ taskId,
+ storeType,
+ storeName,
+ OLDEST_ITERATOR_OPEN_SINCE_MS,
+ OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION,
+ RecordingLevel.INFO,
+ oldestOpenIteratorGauge
+ );
+
Review Comment:
nit: remove blank line
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -169,6 +172,10 @@ private void registerMetrics() {
iteratorDurationSensor =
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope,
name(), streamsMetrics);
StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
(config, now) -> numOpenIterators.get());
+ StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(),
metricsScope, name(), streamsMetrics,
+ (config, now) -> openIterators.isEmpty() ? null :
+
openIterators.stream().mapToLong(MeteredIterator::startTimestamp).min().getAsLong()
Review Comment:
I don't want to over-engineer (given that we can safely assume that the
`openIterator` set should be small), but wondering if this is the best
implementation?
In the end, we only want to track the create ts, not the iterators
themselves. And for create ts we could just maintain a list if longs, and we
would `return list.first()` here, and always append to the end of the list when
a new iterator is created? Only "remove" would be more expensive, but we could
use a sorted tree for the list, and thus remove would be O(log n) not O(n)).
For this case, we also don't need the `MeteredIterator` helper interface.
Thoughts?
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java:
##########
@@ -490,6 +490,31 @@ public void shouldTimeIteratorDuration() {
assertThat((double) iteratorDurationMaxMetric.metricValue(),
equalTo(3.0 * TimeUnit.MILLISECONDS.toNanos(1)));
}
+ @Test
+ public void shouldTrackOldestOpenIteratorTimestamp() {
+ when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
+ init();
+
+ final KafkaMetric oldestIteratorTimestampMetric =
metric("oldest-iterator-open-since-ms");
+ assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
+ assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+
+ try (final KeyValueIterator<String, String> first = metered.all()) {
+ final long oldestTimestamp = mockTime.milliseconds();
+ assertThat((Long) oldestIteratorTimestampMetric.metricValue(),
equalTo(oldestTimestamp));
+ mockTime.sleep(100);
+
+ // open a second iterator before closing the first to test that we
still produce the first iterator's timestamp
+ try (final KeyValueIterator<String, String> second =
metered.all()) {
+ assertThat((Long) oldestIteratorTimestampMetric.metricValue(),
equalTo(oldestTimestamp));
+ mockTime.sleep(100);
+ }
Review Comment:
It would be better to not close the second iterator here, but close the
first one first, to see if the metric advances to the second's iterator create
ts -- would need some rewriting of the test; try-with-resource won't allow for
proper nesting, but we can still use try-finally.
Might actually be best, to open like 5 iterators and close them in some
non-linear order (including closing the oldest one like 2 or 3 times) to verify
correct behavior.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]