stevenschlansker commented on code in PR #20512:
URL: https://github.com/apache/kafka/pull/20512#discussion_r2337284038


##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java:
##########
@@ -48,10 +51,11 @@ public OpenIterators(final TaskId taskId,
 
     public void add(final MeteredIterator iterator) {
         openIterators.add(iterator);
+        updateOldestStartTimestamp();
 
         if (openIterators.size() == 1) {

Review Comment:
   Thinking about this more, I wonder if there is another possible race here 
that would not cause an exception but could leave metrics missing or 
incorrectly persisting. The OpenIterators tracker itself is not synchronized (good 
to avoid contention), but that means we mutate it with `add()` or `remove()` 
and then call `size()`, expecting to observe only our own mutation in program 
order. That check-then-act sequence is not atomic.
   But I don't see any guarantee that this holds: my (limited) understanding 
is that OpenIterators is a property of a KeyValueStore, which may have 
partitions shared between threads.
   
   Imagine:
   
   ```
   T1: open iterator on store S1 (partition 0)
   T2: open iterator on store S1 (partition 1)
   T1: openIterators.add(it1)
   T2: openIterators.add(it2)
   T1: if (openIterators.size() == 1) {  (2 == 1? nope)
   T2: if (openIterators.size() == 1) {  (2 == 1? nope)
   End: 2 iterators open, no metric registered
   ```
   
   The same race is possible with `remove()`.
   (This is a slightly different issue from the one originally reported, so if it's 
more appropriate to fix it in a different bug / PR, that's totally 
understandable.)
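
   A minimal sketch of one possible fix, assuming a simplified tracker: doing the mutation and the size check under one lock turns the check-then-act into a single atomic step, so exactly one caller sees each empty-to-non-empty and non-empty-to-empty transition. `SynchronizedOpenIteratorsSketch` and the `registerMetrics` / `unregisterMetrics` callbacks are hypothetical stand-ins, not the real `OpenIterators` API, and iterators are modelled as plain `Object`s.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for OpenIterators (not the Kafka class).
// Mutation and size check happen under the same monitor, so exactly one
// caller observes each empty <-> non-empty transition.
final class SynchronizedOpenIteratorsSketch {
    private final Set<Object> openIterators = new HashSet<>();
    private final Runnable registerMetrics;   // hypothetical callback
    private final Runnable unregisterMetrics; // hypothetical callback

    SynchronizedOpenIteratorsSketch(final Runnable registerMetrics,
                                    final Runnable unregisterMetrics) {
        this.registerMetrics = registerMetrics;
        this.unregisterMetrics = unregisterMetrics;
    }

    synchronized void add(final Object iterator) {
        // Set.add() returns false for a duplicate, so a re-add cannot re-register.
        if (openIterators.add(iterator) && openIterators.size() == 1) {
            registerMetrics.run(); // deliberately runs inside the lock
        }
    }

    synchronized void remove(final Object iterator) {
        // Only the caller that empties the set unregisters the metrics.
        if (openIterators.remove(iterator) && openIterators.isEmpty()) {
            unregisterMetrics.run();
        }
    }

    synchronized int size() {
        return openIterators.size();
    }
}
```

   The trade-off is the contention the current design avoids. A lock-free counter (only the caller whose `AtomicInteger.incrementAndGet()` returns 1 registers) would fix the double check, but an unregister could still land after a newer register: one thread drops the count to 0, another raises it to 1 and registers, and then the first thread's late unregister removes the metric while an iterator is still open. That is why the callbacks stay inside the lock here.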



##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java:
##########
@@ -48,10 +51,11 @@ public OpenIterators(final TaskId taskId,
 
     public void add(final MeteredIterator iterator) {
         openIterators.add(iterator);
+        updateOldestStartTimestamp();

Review Comment:
   Possible nano-optimization: on `add()`, the oldest start timestamp can only change if 
this is the first open iterator (any other open iterator is by definition 
older), so we could update the oldest timestamp only inside the 
`if (openIterators.size() == 1)` block below?
   (assuming all timestamps come from the same time source)
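
   A minimal single-threaded sketch of the suggested shortcut, assuming all start timestamps come from one non-decreasing clock. `TimestampedIterator`, `OldestTimestampSketch` and the `Long.MAX_VALUE` "nothing open" sentinel are hypothetical; the real `updateOldestStartTimestamp()` may work differently. On add, the oldest value only needs setting when the set goes from empty to one element, while remove still has to rescan when the oldest iterator closes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical minimal view of an iterator that records when it was opened.
interface TimestampedIterator {
    long startTimestamp();
}

// Hypothetical single-threaded sketch (not the real OpenIterators). Assumes all
// start timestamps come from the same non-decreasing clock, so a newly added
// iterator can never be older than one that is already open.
final class OldestTimestampSketch {
    private final Set<TimestampedIterator> openIterators = new HashSet<>();
    private long oldestStartTimestamp = Long.MAX_VALUE; // sentinel: nothing open

    void add(final TimestampedIterator iterator) {
        openIterators.add(iterator);
        if (openIterators.size() == 1) {
            // Only the first open iterator can change the oldest timestamp.
            oldestStartTimestamp = iterator.startTimestamp();
        }
    }

    void remove(final TimestampedIterator iterator) {
        if (openIterators.remove(iterator)
                && iterator.startTimestamp() == oldestStartTimestamp) {
            // Closing the oldest iterator requires a rescan of the remaining ones.
            long oldest = Long.MAX_VALUE;
            for (final TimestampedIterator open : openIterators) {
                oldest = Math.min(oldest, open.startTimestamp());
            }
            oldestStartTimestamp = oldest;
        }
    }

    long oldestStartTimestamp() {
        return oldestStartTimestamp;
    }
}
```

   Under concurrency the shortcut inherits the race from the previous comment: if T1 reads the clock before T2 but T2's `add()` lands first, T2's newer timestamp is recorded as the oldest and T1's add does not correct it.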



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
