This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 95f128342f refactors how scan sessions track scan task (#4829)
95f128342f is described below

commit 95f128342ffbec2517352e6fb80ab6799cda273d
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Aug 24 10:42:23 2024 -0700

    refactors how scan sessions track scan task (#4829)
    
    This is a manual refactor of how scan sessions track their scan task.
    MultiScanSession and SingleScanSession both extend ScanSession.  Each was
    tracking its scan task separately.  Moved this tracking into the parent
    class ScanSession.  This refactoring is needed for #4756, but I want to
    do it on its own to simplify review for bug fix release changes.
    
    
    Co-authored-by: Christopher L. Shannon <cshan...@apache.org>
---
 .../org/apache/accumulo/tserver/ScanServer.java    |  2 +-
 .../tserver/TabletServerResourceManager.java       |  2 +-
 .../accumulo/tserver/ThriftScanClientHandler.java  | 26 ++++++++---------
 .../accumulo/tserver/session/MultiScanSession.java | 11 ++++---
 .../accumulo/tserver/session/ScanSession.java      | 34 +++++++++++++---------
 .../accumulo/tserver/session/SessionManager.java   |  8 ++---
 .../tserver/session/SingleScanSession.java         | 11 +++----
 7 files changed, 51 insertions(+), 43 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 18cb23b37b..b7b1d472c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -793,7 +793,7 @@ public class ScanServer extends AbstractServer
     return new ScanReservation(scanSessionFiles, myReservationId);
   }
 
-  private static Set<StoredTabletFile> getScanSessionFiles(ScanSession 
session) {
+  private static Set<StoredTabletFile> getScanSessionFiles(ScanSession<?> 
session) {
     if (session instanceof SingleScanSession) {
       var sss = (SingleScanSession) session;
       return 
Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 655b54fdbd..8f90e42d7a 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -785,7 +785,7 @@ public class TabletServerResourceManager {
 
   }
 
-  public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, 
ScanSession scanInfo,
+  public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, 
ScanSession<?> scanInfo,
       Runnable task) {
 
     task = ScanSession.wrap(scanInfo, task);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
index 8a99b2315e..666076caeb 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java
@@ -252,17 +252,17 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
 
     server.getScanMetrics().incrementContinueScan(1.0D);
 
-    if (scanSession.nextBatchTask == null) {
-      scanSession.nextBatchTask = new NextBatchTask(server, scanID, 
scanSession.interruptFlag);
+    if (scanSession.getScanTask() == null) {
+      scanSession.setScanTask(new NextBatchTask(server, scanID, 
scanSession.interruptFlag));
       server.getResourceManager().executeReadAhead(scanSession.extent,
-          getScanDispatcher(scanSession.extent), scanSession, 
scanSession.nextBatchTask);
+          getScanDispatcher(scanSession.extent), scanSession, 
scanSession.getScanTask());
     }
 
     ScanBatch bresult;
     try {
-      bresult = scanSession.nextBatchTask.get(busyTimeout, 
MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
+      bresult = scanSession.getScanTask().get(busyTimeout, 
MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
           TimeUnit.MILLISECONDS);
-      scanSession.nextBatchTask = null;
+      scanSession.setScanTask(null);
     } catch (ExecutionException e) {
       server.getSessionManager().removeSession(scanID);
       if (e.getCause() instanceof NotServingTabletException) {
@@ -276,7 +276,7 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
         sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, 
TimeUnit.MILLISECONDS);
         List<KVEntry> empty = Collections.emptyList();
         bresult = new ScanBatch(empty, true);
-        scanSession.nextBatchTask = null;
+        scanSession.setScanTask(null);
       } else {
         throw new RuntimeException(e);
       }
@@ -311,9 +311,9 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
     if (scanResult.more && scanSession.batchCount > 
scanSession.readaheadThreshold) {
       // start reading next batch while current batch is transmitted
       // to client
-      scanSession.nextBatchTask = new NextBatchTask(server, scanID, 
scanSession.interruptFlag);
+      scanSession.setScanTask(new NextBatchTask(server, scanID, 
scanSession.interruptFlag));
       server.getResourceManager().executeReadAhead(scanSession.extent,
-          getScanDispatcher(scanSession.extent), scanSession, 
scanSession.nextBatchTask);
+          getScanDispatcher(scanSession.extent), scanSession, 
scanSession.getScanTask());
     }
 
     if (!scanResult.more) {
@@ -472,17 +472,17 @@ public class ThriftScanClientHandler implements 
TabletScanClientService.Iface {
 
     server.getScanMetrics().incrementContinueScan(1.0D);
 
-    if (session.lookupTask == null) {
-      session.lookupTask = new LookupTask(server, scanID);
+    if (session.getScanTask() == null) {
+      session.setScanTask(new LookupTask(server, scanID));
       server.getResourceManager().executeReadAhead(session.threadPoolExtent,
-          getScanDispatcher(session.threadPoolExtent), session, 
session.lookupTask);
+          getScanDispatcher(session.threadPoolExtent), session, 
session.getScanTask());
     }
 
     try {
 
-      MultiScanResult scanResult = session.lookupTask.get(busyTimeout,
+      MultiScanResult scanResult = session.getScanTask().get(busyTimeout,
           MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
-      session.lookupTask = null;
+      session.setScanTask(null);
       return scanResult;
     } catch (ExecutionException e) {
       server.getSessionManager().removeSession(scanID);
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
index c47c35656c..da597e94d0 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
@@ -28,9 +28,8 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.tserver.scan.ScanParameters;
-import org.apache.accumulo.tserver.scan.ScanTask;
 
-public class MultiScanSession extends ScanSession {
+public class MultiScanSession extends ScanSession<MultiScanResult> {
   public final KeyExtent threadPoolExtent;
   public final Map<KeyExtent,List<Range>> queries;
   public final Set<KeyExtent> exents;
@@ -41,8 +40,6 @@ public class MultiScanSession extends ScanSession {
   public int numEntries;
   public long totalLookupTime;
 
-  public volatile ScanTask<MultiScanResult> lookupTask;
-
   public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent,
       Map<KeyExtent,List<Range>> queries, ScanParameters scanParams,
       Map<String,String> executionHints, TabletResolver tabletResolver) {
@@ -64,8 +61,10 @@ public class MultiScanSession extends ScanSession {
 
   @Override
   public boolean cleanup() {
-    if (lookupTask != null) {
-      lookupTask.cancel(true);
+    // read volatile once to avoid race conditions
+    var localScanTask = getScanTask();
+    if (localScanTask != null) {
+      localScanTask.cancel(true);
     }
     // the cancellation should provide us the safety to return true here
     return super.cleanup();
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index 0fefcc1327..32e6c8bcaf 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -34,11 +34,12 @@ import org.apache.accumulo.core.spi.common.Stats;
 import org.apache.accumulo.core.spi.scan.ScanInfo;
 import org.apache.accumulo.core.util.Stat;
 import org.apache.accumulo.tserver.scan.ScanParameters;
+import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.tablet.TabletBase;
 
 import com.google.common.base.Preconditions;
 
-public abstract class ScanSession extends Session implements ScanInfo {
+public abstract class ScanSession<T> extends Session implements ScanInfo {
 
   public interface TabletResolver {
     TabletBase getTablet(KeyExtent extent);
@@ -48,10 +49,10 @@ public abstract class ScanSession extends Session 
implements ScanInfo {
 
   public static class ScanMeasurer implements Runnable {
 
-    private ScanSession session;
-    private Runnable task;
+    private final ScanSession<?> session;
+    private final Runnable task;
 
-    ScanMeasurer(ScanSession session, Runnable task) {
+    ScanMeasurer(ScanSession<?> session, Runnable task) {
       this.session = session;
       this.task = task;
     }
@@ -69,18 +70,20 @@ public abstract class ScanSession extends Session 
implements ScanInfo {
     }
   }
 
-  public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) {
+  public static ScanMeasurer wrap(ScanSession<?> scanInfo, Runnable r) {
     return new ScanMeasurer(scanInfo, r);
   }
 
   private OptionalLong lastRunTime = OptionalLong.empty();
-  private Stat idleStats = new Stat();
-  public Stat runStats = new Stat();
+  private final Stat idleStats = new Stat();
+  public final Stat runStats = new Stat();
 
   public final ScanParameters scanParams;
-  private Map<String,String> executionHints;
+  private final Map<String,String> executionHints;
   private final TabletResolver tabletResolver;
 
+  private volatile ScanTask<T> scanTask;
+
   ScanSession(TCredentials credentials, ScanParameters scanParams,
       Map<String,String> executionHints, TabletResolver tabletResolver) {
     super(credentials);
@@ -129,7 +132,7 @@ public abstract class ScanSession extends Session 
implements ScanInfo {
 
   private class IterConfImpl implements IteratorConfiguration {
 
-    private IterInfo ii;
+    private final IterInfo ii;
 
     IterConfImpl(IterInfo ii) {
       this.ii = ii;
@@ -180,13 +183,18 @@ public abstract class ScanSession extends Session 
implements ScanInfo {
     return tabletResolver;
   }
 
+  public ScanTask<T> getScanTask() {
+    return scanTask;
+  }
+
+  public void setScanTask(ScanTask<T> scanTask) {
+    this.scanTask = scanTask;
+  }
+
   @Override
   public boolean cleanup() {
     tabletResolver.close();
-    if (!super.cleanup()) {
-      return false;
-    }
-    return true;
+    return super.cleanup();
   }
 
   @Override
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 41d7b89738..7217ce9015 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -368,11 +368,11 @@ public class SessionManager {
 
       if (session instanceof SingleScanSession) {
         SingleScanSession ss = (SingleScanSession) session;
-        nbt = ss.nextBatchTask;
+        nbt = ss.getScanTask();
         tableID = ss.extent.tableId();
       } else if (session instanceof MultiScanSession) {
         MultiScanSession mss = (MultiScanSession) session;
-        nbt = mss.lookupTask;
+        nbt = mss.getScanTask();
         tableID = mss.threadPoolExtent.tableId();
       }
 
@@ -407,7 +407,7 @@ public class SessionManager {
 
         ScanState state = ScanState.RUNNING;
 
-        ScanTask<ScanBatch> nbt = ss.nextBatchTask;
+        ScanTask<ScanBatch> nbt = ss.getScanTask();
         if (nbt == null) {
           state = ScanState.IDLE;
         } else {
@@ -443,7 +443,7 @@ public class SessionManager {
 
         ScanState state = ScanState.RUNNING;
 
-        ScanTask<MultiScanResult> nbt = mss.lookupTask;
+        ScanTask<MultiScanResult> nbt = mss.getScanTask();
         if (nbt == null) {
           state = ScanState.IDLE;
         } else {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
index 8c48a01263..d99fa2cd93 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java
@@ -25,16 +25,15 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.tserver.scan.ScanParameters;
-import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.apache.accumulo.tserver.tablet.Scanner;
 
-public class SingleScanSession extends ScanSession {
+public class SingleScanSession extends ScanSession<ScanBatch> {
   public final KeyExtent extent;
   public final AtomicBoolean interruptFlag = new AtomicBoolean();
   public long entriesReturned = 0;
   public long batchCount = 0;
-  public volatile ScanTask<ScanBatch> nextBatchTask;
+
   public Scanner scanner;
   public final long readaheadThreshold;
 
@@ -59,8 +58,10 @@ public class SingleScanSession extends ScanSession {
   public boolean cleanup() {
     final boolean ret;
     try {
-      if (nextBatchTask != null) {
-        nextBatchTask.cancel(true);
+      // read volatile once to avoid race conditions
+      var localScanTask = getScanTask();
+      if (localScanTask != null) {
+        localScanTask.cancel(true);
       }
     } finally {
       if (scanner != null) {

Reply via email to