This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
     new 2bec74ad83 Bug fix no scan id for batch scans (#4871)
2bec74ad83 is described below

commit 2bec74ad83b0c4cbdf98ba67337f62aaf5806ce7
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Fri Sep 13 11:39:25 2024 -0400

    Bug fix no scan id for batch scans (#4871)

    * Bug fix no scan id for batch scans

    closes #4868

    - Batch scans scan ids are now set
    - At the same time, got rid of code duplication and improved readability of
      SessionManager.getActiveScans()
    - Added functionality to test this case in ScanIdIT
---
 .../accumulo/tserver/session/SessionManager.java  | 109 +++++++---------
 .../apache/accumulo/test/functional/ScanIdIT.java | 139 +++++++++++++++------
 2 files changed, 144 insertions(+), 104 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index d32dbcd14c..497287cda6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -47,7 +47,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 import org.apache.accumulo.core.tabletserver.thrift.ScanType;
@@ -55,10 +55,10 @@ import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.tserver.scan.ScanParameters;
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.session.Session.State;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -453,7 +453,7 @@ public class SessionManager {
     final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
 
     /**
-     * Add sessions so that get the list returned in the active scans call
+     * Add sessions that get the list returned in the active scans call
      */
     for (Session session : deferredCleanupQueue) {
       copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
@@ -461,75 +461,56 @@
     List.of(sessions.entrySet(), copiedIdleSessions).forEach(s -> s.forEach(entry -> {
       Session session = entry.getValue();
 
-      if (session instanceof SingleScanSession) {
-        SingleScanSession ss = (SingleScanSession) session;
-
-        ScanState state = ScanState.RUNNING;
-
-        ScanTask<ScanBatch> nbt = ss.getScanTask();
-        if (nbt == null) {
-          state = ScanState.IDLE;
-        } else {
-          switch (nbt.getScanRunState()) {
-            case QUEUED:
-              state = ScanState.QUEUED;
-              break;
-            case FINISHED:
-              state = ScanState.IDLE;
-              break;
-            case RUNNING:
-            default:
-              /* do nothing */
-              break;
-          }
-        }
-        var params = ss.scanParams;
-        ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(),
-            ss.extent.tableId().canonical(), ct - ss.startTime, ct - ss.lastAccessTime,
-            ScanType.SINGLE, state, ss.extent.toThrift(),
-            params.getColumnSet().stream().map(Column::toThrift).collect(Collectors.toList()),
-            params.getSsiList(), params.getSsio(), params.getAuthorizations().getAuthorizationsBB(),
-            params.getClassLoaderContext());
+      if (session instanceof ScanSession) {
+        ScanSession<?> scanSession = (ScanSession<?>) session;
+        boolean isSingle = session instanceof SingleScanSession;
 
-        // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in
-        // ActiveScan constructor
-        activeScan.setScanId(entry.getKey());
-        activeScans.add(activeScan);
+        addActiveScan(activeScans, scanSession,
+            isSingle ? ((SingleScanSession) scanSession).extent
+                : ((MultiScanSession) scanSession).threadPoolExtent,
+            ct, isSingle ? ScanType.SINGLE : ScanType.BATCH,
+            computeScanState(scanSession.getScanTask()), scanSession.scanParams, entry.getKey());
+      }
+    }));
 
-      } else if (session instanceof MultiScanSession) {
-        MultiScanSession mss = (MultiScanSession) session;
+    return activeScans;
+  }
 
-        ScanState state = ScanState.RUNNING;
+  private ScanState computeScanState(ScanTask<?> scanTask) {
+    ScanState state = ScanState.RUNNING;
 
-        ScanTask<MultiScanResult> nbt = mss.getScanTask();
-        if (nbt == null) {
+    if (scanTask == null) {
+      state = ScanState.IDLE;
+    } else {
+      switch (scanTask.getScanRunState()) {
+        case QUEUED:
+          state = ScanState.QUEUED;
+          break;
+        case FINISHED:
           state = ScanState.IDLE;
-        } else {
-          switch (nbt.getScanRunState()) {
-            case QUEUED:
-              state = ScanState.QUEUED;
-              break;
-            case FINISHED:
-              state = ScanState.IDLE;
-              break;
-            case RUNNING:
-            default:
-              /* do nothing */
-              break;
-          }
-        }
+          break;
+        case RUNNING:
+        default:
+          /* do nothing */
+          break;
+      }
+    }
 
-        var params = mss.scanParams;
-        activeScans.add(new ActiveScan(mss.client, mss.getUser(),
-            mss.threadPoolExtent.tableId().canonical(), ct - mss.startTime, ct - mss.lastAccessTime,
-            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(),
+    return state;
+  }
+
+  private void addActiveScan(List<ActiveScan> activeScans, Session session, KeyExtent extent,
+      long ct, ScanType scanType, ScanState state, ScanParameters params, long scanId) {
+    ActiveScan activeScan =
+        new ActiveScan(session.client, session.getUser(), extent.tableId().canonical(),
            ct - session.startTime, ct - session.lastAccessTime, scanType, state, extent.toThrift(),
             params.getColumnSet().stream().map(Column::toThrift).collect(Collectors.toList()),
             params.getSsiList(), params.getSsio(), params.getAuthorizations().getAuthorizationsBB(),
-            params.getClassLoaderContext()));
-      }
-    }));
-
-    return activeScans;
+            params.getClassLoaderContext());
+    // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in
+    // ActiveScan constructor
+    activeScan.setScanId(scanId);
+    activeScans.add(activeScan);
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index c880052a1d..4f7139c6a8 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -19,13 +19,14 @@ package org.apache.accumulo.test.functional;
 
 import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -86,20 +88,17 @@ import org.slf4j.LoggerFactory;
 public class ScanIdIT extends AccumuloClusterHarness {
 
   private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
-
-  private static final int NUM_SCANNERS = 8;
-
+  private static final int NUM_SINGLE_SCANNERS = 8;
+  private static final int NUM_BATCH_SCANNERS = 1;
+  private static final int NUM_TOTAL_SCANNERS = NUM_SINGLE_SCANNERS + NUM_BATCH_SCANNERS;
   private static final int NUM_DATA_ROWS = 100;
-
-  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
-
+  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_TOTAL_SCANNERS);
   private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
-
   private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<>();
 
   @Override
   protected Duration defaultTimeout() {
-    return Duration.ofMinutes(1);
+    return Duration.ofMinutes(2);
   }
 
   /**
@@ -122,19 +121,28 @@ public class ScanIdIT extends AccumuloClusterHarness {
 
       attachSlowIterator(client, tableName);
 
-      CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
+      CountDownLatch latch = new CountDownLatch(NUM_TOTAL_SCANNERS);
 
-      List<ScannerThread> scanThreadsToClose = new ArrayList<>(NUM_SCANNERS);
-      for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+      // Ensure that scan ids are working as expected for both Scanner and BatchScanner
+      List<ScannerThread> scanThreadsToClose = new ArrayList<>(NUM_SINGLE_SCANNERS);
+      List<BatchScannerThread> batchScanThreadsToClose = new ArrayList<>(NUM_BATCH_SCANNERS);
+      // workers 0 through NUM_SINGLE_SCANNERS - 1 use Scanner
+      for (int scannerIndex = 0; scannerIndex < NUM_SINGLE_SCANNERS; scannerIndex++) {
         ScannerThread st = new ScannerThread(client, scannerIndex, tableName, latch);
         scanThreadsToClose.add(st);
         pool.execute(st);
       }
+      // workers NUM_SINGLE_SCANNER through NUM_TOTAL_SCANNERS - 1 use BatchScanner
+      for (int bsIndex = NUM_SINGLE_SCANNERS; bsIndex < NUM_TOTAL_SCANNERS; bsIndex++) {
+        BatchScannerThread bst = new BatchScannerThread(client, bsIndex, tableName, latch);
+        batchScanThreadsToClose.add(bst);
+        pool.execute(bst);
+      }
 
       // wait for scanners to report a result.
       while (testInProgress.get()) {
-        if (resultsByWorker.size() < NUM_SCANNERS) {
+        if (resultsByWorker.size() < NUM_TOTAL_SCANNERS) {
           log.trace("Results reported {}", resultsByWorker.size());
           sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
         } else {
@@ -150,21 +158,19 @@ public class ScanIdIT extends AccumuloClusterHarness {
       }
 
       Set<Long> scanIds = getScanIds(client);
-      assertTrue(scanIds.size() >= NUM_SCANNERS,
-          "Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size());
+      assertTrue(scanIds.size() >= NUM_TOTAL_SCANNERS,
+          "Expected at least " + NUM_TOTAL_SCANNERS + " scanIds, but saw " + scanIds.size());
+      // A scan id should have been set regardless of whether a Scanner or BatchScanner was used
+      scanIds.forEach(scanId -> assertNotEquals(0L, scanId, "saw a scanId that was never set"));
 
-      scanThreadsToClose.forEach(st -> {
-        if (st.scanner != null) {
-          st.scanner.close();
-        }
-      });
+      // Close all scanners. All should be non-null, test should fail (NPE) otherwise
+      scanThreadsToClose.forEach(st -> st.scanner.close());
+      batchScanThreadsToClose.forEach(bst -> bst.bs.close());
 
-      while (!(scanIds = getScanIds(client)).isEmpty()) {
+      while (!getScanIds(client).isEmpty()) {
        log.debug("Waiting for active scans to stop...");
        Thread.sleep(200);
       }
-      assertEquals(0, scanIds.size(), "Expected no scanIds after closing scanners");
-
     }
   }
 
@@ -236,33 +242,26 @@ public class ScanIdIT extends AccumuloClusterHarness {
      */
     @Override
    public void run() {
-
       latch.countDown();
       try {
         latch.await();
       } catch (InterruptedException e) {
-        log.error("Thread interrupted with id {}", workerIndex);
+        log.error("ScannerThread interrupted with id {}", workerIndex);
         Thread.currentThread().interrupt();
         return;
       }
 
-      log.debug("Creating scanner in worker thread {}", workerIndex);
-
+      log.debug("Creating Scanner in ScannerThread worker {}", workerIndex);
       try {
-
         scanner = accumuloClient.createScanner(tablename, new Authorizations());
-
         // Never start readahead
         scanner.setReadaheadThreshold(Long.MAX_VALUE);
         scanner.setBatchSize(1);
-
         // create different ranges to try to hit more than one tablet.
         scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
-
         scanner.fetchColumnFamily(new Text("fam1"));
 
         for (Map.Entry<Key,Value> entry : scanner) {
-
           // exit when success condition is met.
           if (!testInProgress.get()) {
             scanner.clearScanIterators();
@@ -271,29 +270,89 @@ public class ScanIdIT extends AccumuloClusterHarness {
 
           Text row = entry.getKey().getRow();
 
-          log.debug("worker {}, row {}", workerIndex, row);
+          log.debug("ScannerThread worker {}, row {}", workerIndex, row);
 
           if (entry.getValue() != null) {
-
             Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
-
             // value should always being increasing
             if (prevValue != null) {
-
-              log.trace("worker {} values {}", workerIndex,
+              log.trace("ScannerThread worker {} values {}", workerIndex,
                   String.format("%1$s < %2$s", prevValue, entry.getValue()));
-
               assertTrue(prevValue.compareTo(entry.getValue()) > 0);
             }
           } else {
             log.info("Scanner returned null");
             fail("Scanner returned unexpected null value");
           }
-
         }
         log.debug("Scanner ran out of data. (info only, not an error) ");
       } catch (TableNotFoundException e) {
-        throw new IllegalStateException("Initialization failure. Could not create scanner", e);
+        throw new IllegalStateException("Initialization failure. Could not create Scanner", e);
+      } finally {
+        // don't close scanner here, because it will clean up the scan ids we're checking for
+      }
+    }
+  }
+
+  /**
+   * Runs BatchScanner in separate thread to allow multiple scanners to execute in parallel.
+   * <p>
+   * The thread run method is terminated when the testInProgress flag is set to false.
+   */
+  private static class BatchScannerThread implements Runnable {
+    private final AccumuloClient accumuloClient;
+    private BatchScanner bs;
+    private final int workerIndex;
+    private final String tableName;
+    private final CountDownLatch latch;
+
+    public BatchScannerThread(AccumuloClient accumuloClient, int workerIndex, String tableName,
+        CountDownLatch latch) {
+      this.accumuloClient = accumuloClient;
+      this.workerIndex = workerIndex;
+      this.tableName = tableName;
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.countDown();
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        log.error("BatchScannerThread interrupted with id {}", workerIndex);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      log.debug("Creating BatchScanner in BatchScannerThread worker {}", workerIndex);
+      try {
+        bs = accumuloClient.createBatchScanner(tableName);
+        bs.setRanges(Collections.singletonList(new Range()));
+        bs.fetchColumnFamily(new Text("fam1"));
+
+        for (Map.Entry<Key,Value> entry : bs) {
+          // exit when success condition is met.
+          if (!testInProgress.get()) {
+            bs.clearScanIterators();
+            return;
+          }
+
+          Text row = entry.getKey().getRow();
+
+          log.debug("BatchScannerThread worker {}, row {}", workerIndex, row);
+
+          if (entry.getValue() != null) {
+            resultsByWorker.put(workerIndex, entry.getValue());
+            // should not check that the values are increasing since this is a BatchScanner
+          } else {
+            log.info("BatchScanner returned null");
+            fail("BatchScanner returned unexpected null value");
+          }
+        }
+        log.debug("BatchScanner ran out of data. (info only, not an error) ");
+      } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
+        throw new IllegalStateException("Initialization failure. Could not create BatchScanner", e);
       } finally {
         // don't close scanner here, because it will clean up the scan ids we're checking for
       }
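
[Note, not part of the commit above: the behavior this change fixes can also be checked from the client side through the public admin API, similar in spirit to the scan id check the updated ScanIdIT performs. The following is only a minimal sketch, assuming a "client.properties" file at the given path; it is not code from this change.]

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.admin.ActiveScan;

    // Minimal sketch: print the id and type of every active scan on each tablet server.
    // With this fix in place, BATCH scans should report a scan id that was actually set
    // (the updated test treats an id of 0 as "never set").
    public class PrintActiveScanIds {
      public static void main(String[] args) throws Exception {
        // "client.properties" is an assumed path, adjust for the local setup
        try (AccumuloClient client = Accumulo.newClient().from("client.properties").build()) {
          for (String tserver : client.instanceOperations().getTabletServers()) {
            for (ActiveScan scan : client.instanceOperations().getActiveScans(tserver)) {
              System.out.println(tserver + " " + scan.getType() + " scan id=" + scan.getScanid());
            }
          }
        }
      }
    }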