This is an automated email from the ASF dual-hosted git repository.

krathbun pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 2bec74ad83 Bug fix no scan id for batch scans (#4871)
2bec74ad83 is described below

commit 2bec74ad83b0c4cbdf98ba67337f62aaf5806ce7
Author: Kevin Rathbun <krath...@apache.org>
AuthorDate: Fri Sep 13 11:39:25 2024 -0400

    Bug fix no scan id for batch scans (#4871)
    
    * Bug fix no scan id for batch scans
    
    closes #4868
    - Scan ids are now set for batch scans
    - At the same time, removed code duplication and improved the readability
      of SessionManager.getActiveScans()
    - Added functionality to test this case in ScanIdIT (an illustrative
      client-side sketch follows below)
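
For context, here is a minimal client-side sketch of the behavior the new ScanIdIT
assertion exercises: after this fix, batch scans should report a real (non-zero)
scan id through the admin API, just like single scans. This sketch is illustrative
only and not part of the commit; the instance name, zookeepers, credentials, and
table name are placeholders, and it assumes a running 2.1 cluster where the table
holds some data (ScanIdIT additionally attaches a slow iterator so the scans stay
active long enough to be observed).

    import java.util.Collections;
    import java.util.Map;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.BatchScanner;
    import org.apache.accumulo.core.client.admin.ActiveScan;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;

    public class BatchScanIdCheck {
      public static void main(String[] args) throws Exception {
        // Placeholder connection details -- adjust for your cluster.
        try (AccumuloClient client = Accumulo.newClient()
            .to("myInstance", "zkhost:2181").as("user", "secret").build();
            BatchScanner bs = client.createBatchScanner("myTable")) {
          bs.setRanges(Collections.singletonList(new Range()));
          // Pull one entry so the batch scan session is registered on a tablet server.
          for (Map.Entry<Key,Value> entry : bs) {
            System.out.println("first entry: " + entry.getKey());
            break;
          }
          // With this fix, BATCH entries returned by listActiveScans() carry the
          // session's scan id instead of the unset default of 0.
          for (String tserver : client.instanceOperations().getTabletServers()) {
            for (ActiveScan scan : client.instanceOperations().listActiveScans(tserver)) {
              System.out.println(scan.getType() + " scan id: " + scan.getScanid());
            }
          }
        }
      }
    }

Before the change, only the single-scan branch of SessionManager.getActiveScans()
called setScanId(), so BATCH scans came back with the unset (0) id that the new
assertNotEquals check in ScanIdIT guards against.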
---
 .../accumulo/tserver/session/SessionManager.java   | 109 +++++++---------
 .../apache/accumulo/test/functional/ScanIdIT.java  | 139 +++++++++++++++------
 2 files changed, 144 insertions(+), 104 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index d32dbcd14c..497287cda6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -47,7 +47,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Column;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 import org.apache.accumulo.core.tabletserver.thrift.ScanType;
@@ -55,10 +55,10 @@ import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.tserver.scan.ScanParameters;
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.session.Session.State;
-import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -453,7 +453,7 @@ public class SessionManager {
     final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
 
     /**
-     * Add sessions so that get the list returned in the active scans call
+     * Add sessions that get the list returned in the active scans call
      */
     for (Session session : deferredCleanupQueue) {
       copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session));
@@ -461,75 +461,56 @@ public class SessionManager {
 
     List.of(sessions.entrySet(), copiedIdleSessions).forEach(s -> s.forEach(entry -> {
       Session session = entry.getValue();
-      if (session instanceof SingleScanSession) {
-        SingleScanSession ss = (SingleScanSession) session;
-
-        ScanState state = ScanState.RUNNING;
-
-        ScanTask<ScanBatch> nbt = ss.getScanTask();
-        if (nbt == null) {
-          state = ScanState.IDLE;
-        } else {
-          switch (nbt.getScanRunState()) {
-            case QUEUED:
-              state = ScanState.QUEUED;
-              break;
-            case FINISHED:
-              state = ScanState.IDLE;
-              break;
-            case RUNNING:
-            default:
-              /* do nothing */
-              break;
-          }
-        }
 
-        var params = ss.scanParams;
-        ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(),
-            ss.extent.tableId().canonical(), ct - ss.startTime, ct - ss.lastAccessTime,
-            ScanType.SINGLE, state, ss.extent.toThrift(),
-            params.getColumnSet().stream().map(Column::toThrift).collect(Collectors.toList()),
-            params.getSsiList(), params.getSsio(), params.getAuthorizations().getAuthorizationsBB(),
-            params.getClassLoaderContext());
+      if (session instanceof ScanSession) {
+        ScanSession<?> scanSession = (ScanSession<?>) session;
+        boolean isSingle = session instanceof SingleScanSession;
 
-        // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in
-        // ActiveScan constructor
-        activeScan.setScanId(entry.getKey());
-        activeScans.add(activeScan);
+        addActiveScan(activeScans, scanSession,
+            isSingle ? ((SingleScanSession) scanSession).extent
+                : ((MultiScanSession) scanSession).threadPoolExtent,
+            ct, isSingle ? ScanType.SINGLE : ScanType.BATCH,
+            computeScanState(scanSession.getScanTask()), scanSession.scanParams, entry.getKey());
+      }
+    }));
 
-      } else if (session instanceof MultiScanSession) {
-        MultiScanSession mss = (MultiScanSession) session;
+    return activeScans;
+  }
 
-        ScanState state = ScanState.RUNNING;
+  private ScanState computeScanState(ScanTask<?> scanTask) {
+    ScanState state = ScanState.RUNNING;
 
-        ScanTask<MultiScanResult> nbt = mss.getScanTask();
-        if (nbt == null) {
+    if (scanTask == null) {
+      state = ScanState.IDLE;
+    } else {
+      switch (scanTask.getScanRunState()) {
+        case QUEUED:
+          state = ScanState.QUEUED;
+          break;
+        case FINISHED:
           state = ScanState.IDLE;
-        } else {
-          switch (nbt.getScanRunState()) {
-            case QUEUED:
-              state = ScanState.QUEUED;
-              break;
-            case FINISHED:
-              state = ScanState.IDLE;
-              break;
-            case RUNNING:
-            default:
-              /* do nothing */
-              break;
-          }
-        }
+          break;
+        case RUNNING:
+        default:
+          /* do nothing */
+          break;
+      }
+    }
 
-        var params = mss.scanParams;
-        activeScans.add(new ActiveScan(mss.client, mss.getUser(),
-            mss.threadPoolExtent.tableId().canonical(), ct - mss.startTime, ct - mss.lastAccessTime,
-            ScanType.BATCH, state, mss.threadPoolExtent.toThrift(),
+    return state;
+  }
+
+  private void addActiveScan(List<ActiveScan> activeScans, Session session, KeyExtent extent,
+      long ct, ScanType scanType, ScanState state, ScanParameters params, long scanId) {
+    ActiveScan activeScan =
+        new ActiveScan(session.client, session.getUser(), extent.tableId().canonical(),
+            ct - session.startTime, ct - session.lastAccessTime, scanType, state, extent.toThrift(),
             params.getColumnSet().stream().map(Column::toThrift).collect(Collectors.toList()),
             params.getSsiList(), params.getSsio(), params.getAuthorizations().getAuthorizationsBB(),
-            params.getClassLoaderContext()));
-      }
-    }));
-
-    return activeScans;
+            params.getClassLoaderContext());
+    // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in
+    // ActiveScan constructor
+    activeScan.setScanId(scanId);
+    activeScans.add(activeScan);
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
index c880052a1d..4f7139c6a8 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java
@@ -19,13 +19,14 @@
 package org.apache.accumulo.test.functional;
 
 import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -44,6 +45,7 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -86,20 +88,17 @@ import org.slf4j.LoggerFactory;
 public class ScanIdIT extends AccumuloClusterHarness {
 
   private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class);
-
-  private static final int NUM_SCANNERS = 8;
-
+  private static final int NUM_SINGLE_SCANNERS = 8;
+  private static final int NUM_BATCH_SCANNERS = 1;
+  private static final int NUM_TOTAL_SCANNERS = NUM_SINGLE_SCANNERS + NUM_BATCH_SCANNERS;
   private static final int NUM_DATA_ROWS = 100;
-
-  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS);
-
+  private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_TOTAL_SCANNERS);
   private static final AtomicBoolean testInProgress = new AtomicBoolean(true);
-
   private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<>();
 
   @Override
   protected Duration defaultTimeout() {
-    return Duration.ofMinutes(1);
+    return Duration.ofMinutes(2);
   }
 
   /**
@@ -122,19 +121,28 @@ public class ScanIdIT extends AccumuloClusterHarness {
 
       attachSlowIterator(client, tableName);
 
-      CountDownLatch latch = new CountDownLatch(NUM_SCANNERS);
+      CountDownLatch latch = new CountDownLatch(NUM_TOTAL_SCANNERS);
 
-      List<ScannerThread> scanThreadsToClose = new ArrayList<>(NUM_SCANNERS);
-      for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) {
+      // Ensure that scan ids are working as expected for both Scanner and BatchScanner
+      List<ScannerThread> scanThreadsToClose = new ArrayList<>(NUM_SINGLE_SCANNERS);
+      List<BatchScannerThread> batchScanThreadsToClose = new ArrayList<>(NUM_BATCH_SCANNERS);
+      // workers 0 through NUM_SINGLE_SCANNERS - 1 use Scanner
+      for (int scannerIndex = 0; scannerIndex < NUM_SINGLE_SCANNERS; scannerIndex++) {
         ScannerThread st = new ScannerThread(client, scannerIndex, tableName, latch);
         scanThreadsToClose.add(st);
         pool.execute(st);
       }
+      // workers NUM_SINGLE_SCANNERS through NUM_TOTAL_SCANNERS - 1 use BatchScanner
+      for (int bsIndex = NUM_SINGLE_SCANNERS; bsIndex < NUM_TOTAL_SCANNERS; bsIndex++) {
+        BatchScannerThread bst = new BatchScannerThread(client, bsIndex, tableName, latch);
+        batchScanThreadsToClose.add(bst);
+        pool.execute(bst);
+      }
 
       // wait for scanners to report a result.
       while (testInProgress.get()) {
 
-        if (resultsByWorker.size() < NUM_SCANNERS) {
+        if (resultsByWorker.size() < NUM_TOTAL_SCANNERS) {
           log.trace("Results reported {}", resultsByWorker.size());
           sleepUninterruptibly(750, TimeUnit.MILLISECONDS);
         } else {
@@ -150,21 +158,19 @@ public class ScanIdIT extends AccumuloClusterHarness {
       }
 
       Set<Long> scanIds = getScanIds(client);
-      assertTrue(scanIds.size() >= NUM_SCANNERS,
-          "Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size());
+      assertTrue(scanIds.size() >= NUM_TOTAL_SCANNERS,
+          "Expected at least " + NUM_TOTAL_SCANNERS + " scanIds, but saw " + scanIds.size());
+      // A scan id should have been set regardless of whether a Scanner or BatchScanner was used
+      scanIds.forEach(scanId -> assertNotEquals(0L, scanId, "saw a scanId that was never set"));
 
-      scanThreadsToClose.forEach(st -> {
-        if (st.scanner != null) {
-          st.scanner.close();
-        }
-      });
+      // Close all scanners. All should be non-null, test should fail (NPE) otherwise
+      scanThreadsToClose.forEach(st -> st.scanner.close());
+      batchScanThreadsToClose.forEach(bst -> bst.bs.close());
 
-      while (!(scanIds = getScanIds(client)).isEmpty()) {
+      while (!getScanIds(client).isEmpty()) {
         log.debug("Waiting for active scans to stop...");
         Thread.sleep(200);
       }
-      assertEquals(0, scanIds.size(), "Expected no scanIds after closing scanners");
-
     }
   }
 
@@ -236,33 +242,26 @@ public class ScanIdIT extends AccumuloClusterHarness {
      */
     @Override
     public void run() {
-
       latch.countDown();
       try {
         latch.await();
       } catch (InterruptedException e) {
-        log.error("Thread interrupted with id {}", workerIndex);
+        log.error("ScannerThread interrupted with id {}", workerIndex);
         Thread.currentThread().interrupt();
         return;
       }
 
-      log.debug("Creating scanner in worker thread {}", workerIndex);
-
+      log.debug("Creating Scanner in ScannerThread worker {}", workerIndex);
       try {
-
         scanner = accumuloClient.createScanner(tablename, new Authorizations());
-
         // Never start readahead
         scanner.setReadaheadThreshold(Long.MAX_VALUE);
         scanner.setBatchSize(1);
-
         // create different ranges to try to hit more than one tablet.
         scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9")));
-
         scanner.fetchColumnFamily(new Text("fam1"));
 
         for (Map.Entry<Key,Value> entry : scanner) {
-
           // exit when success condition is met.
           if (!testInProgress.get()) {
             scanner.clearScanIterators();
@@ -271,29 +270,89 @@ public class ScanIdIT extends AccumuloClusterHarness {
 
           Text row = entry.getKey().getRow();
 
-          log.debug("worker {}, row {}", workerIndex, row);
+          log.debug("ScannerThread worker {}, row {}", workerIndex, row);
 
           if (entry.getValue() != null) {
-
             Value prevValue = resultsByWorker.put(workerIndex, entry.getValue());
-
             // value should always being increasing
             if (prevValue != null) {
-
-              log.trace("worker {} values {}", workerIndex,
+              log.trace("ScannerThread worker {} values {}", workerIndex,
                   String.format("%1$s < %2$s", prevValue, entry.getValue()));
-
               assertTrue(prevValue.compareTo(entry.getValue()) > 0);
             }
           } else {
             log.info("Scanner returned null");
             fail("Scanner returned unexpected null value");
           }
-
         }
         log.debug("Scanner ran out of data. (info only, not an error) ");
       } catch (TableNotFoundException e) {
-        throw new IllegalStateException("Initialization failure. Could not create scanner", e);
+        throw new IllegalStateException("Initialization failure. Could not create Scanner", e);
+      } finally {
+        // don't close scanner here, because it will clean up the scan ids we're checking for
+      }
+    }
+  }
+
+  /**
+   * Runs BatchScanner in separate thread to allow multiple scanners to execute in parallel.
+   * <p>
+   * The thread run method is terminated when the testInProgress flag is set to false.
+   */
+  private static class BatchScannerThread implements Runnable {
+    private final AccumuloClient accumuloClient;
+    private BatchScanner bs;
+    private final int workerIndex;
+    private final String tableName;
+    private final CountDownLatch latch;
+
+    public BatchScannerThread(AccumuloClient accumuloClient, int workerIndex, String tableName,
+        CountDownLatch latch) {
+      this.accumuloClient = accumuloClient;
+      this.workerIndex = workerIndex;
+      this.tableName = tableName;
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.countDown();
+      try {
+        latch.await();
+      } catch (InterruptedException e) {
+        log.error("BatchScannerThread interrupted with id {}", workerIndex);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      log.debug("Creating BatchScanner in BatchScannerThread worker {}", workerIndex);
+      try {
+        bs = accumuloClient.createBatchScanner(tableName);
+        bs.setRanges(Collections.singletonList(new Range()));
+        bs.fetchColumnFamily(new Text("fam1"));
+
+        for (Map.Entry<Key,Value> entry : bs) {
+          // exit when success condition is met.
+          if (!testInProgress.get()) {
+            bs.clearScanIterators();
+            return;
+          }
+
+          Text row = entry.getKey().getRow();
+
+          log.debug("BatchScannerThread worker {}, row {}", workerIndex, row);
+
+          if (entry.getValue() != null) {
+            resultsByWorker.put(workerIndex, entry.getValue());
+            // should not check that the values are increasing since this is a BatchScanner
+          } else {
+            log.info("BatchScanner returned null");
+            fail("BatchScanner returned unexpected null value");
+          }
+        }
+        log.debug("BatchScanner ran out of data. (info only, not an error) ");
+      } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
+        throw new IllegalStateException("Initialization failure. Could not create BatchScanner", e);
       } finally {
         // don't close scanner here, because it will clean up the scan ids we're checking for
       }
