This is an automated email from the ASF dual-hosted git repository.

tolbertam pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/cassandra-java-driver.git


The following commit(s) were added to refs/heads/4.x by this push:
     new 7e21eb202 Eliminate lock in ConcurrencyLimitingRequestThrottler
7e21eb202 is described below

commit 7e21eb20283f3781a0a748741b768d0adf0fc85b
Author: Jason Koch <[email protected]>
AuthorDate: Mon Feb 3 13:46:32 2025 -0800

    Eliminate lock in ConcurrencyLimitingRequestThrottler
    
    Following on from 6d3ba47, this changes the throttler to a completely
    lock-free implementation. Update the related comments and README now
    that it is lock-free.
    
    Patch by Jason Koch; reviewed by Alexandre Dutra and Andy Tolbert
---
 .../core/session/throttling/RequestThrottler.java  |   8 +-
 .../ConcurrencyLimitingRequestThrottler.java       | 180 +++++++++------------
 manual/core/non_blocking/README.md                 |  14 +-
 3 files changed, 90 insertions(+), 112 deletions(-)

diff --git 
a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
 
b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
index 7e2b41ebb..73d347d53 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
@@ -23,10 +23,10 @@ import java.io.Closeable;
 /**
  * Limits the number of concurrent requests executed by the driver.
  *
- * <p>Usage in non-blocking applications: beware that all built-in 
implementations of this interface
- * use locks for internal coordination, and do not qualify as lock-free, with 
the obvious exception
- * of {@code PassThroughRequestThrottler}. If your application enforces strict 
lock-freedom, then
- * request throttling should not be enabled.
+ * <p>Usage in non-blocking applications: beware that some implementations of 
this interface use
+ * locks for internal coordination, and do not qualify as lock-free. If your 
application enforces
+ * strict lock-freedom, then you should use the {@code 
PassThroughRequestThrottler} or the {@code
+ * ConcurrencyLimitingRequestThrottler}.
  */
 public interface RequestThrottler extends Closeable {
 
diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
index ffe0ffe96..8146c5b11 100644
--- 
a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
+++ 
b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
@@ -26,10 +26,9 @@ import 
com.datastax.oss.driver.api.core.session.throttling.Throttled;
 import 
com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import edu.umd.cs.findbugs.annotations.Nullable;
-import java.util.ArrayDeque;
 import java.util.Deque;
-import java.util.concurrent.locks.ReentrantLock;
-import net.jcip.annotations.GuardedBy;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import net.jcip.annotations.ThreadSafe;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,17 +60,12 @@ public class ConcurrencyLimitingRequestThrottler implements 
RequestThrottler {
   private final String logPrefix;
   private final int maxConcurrentRequests;
   private final int maxQueueSize;
-
-  private final ReentrantLock lock = new ReentrantLock();
-
-  @GuardedBy("lock")
-  private int concurrentRequests;
-
-  @GuardedBy("lock")
-  private final Deque<Throttled> queue = new ArrayDeque<>();
-
-  @GuardedBy("lock")
-  private boolean closed;
+  private final AtomicInteger concurrentRequests = new AtomicInteger(0);
+  // ConcurrentLinkedDeque.size() is not O(1): it traverses the whole deque. So, we track
+  // the size of the queue explicitly.
+  private final Deque<Throttled> queue = new ConcurrentLinkedDeque<>();
+  private final AtomicInteger queueSize = new AtomicInteger(0);
+  private volatile boolean closed = false;
 
   public ConcurrencyLimitingRequestThrottler(DriverContext context) {
     this.logPrefix = context.getSessionName();
@@ -88,50 +82,62 @@ public class ConcurrencyLimitingRequestThrottler implements 
RequestThrottler {
 
   @Override
   public void register(@NonNull Throttled request) {
-    boolean notifyReadyRequired = false;
+    if (closed) {
+      LOG.trace("[{}] Rejecting request after shutdown", logPrefix);
+      fail(request, "The session is shutting down");
+      return;
+    }
 
-    lock.lock();
-    try {
-      if (closed) {
-        LOG.trace("[{}] Rejecting request after shutdown", logPrefix);
-        fail(request, "The session is shutting down");
-      } else if (queue.isEmpty() && concurrentRequests < 
maxConcurrentRequests) {
-        // We have capacity for one more concurrent request
+    // Implementation note: Technically the "concurrent requests" or "queue size" counters
+    // could read transiently over the limit, but the queue itself will never grow beyond
+    // the limit since we always check for that condition and revert the increment if
+    // over-limit. We do this instead of a CAS loop to avoid retrying under contention.
+
+    // If no backlog exists AND we get capacity, we can execute immediately
+    if (queueSize.get() == 0) {
+      // Take a claim first, and then check if we are OK to proceed
+      int newConcurrent = concurrentRequests.incrementAndGet();
+      if (newConcurrent <= maxConcurrentRequests) {
         LOG.trace("[{}] Starting newly registered request", logPrefix);
-        concurrentRequests += 1;
-        notifyReadyRequired = true;
-      } else if (queue.size() < maxQueueSize) {
-        LOG.trace("[{}] Enqueuing request", logPrefix);
-        queue.add(request);
+        request.onThrottleReady(false);
+        return;
       } else {
-        LOG.trace("[{}] Rejecting request because of full queue", logPrefix);
-        fail(
-            request,
-            String.format(
-                "The session has reached its maximum capacity "
-                    + "(concurrent requests: %d, queue size: %d)",
-                maxConcurrentRequests, maxQueueSize));
+        // We exceeded the limit, decrement the count and fall through to the 
queuing logic
+        concurrentRequests.decrementAndGet();
       }
-    } finally {
-      lock.unlock();
     }
 
-    // no need to hold the lock while allowing the task to progress
-    if (notifyReadyRequired) {
-      request.onThrottleReady(false);
+    // If we have a backlog, or we failed to claim capacity, try to enqueue
+    int newQueueSize = queueSize.incrementAndGet();
+    if (newQueueSize <= maxQueueSize) {
+      LOG.trace("[{}] Enqueuing request", logPrefix);
+      queue.offer(request);
+
+      // Double-check that we were still supposed to be enqueued: the session may have been
+      // closed while we were enqueuing, and close() may be removing this request right now,
+      // so we check both the closed flag and whether we are the one who removes it
+      if (closed) {
+        if (queue.remove(request)) {
+          queueSize.decrementAndGet();
+          LOG.trace("[{}] Rejecting late request after shutdown", logPrefix);
+          fail(request, "The session is shutting down");
+        }
+      }
+    } else {
+      LOG.trace("[{}] Rejecting request because of full queue", logPrefix);
+      queueSize.decrementAndGet();
+      fail(
+          request,
+          String.format(
+              "The session has reached its maximum capacity "
+                  + "(concurrent requests: %d, queue size: %d)",
+              maxConcurrentRequests, maxQueueSize));
     }
   }
 
   @Override
   public void signalSuccess(@NonNull Throttled request) {
-    Throttled nextRequest = null;
-    lock.lock();
-    try {
-      nextRequest = onRequestDoneAndDequeNext();
-    } finally {
-      lock.unlock();
-    }
-
+    Throttled nextRequest = onRequestDoneAndDequeNext();
     if (nextRequest != null) {
       nextRequest.onThrottleReady(true);
     }
@@ -145,17 +151,13 @@ public class ConcurrencyLimitingRequestThrottler 
implements RequestThrottler {
   @Override
   public void signalTimeout(@NonNull Throttled request) {
     Throttled nextRequest = null;
-    lock.lock();
-    try {
-      if (!closed) {
-        if (queue.remove(request)) { // The request timed out before it was 
active
-          LOG.trace("[{}] Removing timed out request from the queue", 
logPrefix);
-        } else {
-          nextRequest = onRequestDoneAndDequeNext();
-        }
+    if (!closed) {
+      if (queue.remove(request)) { // The request timed out before it was 
active
+        queueSize.decrementAndGet();
+        LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
+      } else {
+        nextRequest = onRequestDoneAndDequeNext();
       }
-    } finally {
-      lock.unlock();
     }
 
     if (nextRequest != null) {
@@ -166,17 +168,13 @@ public class ConcurrencyLimitingRequestThrottler 
implements RequestThrottler {
   @Override
   public void signalCancel(@NonNull Throttled request) {
     Throttled nextRequest = null;
-    lock.lock();
-    try {
-      if (!closed) {
-        if (queue.remove(request)) { // The request has been cancelled before 
it was active
-          LOG.trace("[{}] Removing cancelled request from the queue", 
logPrefix);
-        } else {
-          nextRequest = onRequestDoneAndDequeNext();
-        }
+    if (!closed) {
+      if (queue.remove(request)) { // The request has been cancelled before it 
was active
+        queueSize.decrementAndGet();
+        LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
+      } else {
+        nextRequest = onRequestDoneAndDequeNext();
       }
-    } finally {
-      lock.unlock();
     }
 
     if (nextRequest != null) {
@@ -184,17 +182,16 @@ public class ConcurrencyLimitingRequestThrottler 
implements RequestThrottler {
     }
   }
 
-  @SuppressWarnings("GuardedBy") // this method is only called with the lock 
held
   @Nullable
   private Throttled onRequestDoneAndDequeNext() {
-    assert lock.isHeldByCurrentThread();
     if (!closed) {
-      if (queue.isEmpty()) {
-        concurrentRequests -= 1;
+      Throttled nextRequest = queue.poll();
+      if (nextRequest == null) {
+        concurrentRequests.decrementAndGet();
       } else {
+        queueSize.decrementAndGet();
         LOG.trace("[{}] Starting dequeued request", logPrefix);
-        // don't touch concurrentRequests since we finished one but started 
another
-        return queue.poll();
+        return nextRequest;
       }
     }
 
@@ -204,45 +201,28 @@ public class ConcurrencyLimitingRequestThrottler 
implements RequestThrottler {
 
   @Override
   public void close() {
-    lock.lock();
-    try {
-      closed = true;
-      LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, 
queue.size());
-      for (Throttled request : queue) {
-        fail(request, "The session is shutting down");
-      }
-    } finally {
-      lock.unlock();
+    closed = true;
+
+    LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, 
queueSize.get());
+    Throttled request;
+    while ((request = queue.poll()) != null) {
+      queueSize.decrementAndGet();
+      fail(request, "The session is shutting down");
     }
   }
 
   public int getQueueSize() {
-    lock.lock();
-    try {
-      return queue.size();
-    } finally {
-      lock.unlock();
-    }
+    return queueSize.get();
   }
 
   @VisibleForTesting
   int getConcurrentRequests() {
-    lock.lock();
-    try {
-      return concurrentRequests;
-    } finally {
-      lock.unlock();
-    }
+    return concurrentRequests.get();
   }
 
   @VisibleForTesting
   Deque<Throttled> getQueue() {
-    lock.lock();
-    try {
-      return queue;
-    } finally {
-      lock.unlock();
-    }
+    return queue;
   }
 
   private static void fail(Throttled request, String message) {
diff --git a/manual/core/non_blocking/README.md 
b/manual/core/non_blocking/README.md
index 7abe9d856..f320ffd13 100644
--- a/manual/core/non_blocking/README.md
+++ b/manual/core/non_blocking/README.md
@@ -152,15 +152,13 @@ should not be used if strict lock-freedom is enforced.
 
 [`SafeInitNodeStateListener`]: 
https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/metadata/SafeInitNodeStateListener.html
 
-The same is valid for both built-in [request throttlers]:
+Of the built-in [request throttlers], `RateLimitingRequestThrottler` uses locks internally, while
+`ConcurrencyLimitingRequestThrottler` is lock-free.
 
-* `ConcurrencyLimitingRequestThrottler`
-* `RateLimitingRequestThrottler`
-
-See the section about [throttling](../throttling) for details about these 
components. Again, they 
-use locks internally, and depending on how many requests are being executed in 
parallel, the thread
-contention on these locks can be high: in short, if your application enforces 
strict lock-freedom, 
-then these components should not be used.
+See the section about [throttling](../throttling) for details about these components. Depending on
+how many requests are being executed in parallel, the thread contention on the locks used by
+`RateLimitingRequestThrottler` can be high: in short, if your application enforces strict
+lock-freedom, then you should not use the `RateLimitingRequestThrottler`.
 
 [request throttlers]: 
https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.html
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to