jpountz commented on code in PR #13190:
URL: https://github.com/apache/lucene/pull/13190#discussion_r1530384809


##########
lucene/core/src/java/org/apache/lucene/index/MergePolicy.java:
##########
@@ -136,13 +136,13 @@ public boolean isAborted() {
      */
     public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition)
         throws InterruptedException {
-      if (Thread.currentThread() != owner) {
+      /*      if (Thread.currentThread() != owner) {
         throw new RuntimeException(
             "Only the merge owner thread can call pauseNanos(). This thread: "
                 + Thread.currentThread().getName()
                 + ", owner thread: "
                 + owner);
-      }
+      }*/

Review Comment:
   I guess not, but now we need to make sure that the rate limit is applied 
across all threads rather than per thread?



##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -28,30 +30,30 @@ public final class RateLimitedIndexOutput extends FilterIndexOutput {
   private final RateLimiter rateLimiter;
 
   /** How many bytes we've written since we last called rateLimiter.pause. */
-  private long bytesSinceLastPause;
+  private final AtomicLong bytesSinceLastPause = new AtomicLong(0);

Review Comment:
   I'm still unclear why this needs to become an `AtomicLong`. If 
`bytesSinceLastPause` could be updated/read concurrently, this would imply 
that multiple `writeXXX` methods could also be called concurrently, which 
doesn't make much sense, as bytes would then be written to disk in a 
non-deterministic order?



##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(

Review Comment:
   If we used a lock for this whole method, it might help pause all threads 
that share the same rate limiter, and effectively apply the rate limit across 
all threads that run the same merge rather than on a per-thread basis?
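
   To illustrate the locking idea, here is a minimal sketch, not the code in 
this PR. It assumes a hypothetical `SharedRateLimiterSketch` class whose 
`pause` method is `synchronized`, so that every thread sharing one limiter 
instance waits behind the same `lastNS` bookkeeping. The names 
`SharedRateLimiterSketch` and `computePauseNanos` are made up for this example. 
The arithmetic mirrors the removed lines in the hunk above, and the 2 msec 
threshold comes from the comment in that hunk.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a rate limiter shared by several threads; not Lucene's class. */
final class SharedRateLimiterSketch {
  // Assumption: 2 msec, as described by the comment in the diff hunk.
  private static final long MIN_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(2);

  private final double mbPerSec;
  private long lastNS; // guarded by "this"

  SharedRateLimiterSketch(double mbPerSec, long startNS) {
    this.mbPerSec = mbPerSec;
    this.lastNS = startNS;
  }

  /** Pure helper: how long to pause so that `bytes` fits the rate, measured from lastNS. */
  static long computePauseNanos(long bytes, double mbPerSec, long lastNS, long curNS) {
    double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
    long targetNS = lastNS + (long) (1_000_000_000L * secondsToPause);
    return targetNS - curNS;
  }

  /**
   * The whole method holds the monitor, including the sleep, so other threads
   * calling pause() on the same instance block until this pause is over.
   */
  synchronized long pause(long bytes) throws InterruptedException {
    long curNS = System.nanoTime();
    long pauseNS = computePauseNanos(bytes, mbPerSec, lastNS, curNS);
    if (pauseNS <= MIN_PAUSE_NS) {
      lastNS = curNS; // too short to bother sleeping
      return 0;
    }
    TimeUnit.NANOSECONDS.sleep(pauseNS);
    lastNS = System.nanoTime();
    return pauseNS;
  }

  public static void main(String[] args) throws Exception {
    SharedRateLimiterSketch limiter = new SharedRateLimiterSketch(100.0, System.nanoTime());
    Runnable writer =
        () -> {
          try {
            for (int i = 0; i < 5; i++) {
              limiter.pause(1024L * 1024L); // pretend we just wrote 1 MB
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        };
    long start = System.nanoTime();
    Thread a = new Thread(writer);
    Thread b = new Thread(writer);
    a.start();
    b.start();
    a.join();
    b.join();
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    System.out.println("10 MB across 2 threads took ~" + elapsedMs + " ms");
  }
}
```

   Sleeping while holding the monitor is a deliberate simplification here: it 
is what makes the second thread wait instead of being throttled independently. 
A real implementation would probably also need to handle merge aborts and rate 
changes during the pause, which this sketch leaves out.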



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

