This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new e8357c4d7f fixes CloseScannerIT to work with changes in #4840 (#4846)
e8357c4d7f is described below

commit e8357c4d7fc84f609f27ac530ea7b3554d4ea0ff
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Aug 29 16:54:28 2024 -0700

    fixes CloseScannerIT to work with changes in #4840 (#4846)
    
    After the changes in #4840 a scan session with an active thread would
    not be discarded.  The change made CloseScannerIT start failing in 3.1.
    Adjusted the test to account for this by giving time for deferred
    session cleanup that happens when there is an active thread associated
    with a scan session.
    
    The test was not failing in 2.1 because the test was less strict in this
    branch.  Applied this fix starting in 2.1 to make the test consistent
    across branches.
---
 .../org/apache/accumulo/test/CloseScannerIT.java   | 72 ++++++++++++++++++----
 .../apache/accumulo/test/functional/ScannerIT.java |  4 ++
 2 files changed, 64 insertions(+), 12 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java 
b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
index 9f0bd56bb1..f6c157fd2b 100644
--- a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java
@@ -18,25 +18,45 @@
  */
 package org.apache.accumulo.test;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
 
-import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Timer;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CloseScannerIT extends AccumuloClusterHarness {
 
   static final int ROWS = 1000;
   static final int COLS = 1000;
 
+  private static final Logger log = 
LoggerFactory.getLogger(CloseScannerIT.class);
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    Map<String,String> siteConfig = cfg.getSiteConfig();
+    siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), "20s");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  /**
+   * {@link 
org.apache.accumulo.test.functional.ScannerIT#testSessionCleanup()} is a 
similar test.
+   */
   @Test
   public void testManyScans() throws Exception {
 
@@ -49,25 +69,53 @@ public class CloseScannerIT extends AccumuloClusterHarness {
 
       client.tableOperations().flush(tableName, null, null, true);
 
-      for (int i = 0; i < 200; i++) {
-        try (Scanner scanner = createScanner(client, tableName, i)) {
+      Timer timer = Timer.startNew();
+
+      int count = 0;
+      while (count < 200 && timer.elapsed(TimeUnit.MILLISECONDS) < 3000) {
+        try (Scanner scanner = createScanner(client, tableName, count)) {
           scanner.setRange(new Range());
-          scanner.setReadaheadThreshold(i % 2 == 0 ? 0 : 3);
+          scanner.setReadaheadThreshold(count % 2 == 0 ? 0 : 3);
 
-          for (int j = 0; j < i % 7 + 1; j++) {
+          for (int j = 0; j < count % 7 + 1; j++) {
             // only read a little data and quit, this should leave a session 
open on the tserver
             scanner.stream().limit(10).forEach(e -> {});
           }
         } // when the scanner is closed, all open sessions should be closed
+        count++;
       }
 
-      List<String> tservers = client.instanceOperations().getTabletServers();
-      int activeScans = 0;
-      for (String tserver : tservers) {
-        activeScans += 
client.instanceOperations().getActiveScans(tserver).size();
-      }
+      log.debug("Ran {} scans in {} ms", count, 
timer.elapsed(TimeUnit.MILLISECONDS));
+
+      // The goal of this test it to ensure the scanner client object closes 
server side scan
+      // sessions and not idle session cleanup. To do this the test is making 
the following
+      // assumptions about how Accumulo works to set the timings in this test :
+      // 1. Sessions not closed by the scanner will be cleaned up in 20s based 
on config set before
+      // starting test
+      // 2. This test creates readahead threads for some scans. The presence 
of a thread will
+      // prevent immediate cleanup of the server side scan session. So when 
the scanner sends the
+      // RPC to close the session if a thread is present, then cleanup will be 
deferred. A scheduled
+      // task in the tserver runs deferred cleanup every 
TSERV_SESSION_MAXIDLE/2 which is 10s.
+      //
+      // Putting the assumptions above together we know that if sessions are 
closed in less than
+      // 20s, then they were closed as result of the scanner.close() method 
initiating an RPC to
+      // remove the scan session. The 13s below allows time for the 10s 
deferred cleanup to run in
+      // the case when a thread is present. The 3s cap the test puts on 
running scans sets the total
+      // time the test will allow to 3s+13s=16s which is less than the 20s 
when idle session clean
+      // starts.
 
-      assertTrue(activeScans < 3);
+      Wait.waitFor(() -> countActiveScans(client, tableName) < 1, 13000, 250,
+          "Found active scans after closing all scanners. Expected to find no 
scans");
+
+      var elasped = timer.elapsed(TimeUnit.MILLISECONDS);
+      if (elasped > 20000) {
+        log.warn(
+            "Total time since first scan was run {}ms.  Unable to verify that 
scanner RPC closed "
+                + "sessions, could have been closed by idle session cleanup.",
+            elasped);
+      } else {
+        log.debug("Total time since first scan was run {}ms.", elasped);
+      }
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 57c511f6ae..f767df4fdf 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.CloseScannerIT;
 import org.apache.accumulo.test.util.Wait;
 import org.junit.jupiter.api.Test;
 
@@ -118,6 +119,9 @@ public class ScannerIT extends AccumuloClusterHarness {
     }
   }
 
+  /**
+   * {@link CloseScannerIT#testManyScans()} is a similar test.
+   */
   @Test
   public void testSessionCleanup() throws Exception {
     final String tableName = getUniqueNames(1)[0];

Reply via email to