This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 95f128342f refactors how scan sessions track scan task (#4829) 95f128342f is described below commit 95f128342ffbec2517352e6fb80ab6799cda273d Author: Keith Turner <ktur...@apache.org> AuthorDate: Sat Aug 24 10:42:23 2024 -0700 refactors how scan sessions track scan task (#4829) This is a manual refactor of how scan sessions track scan task. MultiScanSession and SinceScanSession both extend ScanSession. Each was tracking scan task separately. Moved this tracking into the parent class ScanSession. This refactoring is needed for #4756 but want to do it on its own to simplify review for bug fix release changes. Co-authored-by: Christopher L. Shannon <cshan...@apache.org> --- .../org/apache/accumulo/tserver/ScanServer.java | 2 +- .../tserver/TabletServerResourceManager.java | 2 +- .../accumulo/tserver/ThriftScanClientHandler.java | 26 ++++++++--------- .../accumulo/tserver/session/MultiScanSession.java | 11 ++++--- .../accumulo/tserver/session/ScanSession.java | 34 +++++++++++++--------- .../accumulo/tserver/session/SessionManager.java | 8 ++--- .../tserver/session/SingleScanSession.java | 11 +++---- 7 files changed, 51 insertions(+), 43 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 18cb23b37b..b7b1d472c5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -793,7 +793,7 @@ public class ScanServer extends AbstractServer return new ScanReservation(scanSessionFiles, myReservationId); } - private static Set<StoredTabletFile> getScanSessionFiles(ScanSession session) { + private static Set<StoredTabletFile> getScanSessionFiles(ScanSession<?> session) { if (session instanceof SingleScanSession) { var sss = (SingleScanSession) session; return Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 655b54fdbd..8f90e42d7a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -785,7 +785,7 @@ public class TabletServerResourceManager { } - public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession scanInfo, + public void executeReadAhead(KeyExtent tablet, ScanDispatcher dispatcher, ScanSession<?> scanInfo, Runnable task) { task = ScanSession.wrap(scanInfo, task); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index 8a99b2315e..666076caeb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -252,17 +252,17 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { server.getScanMetrics().incrementContinueScan(1.0D); - if (scanSession.nextBatchTask == null) { - scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag); + if (scanSession.getScanTask() == null) { + scanSession.setScanTask(new NextBatchTask(server, scanID, scanSession.interruptFlag)); server.getResourceManager().executeReadAhead(scanSession.extent, - getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask); + getScanDispatcher(scanSession.extent), scanSession, scanSession.getScanTask()); } ScanBatch bresult; try { - bresult = scanSession.nextBatchTask.get(busyTimeout, MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, + bresult = scanSession.getScanTask().get(busyTimeout, MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); - scanSession.nextBatchTask = null; + scanSession.setScanTask(null); } catch (ExecutionException e) { server.getSessionManager().removeSession(scanID); if (e.getCause() instanceof NotServingTabletException) { @@ -276,7 +276,7 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); List<KVEntry> empty = Collections.emptyList(); bresult = new ScanBatch(empty, true); - scanSession.nextBatchTask = null; + scanSession.setScanTask(null); } else { throw new RuntimeException(e); } @@ -311,9 +311,9 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { if (scanResult.more && scanSession.batchCount > scanSession.readaheadThreshold) { // start reading next batch while current batch is transmitted // to client - scanSession.nextBatchTask = new NextBatchTask(server, scanID, scanSession.interruptFlag); + scanSession.setScanTask(new NextBatchTask(server, scanID, scanSession.interruptFlag)); server.getResourceManager().executeReadAhead(scanSession.extent, - getScanDispatcher(scanSession.extent), scanSession, scanSession.nextBatchTask); + getScanDispatcher(scanSession.extent), scanSession, scanSession.getScanTask()); } if (!scanResult.more) { @@ -472,17 +472,17 @@ public class ThriftScanClientHandler implements TabletScanClientService.Iface { server.getScanMetrics().incrementContinueScan(1.0D); - if (session.lookupTask == null) { - session.lookupTask = new LookupTask(server, scanID); + if (session.getScanTask() == null) { + session.setScanTask(new LookupTask(server, scanID)); server.getResourceManager().executeReadAhead(session.threadPoolExtent, - getScanDispatcher(session.threadPoolExtent), session, session.lookupTask); + getScanDispatcher(session.threadPoolExtent), session, session.getScanTask()); } try { - MultiScanResult scanResult = session.lookupTask.get(busyTimeout, + MultiScanResult scanResult = session.getScanTask().get(busyTimeout, MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS); - session.lookupTask = null; + session.setScanTask(null); return scanResult; } catch (ExecutionException e) { server.getSessionManager().removeSession(scanID); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java index c47c35656c..da597e94d0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java @@ -28,9 +28,8 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.tserver.scan.ScanParameters; -import org.apache.accumulo.tserver.scan.ScanTask; -public class MultiScanSession extends ScanSession { +public class MultiScanSession extends ScanSession<MultiScanResult> { public final KeyExtent threadPoolExtent; public final Map<KeyExtent,List<Range>> queries; public final Set<KeyExtent> exents; @@ -41,8 +40,6 @@ public class MultiScanSession extends ScanSession { public int numEntries; public long totalLookupTime; - public volatile ScanTask<MultiScanResult> lookupTask; - public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent, Map<KeyExtent,List<Range>> queries, ScanParameters scanParams, Map<String,String> executionHints, TabletResolver tabletResolver) { @@ -64,8 +61,10 @@ public class MultiScanSession extends ScanSession { @Override public boolean cleanup() { - if (lookupTask != null) { - lookupTask.cancel(true); + // read volatile once to avoid race conditions + var localScanTask = getScanTask(); + if (localScanTask != null) { + localScanTask.cancel(true); } // the cancellation should provide us the safety to return true here return super.cleanup(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java index 0fefcc1327..32e6c8bcaf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java @@ -34,11 +34,12 @@ import org.apache.accumulo.core.spi.common.Stats; import org.apache.accumulo.core.spi.scan.ScanInfo; import org.apache.accumulo.core.util.Stat; import org.apache.accumulo.tserver.scan.ScanParameters; +import org.apache.accumulo.tserver.scan.ScanTask; import org.apache.accumulo.tserver.tablet.TabletBase; import com.google.common.base.Preconditions; -public abstract class ScanSession extends Session implements ScanInfo { +public abstract class ScanSession<T> extends Session implements ScanInfo { public interface TabletResolver { TabletBase getTablet(KeyExtent extent); @@ -48,10 +49,10 @@ public abstract class ScanSession extends Session implements ScanInfo { public static class ScanMeasurer implements Runnable { - private ScanSession session; - private Runnable task; + private final ScanSession<?> session; + private final Runnable task; - ScanMeasurer(ScanSession session, Runnable task) { + ScanMeasurer(ScanSession<?> session, Runnable task) { this.session = session; this.task = task; } @@ -69,18 +70,20 @@ public abstract class ScanSession extends Session implements ScanInfo { } } - public static ScanMeasurer wrap(ScanSession scanInfo, Runnable r) { + public static ScanMeasurer wrap(ScanSession<?> scanInfo, Runnable r) { return new ScanMeasurer(scanInfo, r); } private OptionalLong lastRunTime = OptionalLong.empty(); - private Stat idleStats = new Stat(); - public Stat runStats = new Stat(); + private final Stat idleStats = new Stat(); + public final Stat runStats = new Stat(); public final ScanParameters scanParams; - private Map<String,String> executionHints; + private final Map<String,String> executionHints; private final TabletResolver tabletResolver; + private volatile ScanTask<T> scanTask; + ScanSession(TCredentials credentials, ScanParameters scanParams, Map<String,String> executionHints, TabletResolver tabletResolver) { super(credentials); @@ -129,7 +132,7 @@ public abstract class ScanSession extends Session implements ScanInfo { private class IterConfImpl implements IteratorConfiguration { - private IterInfo ii; + private final IterInfo ii; IterConfImpl(IterInfo ii) { this.ii = ii; @@ -180,13 +183,18 @@ public abstract class ScanSession extends Session implements ScanInfo { return tabletResolver; } + public ScanTask<T> getScanTask() { + return scanTask; + } + + public void setScanTask(ScanTask<T> scanTask) { + this.scanTask = scanTask; + } + @Override public boolean cleanup() { tabletResolver.close(); - if (!super.cleanup()) { - return false; - } - return true; + return super.cleanup(); } @Override diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 41d7b89738..7217ce9015 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -368,11 +368,11 @@ public class SessionManager { if (session instanceof SingleScanSession) { SingleScanSession ss = (SingleScanSession) session; - nbt = ss.nextBatchTask; + nbt = ss.getScanTask(); tableID = ss.extent.tableId(); } else if (session instanceof MultiScanSession) { MultiScanSession mss = (MultiScanSession) session; - nbt = mss.lookupTask; + nbt = mss.getScanTask(); tableID = mss.threadPoolExtent.tableId(); } @@ -407,7 +407,7 @@ public class SessionManager { ScanState state = ScanState.RUNNING; - ScanTask<ScanBatch> nbt = ss.nextBatchTask; + ScanTask<ScanBatch> nbt = ss.getScanTask(); if (nbt == null) { state = ScanState.IDLE; } else { @@ -443,7 +443,7 @@ public class SessionManager { ScanState state = ScanState.RUNNING; - ScanTask<MultiScanResult> nbt = mss.lookupTask; + ScanTask<MultiScanResult> nbt = mss.getScanTask(); if (nbt == null) { state = ScanState.IDLE; } else { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java index 8c48a01263..d99fa2cd93 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SingleScanSession.java @@ -25,16 +25,15 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.tserver.scan.ScanParameters; -import org.apache.accumulo.tserver.scan.ScanTask; import org.apache.accumulo.tserver.tablet.ScanBatch; import org.apache.accumulo.tserver.tablet.Scanner; -public class SingleScanSession extends ScanSession { +public class SingleScanSession extends ScanSession<ScanBatch> { public final KeyExtent extent; public final AtomicBoolean interruptFlag = new AtomicBoolean(); public long entriesReturned = 0; public long batchCount = 0; - public volatile ScanTask<ScanBatch> nextBatchTask; + public Scanner scanner; public final long readaheadThreshold; @@ -59,8 +58,10 @@ public class SingleScanSession extends ScanSession { public boolean cleanup() { final boolean ret; try { - if (nextBatchTask != null) { - nextBatchTask.cancel(true); + // read volatile once to avoid race conditions + var localScanTask = getScanTask(); + if (localScanTask != null) { + localScanTask.cancel(true); } } finally { if (scanner != null) {