benwtrent commented on code in PR #13190:
URL: https://github.com/apache/lucene/pull/13190#discussion_r1528642636

##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -281,11 +297,11 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
           // Because rateLimiter is bound to a particular merge thread, this method should
           // always be called from that context. Verify this.
-          assert mergeThread == Thread.currentThread()
-              : "Not the same merge thread, current="
-                  + Thread.currentThread()
-                  + ", expected="
-                  + mergeThread;
+          /* assert mergeThread == Thread.currentThread()
+          : "Not the same merge thread, current="
+              + Thread.currentThread()
+              + ", expected="
+              + mergeThread;*/

Review Comment:
   I don't know about this; it seems like a paranoid check that may not be necessary if RLIO were threadsafe?


##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -28,30 +30,30 @@ public final class RateLimitedIndexOutput extends FilterIndexOutput {
   private final RateLimiter rateLimiter;
 
   /** How many bytes we've written since we last called rateLimiter.pause. */
-  private long bytesSinceLastPause;
+  private final AtomicLong bytesSinceLastPause = new AtomicLong(0);

Review Comment:
   This obviously adds some overhead, since RLIO can now be accessed from multiple threads. I will likely have to benchmark to see the impact.


##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(

Review Comment:
   We would need to lock here or do an `updateAndGet`. `lastNS` is only updated in this particular method, so using atomics seemed better to me.


##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -62,30 +64,42 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException {
 
   @Override
   public void writeInt(int i) throws IOException {
-    bytesSinceLastPause += Integer.BYTES;
+    bytesSinceLastPause.addAndGet(Integer.BYTES);
     checkRate();
     out.writeInt(i);
   }
 
   @Override
   public void writeShort(short i) throws IOException {
-    bytesSinceLastPause += Short.BYTES;
+    bytesSinceLastPause.addAndGet(Short.BYTES);
     checkRate();
     out.writeShort(i);
   }
 
   @Override
   public void writeLong(long i) throws IOException {
-    bytesSinceLastPause += Long.BYTES;
+    bytesSinceLastPause.addAndGet(Long.BYTES);
     checkRate();
     out.writeLong(i);
   }
 
   private void checkRate() throws IOException {
-    if (bytesSinceLastPause > currentMinPauseCheckBytes) {
-      rateLimiter.pause(bytesSinceLastPause);
-      bytesSinceLastPause = 0;
-      currentMinPauseCheckBytes = rateLimiter.getMinPauseCheckBytes();
+    AtomicLong localBytesSinceLastPause = new AtomicLong(0);
+    AtomicBoolean shouldPause = new AtomicBoolean(false);
+    bytesSinceLastPause.updateAndGet(
+        bytes -> {
+          if (bytes > currentMinPauseCheckBytes.get()) {
+            shouldPause.set(true);
+            currentMinPauseCheckBytes.set(rateLimiter.getMinPauseCheckBytes());
+            localBytesSinceLastPause.set(bytes);
+            return 0;
+          } else {
+            shouldPause.set(false);
+          }

Review Comment:
   I figured that if another thread reached this throttling point, paused, and reset the counter, then the current thread shouldn't pause. Throttle tracking is effectively global: every thread shares the same `bytesSinceLastPause` counter, so work done by another thread can cause the current thread to pause, and another thread's pause can let this thread continue working.


##########
lucene/core/src/java/org/apache/lucene/index/MergePolicy.java:
##########
@@ -136,13 +136,13 @@ public boolean isAborted() {
      */
     public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition)
         throws InterruptedException {
-      if (Thread.currentThread() != owner) {
+      /* if (Thread.currentThread() != owner) {
         throw new RuntimeException(
             "Only the merge owner thread can call pauseNanos(). This thread: "
                 + Thread.currentThread().getName()
                 + ", owner thread: "
                 + owner);
-      }
+      }*/

Review Comment:
   Do we care about this at all if the underlying throttling is threadsafe?


##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(
+        last -> {
+          // Time we should sleep until; this is purely instantaneous
+          // rate (just adds seconds onto the last time we had paused to);
+          // maybe we should also offer decayed recent history one?
+          long targetNS = last + (long) (1000000000 * secondsToPause);
+          long curPauseNS = targetNS - curNS;
+          // We don't bother with thread pausing if the pause is smaller than 2 msec.
+          if (curPauseNS <= MIN_PAUSE_NS) {
+            // Set to curNS, not targetNS, to enforce the instant rate, not
+            // the "averaged over all history" rate:
+            curPauseNSSetter.set(0);
+            return curNS;
+          }

Review Comment:
   It's possible that other threads request very short pauses, each updating `lastNS` to its own `curNS`. I honestly don't know whether this is OK. But to handle this case, I decided to use `updateAndGet`, so that if `lastNS` changes we account for it and adjust our pause request accordingly.

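A minimal sketch of the same idea written as an explicit `compareAndSet` loop instead of `updateAndGet`: snapshot `lastNS`, compute the pause from that snapshot, and retry if another thread moved `lastNS` in the meantime. Because the loop can return the computed pause directly, it needs no side-channel `AtomicLong` such as `curPauseNSSetter`. Everything here is hypothetical and not Lucene code: the class `SharedPauseClock`, its constructor, `lastNS()` and `computePauseNS` are illustrative names. The short-pause branch mirrors the removed `maybePause` lines above, the 2 ms value of `MIN_PAUSE_NS` is taken from their comment, and leaving `lastNS` untouched in the long-pause branch is an assumption, since the quoted diff does not show that branch.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, simplified model of the lastNS bookkeeping in MergeRateLimiter.maybePause.
 * Not Lucene code: the class name, constructor, and methods are illustrative assumptions.
 */
final class SharedPauseClock {
  // Assumed 2 ms threshold, taken from the "smaller than 2 msec" comment in the diff.
  static final long MIN_PAUSE_NS = 2_000_000L;

  private final double mbPerSec;
  // Shared by every thread that writes through the same limiter.
  private final AtomicLong lastNS;

  SharedPauseClock(double mbPerSec, long startNS) {
    this.mbPerSec = mbPerSec;
    this.lastNS = new AtomicLong(startNS);
  }

  long lastNS() {
    return lastNS.get();
  }

  /**
   * Returns how many nanoseconds the caller should pause for {@code bytes} written at
   * {@code curNS}, or 0 if the pause would be too short to bother with.
   */
  long computePauseNS(long bytes, long curNS) {
    final double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
    final long budgetNS = (long) (1_000_000_000L * secondsToPause);
    while (true) {
      // Snapshot the shared value; every decision below is based on this snapshot.
      final long last = lastNS.get();
      final long targetNS = last + budgetNS;
      final long curPauseNS = targetNS - curNS;
      if (curPauseNS <= MIN_PAUSE_NS) {
        // Too short to sleep: move lastNS to curNS to enforce the instantaneous rate.
        if (lastNS.compareAndSet(last, curNS)) {
          return 0;
        }
      } else {
        // Long enough to sleep: leave lastNS alone (assumption), but confirm that no other
        // thread moved it since the snapshot, so the pause is not based on a stale value.
        if (lastNS.compareAndSet(last, last)) {
          return curPauseNS;
        }
      }
      // CAS failed: another thread updated lastNS concurrently; retry with the new value.
    }
  }
}
```

The `AtomicLong.updateAndGet` Javadoc asks for a side-effect-free update function because it may be re-applied under contention. The PR's lambdas write their results into side-channel atomics, which relies on the last application being the one whose update succeeded; the explicit loop avoids having to reason about that.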

##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -62,30 +64,42 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException {
 
   @Override
   public void writeInt(int i) throws IOException {
-    bytesSinceLastPause += Integer.BYTES;
+    bytesSinceLastPause.addAndGet(Integer.BYTES);
     checkRate();
     out.writeInt(i);
   }
 
   @Override
   public void writeShort(short i) throws IOException {
-    bytesSinceLastPause += Short.BYTES;
+    bytesSinceLastPause.addAndGet(Short.BYTES);
     checkRate();
     out.writeShort(i);
   }
 
   @Override
   public void writeLong(long i) throws IOException {
-    bytesSinceLastPause += Long.BYTES;
+    bytesSinceLastPause.addAndGet(Long.BYTES);
     checkRate();
     out.writeLong(i);
   }
 
   private void checkRate() throws IOException {
-    if (bytesSinceLastPause > currentMinPauseCheckBytes) {
-      rateLimiter.pause(bytesSinceLastPause);
-      bytesSinceLastPause = 0;
-      currentMinPauseCheckBytes = rateLimiter.getMinPauseCheckBytes();
+    AtomicLong localBytesSinceLastPause = new AtomicLong(0);
+    AtomicBoolean shouldPause = new AtomicBoolean(false);
+    bytesSinceLastPause.updateAndGet(

Review Comment:
   I went with `updateAndGet` instead of `synchronized` here. Either would work, really.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
For additional commands, e-mail: issues-h...@lucene.apache.org