benwtrent commented on code in PR #13190:
URL: https://github.com/apache/lucene/pull/13190#discussion_r1528642636


##########
lucene/core/src/java/org/apache/lucene/index/ConcurrentMergeScheduler.java:
##########
@@ -281,11 +297,11 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
 
         // Because rateLimiter is bound to a particular merge thread, this method should
         // always be called from that context. Verify this.
-        assert mergeThread == Thread.currentThread()
-            : "Not the same merge thread, current="
-                + Thread.currentThread()
-                + ", expected="
-                + mergeThread;
+        /*        assert mergeThread == Thread.currentThread()
+        : "Not the same merge thread, current="
+            + Thread.currentThread()
+            + ", expected="
+            + mergeThread;*/

Review Comment:
   I'm not sure about this one. It looks like a paranoid check that may not be 
necessary if RLIO (`RateLimitedIndexOutput`) were thread-safe?



##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -28,30 +30,30 @@ public final class RateLimitedIndexOutput extends FilterIndexOutput {
   private final RateLimiter rateLimiter;
 
   /** How many bytes we've written since we last called rateLimiter.pause. */
-  private long bytesSinceLastPause;
+  private final AtomicLong bytesSinceLastPause = new AtomicLong(0);

Review Comment:
   This obviously adds some overhead, now that RLIO could be accessed from 
multiple threads. I will likely have to benchmark to see the impact.



##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(

Review Comment:
   We would need to lock here or do an `updateAndGet`. `lastNS` is only updated 
in this particular method, so using atomics seemed better to me.



##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -62,30 +64,42 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException {
 
   @Override
   public void writeInt(int i) throws IOException {
-    bytesSinceLastPause += Integer.BYTES;
+    bytesSinceLastPause.addAndGet(Integer.BYTES);
     checkRate();
     out.writeInt(i);
   }
 
   @Override
   public void writeShort(short i) throws IOException {
-    bytesSinceLastPause += Short.BYTES;
+    bytesSinceLastPause.addAndGet(Short.BYTES);
     checkRate();
     out.writeShort(i);
   }
 
   @Override
   public void writeLong(long i) throws IOException {
-    bytesSinceLastPause += Long.BYTES;
+    bytesSinceLastPause.addAndGet(Long.BYTES);
     checkRate();
     out.writeLong(i);
   }
 
   private void checkRate() throws IOException {
-    if (bytesSinceLastPause > currentMinPauseCheckBytes) {
-      rateLimiter.pause(bytesSinceLastPause);
-      bytesSinceLastPause = 0;
-      currentMinPauseCheckBytes = rateLimiter.getMinPauseCheckBytes();
+    AtomicLong localBytesSinceLastPause = new AtomicLong(0);
+    AtomicBoolean shouldPause = new AtomicBoolean(false);
+    bytesSinceLastPause.updateAndGet(
+        bytes -> {
+          if (bytes > currentMinPauseCheckBytes.get()) {
+            shouldPause.set(true);
+            currentMinPauseCheckBytes.set(rateLimiter.getMinPauseCheckBytes());
+            localBytesSinceLastPause.set(bytes);
+            return 0;
+          } else {
+            shouldPause.set(false);
+          }

Review Comment:
   I figured that if another thread reached this throttling point, paused, and 
reset the counter, then the current thread shouldn't pause. Throttle tracking is 
effectively global: every thread shares the same `bytesSinceLastPause` counter, 
so work done by another thread could cause the current thread to pause, and 
another thread pausing could allow this thread to continue working.
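
   A minimal sketch of the "only the thread that resets the counter pauses" 
idea, written as an explicit compare-and-set loop. This is an illustration, not 
the PR's code: the class name, the fixed threshold, and the method names are 
hypothetical. One reason to consider this shape is that the `AtomicLong` 
Javadoc asks that the function passed to `updateAndGet` be side-effect-free, 
since it may be re-applied under contention.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a shared byte counter where exactly one thread
// "claims" the accumulated bytes once the threshold is crossed.
class ClaimBytesSketch {
  // Shared by all writer threads, like bytesSinceLastPause in the diff.
  private final AtomicLong bytesSinceLastPause = new AtomicLong();
  // Assumption: a fixed threshold. The real code asks the RateLimiter.
  private final long minPauseCheckBytes;

  ClaimBytesSketch(long minPauseCheckBytes) {
    this.minPauseCheckBytes = minPauseCheckBytes;
  }

  void add(long n) {
    bytesSinceLastPause.addAndGet(n);
  }

  /**
   * Returns the number of bytes this caller should pause for, or 0 if the
   * counter is below the threshold or another thread already claimed it.
   */
  long claimIfOverThreshold() {
    while (true) {
      long bytes = bytesSinceLastPause.get();
      if (bytes <= minPauseCheckBytes) {
        return 0; // nothing to claim; no side effects happened
      }
      if (bytesSinceLastPause.compareAndSet(bytes, 0)) {
        return bytes; // this thread won the reset and owns the pause
      }
      // Lost the race: another thread added bytes or reset; re-read and retry.
    }
  }

  public static void main(String[] args) {
    ClaimBytesSketch sketch = new ClaimBytesSketch(10);
    sketch.add(5);
    System.out.println(sketch.claimIfOverThreshold());
    sketch.add(8);
    System.out.println(sketch.claimIfOverThreshold());
  }
}
```

   The caller would pass a non-zero return value to the rate limiter's pause 
call. A thread that loses the race simply sees 0 and keeps writing.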



##########
lucene/core/src/java/org/apache/lucene/index/MergePolicy.java:
##########
@@ -136,13 +136,13 @@ public boolean isAborted() {
      */
     public void pauseNanos(long pauseNanos, PauseReason reason, BooleanSupplier condition)
         throws InterruptedException {
-      if (Thread.currentThread() != owner) {
+      /*      if (Thread.currentThread() != owner) {
         throw new RuntimeException(
             "Only the merge owner thread can call pauseNanos(). This thread: "
                 + Thread.currentThread().getName()
                 + ", owner thread: "
                 + owner);
-      }
+      }*/

Review Comment:
   Do we care about this at all if the underlying throttling is threadsafe?



##########
lucene/core/src/java/org/apache/lucene/index/MergeRateLimiter.java:
##########
@@ -118,24 +118,32 @@ private long maybePause(long bytes, long curNS) throws MergePolicy.MergeAbortedE
       throw new MergePolicy.MergeAbortedException("Merge aborted.");
     }
 
-    double rate = mbPerSec; // read from volatile rate once.
-    double secondsToPause = (bytes / 1024. / 1024.) / rate;
-
-    // Time we should sleep until; this is purely instantaneous
-    // rate (just adds seconds onto the last time we had paused to);
-    // maybe we should also offer decayed recent history one?
-    long targetNS = lastNS + (long) (1000000000 * secondsToPause);
-
-    long curPauseNS = targetNS - curNS;
-
-    // We don't bother with thread pausing if the pause is smaller than 2 msec.
-    if (curPauseNS <= MIN_PAUSE_NS) {
-      // Set to curNS, not targetNS, to enforce the instant rate, not
-      // the "averaged over all history" rate:
-      lastNS = curNS;
+    final double rate = mbPerSec; // read from volatile rate once.
+    final double secondsToPause = (bytes / 1024. / 1024.) / rate;
+
+    AtomicLong curPauseNSSetter = new AtomicLong();
+    lastNS.updateAndGet(
+        last -> {
+          // Time we should sleep until; this is purely instantaneous
+          // rate (just adds seconds onto the last time we had paused to);
+          // maybe we should also offer decayed recent history one?
+          long targetNS = last + (long) (1000000000 * secondsToPause);
+          long curPauseNS = targetNS - curNS;
+          // We don't bother with thread pausing if the pause is smaller than 2 msec.
+          if (curPauseNS <= MIN_PAUSE_NS) {
+            // Set to curNS, not targetNS, to enforce the instant rate, not
+            // the "averaged over all history" rate:
+            curPauseNSSetter.set(0);
+            return curNS;
+          }

Review Comment:
   It's possible that other threads request very short pauses, each updating 
`lastNS` to its own `curNS`. I honestly don't know whether this is OK or not. 
To handle this case, I decided to use `updateAndGet` so that if `lastNS` 
changes underneath us, we account for it and update our pause request 
accordingly.
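
   To make the race concrete, here is a rough sketch of the same computation 
as a compare-and-set loop that re-reads `lastNS` whenever another thread has 
moved it. It is an illustration under assumptions, not the PR's 
implementation: the class and method names are hypothetical, 
`MIN_PAUSE_NS` is set to 2 msec based on the comment in the diff, and the 
long-pause branch leaves `lastNS` untouched because the hunk above is cut off 
before showing what the real code does there.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a contention-aware pause computation.
class LastNsSketch {
  // Assumption: 2 msec, per the comment in the diff.
  static final long MIN_PAUSE_NS = 2_000_000L;

  private final AtomicLong lastNS = new AtomicLong();

  long getLastNS() {
    return lastNS.get();
  }

  /** Returns the nanoseconds the caller should pause, or 0 for "too short to bother". */
  long computePauseNS(long bytes, double mbPerSec, long curNS) {
    final double secondsToPause = (bytes / 1024. / 1024.) / mbPerSec;
    final long deltaNS = (long) (1_000_000_000L * secondsToPause);
    while (true) {
      long last = lastNS.get();
      long curPauseNS = last + deltaNS - curNS;
      if (curPauseNS > MIN_PAUSE_NS) {
        // Assumption: lastNS is not changed on this path in the sketch.
        return curPauseNS;
      }
      // Short pause: move lastNS to curNS to enforce the instantaneous rate,
      // but only if no other thread changed lastNS since we read it.
      if (lastNS.compareAndSet(last, curNS)) {
        return 0;
      }
      // Another thread updated lastNS first; recompute against the new value.
    }
  }

  public static void main(String[] args) {
    LastNsSketch sketch = new LastNsSketch();
    // 1 MiB at 1 MB/sec, measured at time 0.
    System.out.println(sketch.computePauseNS(1024 * 1024, 1.0, 0L));
  }
}
```

   Because the decision is derived from the value that the compare-and-set 
actually observed, no side channel such as `curPauseNSSetter` is needed in 
this shape.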



##########
lucene/core/src/java/org/apache/lucene/store/RateLimitedIndexOutput.java:
##########
@@ -62,30 +64,42 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException {
 
   @Override
   public void writeInt(int i) throws IOException {
-    bytesSinceLastPause += Integer.BYTES;
+    bytesSinceLastPause.addAndGet(Integer.BYTES);
     checkRate();
     out.writeInt(i);
   }
 
   @Override
   public void writeShort(short i) throws IOException {
-    bytesSinceLastPause += Short.BYTES;
+    bytesSinceLastPause.addAndGet(Short.BYTES);
     checkRate();
     out.writeShort(i);
   }
 
   @Override
   public void writeLong(long i) throws IOException {
-    bytesSinceLastPause += Long.BYTES;
+    bytesSinceLastPause.addAndGet(Long.BYTES);
     checkRate();
     out.writeLong(i);
   }
 
   private void checkRate() throws IOException {
-    if (bytesSinceLastPause > currentMinPauseCheckBytes) {
-      rateLimiter.pause(bytesSinceLastPause);
-      bytesSinceLastPause = 0;
-      currentMinPauseCheckBytes = rateLimiter.getMinPauseCheckBytes();
+    AtomicLong localBytesSinceLastPause = new AtomicLong(0);
+    AtomicBoolean shouldPause = new AtomicBoolean(false);
+    bytesSinceLastPause.updateAndGet(

Review Comment:
   I went with `updateAndGet` instead of a `synchronized` block here. Either 
would work, really.
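
   For comparison, a sketch of what the `synchronized` alternative might look 
like. The `Limiter` interface is a hypothetical stand-in for the two 
`RateLimiter` calls used in the diff, and `onBytesWritten` is an invented 
name that folds the counter increment and the rate check into one step. The 
pause itself happens outside the lock so that a sleeping thread does not 
block other writers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a lock-based checkRate().
class SynchronizedCheckRateSketch {
  /** Assumption: stand-in for the RateLimiter methods used in the diff. */
  interface Limiter {
    long pause(long bytes);

    long getMinPauseCheckBytes();
  }

  private final Limiter limiter;
  private final Object lock = new Object();
  private long bytesSinceLastPause; // guarded by lock
  private long currentMinPauseCheckBytes; // guarded by lock

  SynchronizedCheckRateSketch(Limiter limiter) {
    this.limiter = limiter;
    this.currentMinPauseCheckBytes = limiter.getMinPauseCheckBytes();
  }

  void onBytesWritten(long n) {
    long toPause = 0;
    synchronized (lock) {
      bytesSinceLastPause += n;
      if (bytesSinceLastPause > currentMinPauseCheckBytes) {
        // This thread takes responsibility for the accumulated bytes.
        toPause = bytesSinceLastPause;
        bytesSinceLastPause = 0;
        currentMinPauseCheckBytes = limiter.getMinPauseCheckBytes();
      }
    }
    if (toPause > 0) {
      limiter.pause(toPause); // sleep outside the lock
    }
  }

  public static void main(String[] args) {
    List<Long> pauses = new ArrayList<>();
    SynchronizedCheckRateSketch sketch =
        new SynchronizedCheckRateSketch(
            new Limiter() {
              @Override
              public long pause(long bytes) {
                pauses.add(bytes); // record instead of sleeping
                return 0;
              }

              @Override
              public long getMinPauseCheckBytes() {
                return 10;
              }
            });
    for (int i = 0; i < 3; i++) {
      sketch.onBytesWritten(Integer.BYTES);
    }
    System.out.println(pauses);
  }
}
```

   The lock makes the check-and-reset trivially atomic, at the cost of 
serializing every small write through one monitor, which is the overhead a 
benchmark would need to compare against the atomic version.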



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

