jpountz commented on code in PR #13190: URL: https://github.com/apache/lucene/pull/13190#discussion_r1530384809
##########
lucene/core/src/java/org/apache/lucene/index/MergePolicy.java:
##########
@@ -136,13 +136,13 @@ public boolean isAborted() {
      */
     public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition)
         throws InterruptedException {
-      if (Thread.currentThread() != owner) {
+      /*      if (Thread.currentThread() != owner) {
         throw new RuntimeException(
             "Only the merge owner thread can call pauseNanos(). This thread: "
                 + Thread.currentThread().getName()
                 + ", owner thread: "
                 + owner);
-      }
+      }*/

Review Comment:
I guess not, but now we need to make sure that the rate is applied across all threads rather than per-thread?

##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -28,30 +30,30 @@ public final class RateLimitedIndexOutput extends FilterIndexOutput {
   private final RateLimiter rateLimiter;

   /** How many bytes we've written since we last called rateLimiter.pause. */
-  private long bytesSinceLastPause;
+  private final AtomicLong bytesSinceLastPause = new AtomicLong(0);

Review Comment:
I'm still unclear why this needs to become an `AtomicLong`, if `bytesSinceLastPause` could be updated/read concurrently, then this would imply that multiple `writeXXX` methods would also be called concurrently, which doesn't make much sense as bytes would be written to disk in a non-deterministic order?

##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }

-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(

Review Comment:
If we used a lock for this whole method, then this may help pause all threads that share the same rate limiter, and effectively apply the rate limit across all threads that run the same merge, rather than applying the rate limit on a per-thread basis?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
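
Editorial illustration of the last review comment (locking the whole pause method so the limit applies across threads). This is a minimal sketch and not the code from the PR or from Lucene: the class `SharedRateLimiterSketch` and its methods `computePauseNS` and `pause` are hypothetical names, merge-abort handling is left out, and setting `lastNS` to the target time when a pause is taken is a simplifying assumption. The arithmetic mirrors the removed lines of `maybePause` shown in the diff.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a rate limiter shared by all threads of one merge.
final class SharedRateLimiterSketch {
  // Pauses shorter than 2 msec are skipped, as in the diff's comment.
  private static final long MIN_PAUSE_NS = TimeUnit.MILLISECONDS.toNanos(2);

  private final double mbPerSec;
  private long lastNS; // guarded by the monitor of this object

  SharedRateLimiterSketch(double mbPerSec, long startNS) {
    this.mbPerSec = mbPerSec;
    this.lastNS = startNS;
  }

  /** Returns how long the caller should pause, in nanoseconds, or 0 for no pause. */
  synchronized long computePauseNS(long bytes, long curNS) {
    double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
    long curPauseNS = targetNS - curNS;
    if (curPauseNS <= MIN_PAUSE_NS) {
      // Enforce the instant rate, not the rate averaged over all history.
      lastNS = curNS;
      return 0;
    }
    // Assumption of this sketch: the caller sleeps until targetNS.
    lastNS = targetNS;
    return curPauseNS;
  }

  /**
   * Computes the pause and sleeps while still holding the lock, so other
   * threads sharing this limiter block until the pause is over.
   */
  synchronized void pause(long bytes) throws InterruptedException {
    long pauseNS = computePauseNS(bytes, System.nanoTime());
    if (pauseNS > 0) {
      TimeUnit.NANOSECONDS.sleep(pauseNS); // deliberately inside the lock
    }
  }

  public static void main(String[] args) throws InterruptedException {
    SharedRateLimiterSketch limiter =
        new SharedRateLimiterSketch(100.0, System.nanoTime());
    Runnable writer =
        () -> {
          try {
            for (int i = 0; i < 5; i++) {
              limiter.pause(1024 * 1024); // report 1 MiB written
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        };
    Thread a = new Thread(writer);
    Thread b = new Thread(writer);
    long start = System.nanoTime();
    a.start();
    b.start();
    a.join();
    b.join();
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    System.out.println("elapsed ms: " + elapsedMs);
  }
}
```

Because both the bookkeeping and the sleep happen under one lock, the two writer threads in `main` are throttled against a single shared budget instead of each getting its own. The trade-off is that a thread holding the lock while sleeping cannot be woken early by another thread calling into the limiter, which a real implementation would likely need for aborts or rate changes.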