stevenschlansker commented on code in PR #20512:
URL: https://github.com/apache/kafka/pull/20512#discussion_r2337284038


##########
streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java:
##########
@@ -48,10 +51,11 @@ public OpenIterators(final TaskId taskId,
 
     public void add(final MeteredIterator iterator) {
         openIterators.add(iterator);
+        updateOldestStartTimestamp();
 
         if (openIterators.size() == 1) {

Review Comment:
   Thinking about this more, I wonder if there is another possible race here 
that would not cause an exception but could cause missing or incorrectly 
persisting metrics. The OpenIterators tracker itself is not synchronized (good 
to avoid contention) but that means that we are mutating `add()` or `remove()` 
and calling `size()` expecting the results in program order.
   But, I don't see any guarantee this is the case - my (limited) understanding 
is that OpenIterators is a property of a KeyValueStore, which may have 
partitions shared between threads.
   
   Imagine:
   
   ```
   T1: open iterator on store S1 (partition 0)
   T2: open iterator on store S1 (partition 1)
   T1: openIterators.add(it1)
   T2: openIterators.add(it2)
   T1: if (openIterators.size() == 1) {  (2 == 1? nope)
   T2: if (openIterators.size() == 1) {  (2 == 1? nope)
   End: 2 iterators open, no metric registered
   ```
   
   Same race is possible with remove.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to