This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 5adc92bce1 Replace semaphore with ReentrantLock in Scanner to help clean up scan sessions (#3644) 5adc92bce1 is described below commit 5adc92bce11ecd9847af0f510a327b5cd2af4418 Author: Dom G <domgargu...@apache.org> AuthorDate: Wed Jul 26 11:15:51 2023 -0400 Replace semaphore with ReentrantLock in Scanner to help clean up scan sessions (#3644) * ReentrantLock.getOwner() to help clean up scan sessions --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../apache/accumulo/tserver/tablet/Scanner.java | 57 ++++++++++++++++------ 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java index 311492b822..2b89a2005b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Scanner.java @@ -20,9 +20,9 @@ package org.apache.accumulo.tserver.tablet; import java.io.IOException; import java.util.ArrayList; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; @@ -35,6 +35,8 @@ import org.apache.accumulo.tserver.scan.ScanParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + public class Scanner { private static final Logger log = LoggerFactory.getLogger(Scanner.class); @@ -46,20 +48,22 @@ public class Scanner { private boolean sawException = false; private boolean scanClosed = false; /** - * A fair semaphore of one is used since explicitly know the access pattern will be one thread to - * read and another to call close if the session becomes idle. Since we're explicitly preventing - * re-entrance, we're currently using a Semaphore. If at any point we decide read needs to be - * re-entrant, we can switch to a Reentrant lock. + * An interruptible, re-entrant lock is used since we know the access pattern will be one thread + * to read and another to call close if the session becomes idle. This lock allows the closing + * thread to interrupt the reading thread if it can't obtain the lock immediately. This way, the + * reading thread can finish its current operation and release the lock promptly. */ - private Semaphore scannerSemaphore; + private final InterruptibleLock lock; + + private final AtomicBoolean interruptFlag; - private AtomicBoolean interruptFlag; + private boolean readInProgress = false; Scanner(TabletBase tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) { this.tablet = tablet; this.range = range; this.scanParams = scanParams; - this.scannerSemaphore = new Semaphore(1, true); + this.lock = new InterruptibleLock(); this.interruptFlag = interruptFlag; } @@ -72,7 +76,11 @@ public class Scanner { try { try { - scannerSemaphore.acquire(); + lock.lockInterruptibly(); + Preconditions.checkState(!readInProgress); + // Simple check to ensure the same thread never calls this method recursively. This code + // would not handle that well. + readInProgress = true; } catch (InterruptedException e) { sawException = true; } @@ -162,22 +170,39 @@ public class Scanner { tablet.updateQueryStats(results.getResults().size(), results.getNumBytes()); } } finally { - scannerSemaphore.release(); + readInProgress = false; + lock.unlock(); } } } - // close and read are synchronized because can not call close on the data source while it is in - // use - // this could lead to the case where file iterators that are in use by a thread are returned - // to the pool... this would be bad + private static class InterruptibleLock extends ReentrantLock { + private static final long serialVersionUID = 1L; + + public Thread getLockOwner() { + return getOwner(); + } + } + + /* + * close and read are controlled by an InterruptibleLock because we cannot call close on the data + * source while it is in use. Without this lock, there could be a situation where file iterators + * that are in use by a thread are returned to the pool, which would be bad. With the lock, a + * thread can attempt to close the Scanner. If it can't immediately acquire the lock (because a + * read is in progress), it interrupts the reading thread. This ensures the reading thread can + * finish its current operation and release the lock, allowing close to finish. + */ public boolean close() { interruptFlag.set(true); boolean obtainedLock = false; try { - obtainedLock = scannerSemaphore.tryAcquire(10, TimeUnit.MILLISECONDS); + obtainedLock = lock.tryLock(10, TimeUnit.MILLISECONDS); if (!obtainedLock) { + Thread ownerThread = lock.getLockOwner(); + if (ownerThread != null) { + ownerThread.interrupt(); + } return false; } @@ -189,7 +214,7 @@ public class Scanner { return false; } finally { if (obtainedLock) { - scannerSemaphore.release(); + lock.unlock(); } } return true;