This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 5adc92bce1 Replace semaphore with ReentrantLock in Scanner to help clean up scan sessions (#3644)
5adc92bce1 is described below

commit 5adc92bce11ecd9847af0f510a327b5cd2af4418
Author: Dom G <domgargu...@apache.org>
AuthorDate: Wed Jul 26 11:15:51 2023 -0400

    Replace semaphore with ReentrantLock in Scanner to help clean up scan sessions (#3644)
    
    * ReentrantLock.getOwner() to help clean up scan sessions
    
    ---------
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../apache/accumulo/tserver/tablet/Scanner.java    | 57 ++++++++++++++++------
 1 file changed, 41 insertions(+), 16 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
index 311492b822..2b89a2005b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java
@@ -20,9 +20,9 @@ package org.apache.accumulo.tserver.tablet;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -35,6 +35,8 @@ import org.apache.accumulo.tserver.scan.ScanParameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class Scanner {
   private static final Logger log = LoggerFactory.getLogger(Scanner.class);
 
@@ -46,20 +48,22 @@ public class Scanner {
   private boolean sawException = false;
   private boolean scanClosed = false;
   /**
-   * A fair semaphore of one is used since explicitly know the access pattern 
will be one thread to
-   * read and another to call close if the session becomes idle. Since we're 
explicitly preventing
-   * re-entrance, we're currently using a Semaphore. If at any point we decide 
read needs to be
-   * re-entrant, we can switch to a Reentrant lock.
+   * An interruptible, re-entrant lock is used since we know the access 
pattern will be one thread
+   * to read and another to call close if the session becomes idle. This lock 
allows the closing
+   * thread to interrupt the reading thread if it can't obtain the lock 
immediately. This way, the
+   * reading thread can finish its current operation and release the lock 
promptly.
    */
-  private Semaphore scannerSemaphore;
+  private final InterruptibleLock lock;
+
+  private final AtomicBoolean interruptFlag;
 
-  private AtomicBoolean interruptFlag;
+  private boolean readInProgress = false;
 
   Scanner(TabletBase tablet, Range range, ScanParameters scanParams, 
AtomicBoolean interruptFlag) {
     this.tablet = tablet;
     this.range = range;
     this.scanParams = scanParams;
-    this.scannerSemaphore = new Semaphore(1, true);
+    this.lock = new InterruptibleLock();
     this.interruptFlag = interruptFlag;
   }
 
@@ -72,7 +76,11 @@ public class Scanner {
     try {
 
       try {
-        scannerSemaphore.acquire();
+        lock.lockInterruptibly();
+        Preconditions.checkState(!readInProgress);
+        // Simple check to ensure the same thread never calls this method 
recursively. This code
+        // would not handle that well.
+        readInProgress = true;
       } catch (InterruptedException e) {
         sawException = true;
       }
@@ -162,22 +170,39 @@ public class Scanner {
           tablet.updateQueryStats(results.getResults().size(), 
results.getNumBytes());
         }
       } finally {
-        scannerSemaphore.release();
+        readInProgress = false;
+        lock.unlock();
       }
     }
   }
 
-  // close and read are synchronized because can not call close on the data 
source while it is in
-  // use
-  // this could lead to the case where file iterators that are in use by a 
thread are returned
-  // to the pool... this would be bad
+  private static class InterruptibleLock extends ReentrantLock {
+    private static final long serialVersionUID = 1L;
+
+    public Thread getLockOwner() {
+      return getOwner();
+    }
+  }
+
+  /*
+   * close and read are controlled by an InterruptibleLock because we cannot 
call close on the data
+   * source while it is in use. Without this lock, there could be a situation 
where file iterators
+   * that are in use by a thread are returned to the pool, which would be bad. 
With the lock, a
+   * thread can attempt to close the Scanner. If it can't immediately acquire 
the lock (because a
+   * read is in progress), it interrupts the reading thread. This ensures the 
reading thread can
+   * finish its current operation and release the lock, allowing close to 
finish.
+   */
   public boolean close() {
     interruptFlag.set(true);
 
     boolean obtainedLock = false;
     try {
-      obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS);
+      obtainedLock = lock.tryLock(10, TimeUnit.MILLISECONDS);
       if (!obtainedLock) {
+        Thread ownerThread = lock.getLockOwner();
+        if (ownerThread != null) {
+          ownerThread.interrupt();
+        }
         return false;
       }
 
@@ -189,7 +214,7 @@ public class Scanner {
       return false;
     } finally {
       if (obtainedLock) {
-        scannerSemaphore.release();
+        lock.unlock();
       }
     }
     return true;

Reply via email to