stevenschlansker commented on code in PR #20512:
URL: https://github.com/apache/kafka/pull/20512#discussion_r2337284038
##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java:
##########
@@ -48,10 +51,11 @@ public OpenIterators(final TaskId taskId,
public void add(final MeteredIterator iterator) {
openIterators.add(iterator);
+ updateOldestStartTimestamp();
if (openIterators.size() == 1) {
Review Comment:
Thinking about this more, I wonder if there is another possible race here
that would not cause an exception but could cause metrics to go missing or to
persist incorrectly. The OpenIterators tracker itself is not synchronized (good
to avoid contention), but that means we call `add()` or `remove()` and then
`size()`, expecting to see the results in program order.
But I don't see any guarantee that this is the case: my (limited) understanding
is that OpenIterators is a property of a KeyValueStore, which may have
partitions shared between threads.
Imagine:
```
T1: open iterator on store S1 (partition 0)
T2: open iterator on store S1 (partition 1)
T1: openIterators.add(it1)
T2: openIterators.add(it2)
T1: if (openIterators.size() == 1) { (2 == 1? nope)
T2: if (openIterators.size() == 1) { (2 == 1? nope)
End: 2 iterators open, no metric registered
```
Same race is possible with remove.
(This is a slightly different issue than originally reported, so if it's
more appropriate to fix it with a different bug / PR, that's totally
understandable)
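To make the interleaving above concrete, here is a minimal sketch that replays the T1/T2 schedule by hand on a single thread. `RaceSketch`, `insert`, `registerIfFirst` and `addAtomically` are hypothetical names for illustration only and are not the real `OpenIterators` code. The set is assumed to be thread-safe on its own, so the sketch isolates the gap between the mutation and the `size()` check.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the tracker; NOT the real OpenIterators class.
final class RaceSketch {
    // Assumption: the underlying set is itself thread-safe.
    private final Set<Object> openIterators = ConcurrentHashMap.newKeySet();
    // Counts how many times the "metric" would have been registered.
    private int registrations = 0;

    // First half of add(): the mutation.
    void insert(final Object iterator) {
        openIterators.add(iterator);
    }

    // Second half of add(): the size check that decides on registration.
    void registerIfFirst() {
        if (openIterators.size() == 1) {
            registrations++;
        }
    }

    // One possible fix (an assumption, with a contention cost):
    // make "mutate, then check size" a single atomic step.
    synchronized void addAtomically(final Object iterator) {
        insert(iterator);
        registerIfFirst();
    }

    int registrations() {
        return registrations;
    }

    // Replays the schedule from the comment: both inserts land before either check.
    static int replayInterleavedAdds() {
        final RaceSketch tracker = new RaceSketch();
        tracker.insert("it1");      // T1: openIterators.add(it1)
        tracker.insert("it2");      // T2: openIterators.add(it2)
        tracker.registerIfFirst();  // T1: size() is 2, nothing registered
        tracker.registerIfFirst();  // T2: size() is 2, nothing registered
        return tracker.registrations();
    }

    // Same two adds, but each mutation and check happen as one step.
    static int replayAtomicAdds() {
        final RaceSketch tracker = new RaceSketch();
        tracker.addAtomically("it1"); // size() is 1, registered
        tracker.addAtomically("it2"); // size() is 2, nothing to do
        return tracker.registrations();
    }

    public static void main(final String[] args) {
        System.out.println("interleaved: " + replayInterleavedAdds());
        System.out.println("atomic: " + replayAtomicAdds());
    }
}
```

The interleaved replay ends with two open iterators and zero registrations, while the atomic variant registers exactly once. Whether a lock is acceptable here, given the goal of avoiding contention, is a separate question.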
##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java:
##########
@@ -48,10 +51,11 @@ public OpenIterators(final TaskId taskId,
public void add(final MeteredIterator iterator) {
openIterators.add(iterator);
+ updateOldestStartTimestamp();
Review Comment:
Possible nano-optimization: on `add()`, the oldest start timestamp only changes if
this is the first open iterator. Any iterator that is already open is by definition
older than the one being added, so we could update the oldest timestamp only inside
the `if (openIterators.size() == 1)` block below.
(This assumes all timestamps come from the same time source.)
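A rough sketch of what that could look like, assuming timestamps never go backwards and ignoring concurrency entirely. `OldestTimestampSketch`, `Iter` and the `-1` "nothing open" sentinel are hypothetical and only stand in for the real tracker and `MeteredIterator`.

```java
import java.util.Comparator;
import java.util.TreeSet;

// Hypothetical single-threaded sketch; NOT the real OpenIterators class.
final class OldestTimestampSketch {
    // Hypothetical minimal iterator record: an id plus a start timestamp.
    static final class Iter {
        final long id;
        final long startTimestamp;

        Iter(final long id, final long startTimestamp) {
            this.id = id;
            this.startTimestamp = startTimestamp;
        }
    }

    // Ordered by start timestamp, with the id as a tie-breaker.
    private final TreeSet<Iter> open = new TreeSet<>(
        Comparator.comparingLong((Iter i) -> i.startTimestamp)
                  .thenComparingLong(i -> i.id));

    // Assumption: -1 means "no iterator is open".
    private long oldestStartTimestamp = -1L;

    void add(final Iter iterator) {
        open.add(iterator);
        if (open.size() == 1) {
            // Only the first open iterator can change the oldest timestamp,
            // as long as later iterators never carry an earlier timestamp.
            oldestStartTimestamp = iterator.startTimestamp;
        }
    }

    void remove(final Iter iterator) {
        open.remove(iterator);
        // Removal can drop the oldest entry, so it still has to recompute.
        oldestStartTimestamp = open.isEmpty() ? -1L : open.first().startTimestamp;
    }

    long oldestStartTimestamp() {
        return oldestStartTimestamp;
    }
}
```

Only `add()` gets cheaper in this sketch; `remove()` still recomputes, because the removed iterator may have been the oldest one.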