This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new f97055f6c9 cleanup scan session when batch scanner closed (#4841)
f97055f6c9 is described below

commit f97055f6c93b858bdc9b5d7f0a15353f62855891
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue Aug 27 14:40:55 2024 -0700

    cleanup scan session when batch scanner closed (#4841)
    
    When not all data is read from a scanner, it can leave server side scan
    sessions open.  When the scanner object is closed on the client side,
    it should close these server side scan sessions.  The batch scanner was
    not doing this.  Updated the batch scanner to close server side sessions.
    Added a test for the scanner and batch scanner to ensure sessions are
    cleaned up.  Improved an existing test related to session timeout.
---
 .../TabletServerBatchReaderIterator.java           | 20 +++++-
 .../test/functional/ScanSessionTimeOutIT.java      | 19 ++++-
 .../apache/accumulo/test/functional/ScannerIT.java | 81 ++++++++++++++++++++++
 3 files changed, 117 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index ad3fedc2ff..a852531bc6 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@ -817,6 +817,9 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
         client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, 
parsedServer, context);
       }
 
+      // Tracks unclosed scan session id for the case when the following try 
block exits with an
+      // exception.
+      Long scanIdToClose = null;
       try {
 
         Timer timer = null;
@@ -850,6 +853,7 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
             ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), 
waitForWrites,
             
SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()),
             options.batchTimeout, options.classLoaderContext, execHints, 
busyTimeout);
+        scanIdToClose = imsr.scanID;
         if (waitForWrites) {
           
ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString());
         }
@@ -916,9 +920,23 @@ public class TabletServerBatchReaderIterator implements 
Iterator<Entry<Key,Value
         }
 
         client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID);
+        scanIdToClose = null;
 
       } finally {
-        ThriftUtil.returnClient(client, context);
+        try {
+          if (scanIdToClose != null) {
+            // If this code is running it is likely that an exception happened 
and the scan session
+            // was never closed. Make a best effort attempt to close the scan 
session which will
+            // clean up server side resources. When the batch scanner is 
closed it will interrupt
+            // the threads in its thread pool which could cause an interrupted 
exception in this
+            // code.
+            client.closeMultiScan(TraceUtil.traceInfo(), scanIdToClose);
+          }
+        } catch (Exception e) {
+          log.trace("Failed to close scan session in finally {} {}", server, 
scanIdToClose, e);
+        } finally {
+          ThriftUtil.returnClient(client, context);
+        }
       }
     } catch (TTransportException e) {
       log.debug("Server : {} msg : {}", server, e.getMessage());
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
index 60d36035f8..741a02a18a 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java
@@ -19,6 +19,8 @@
 package org.apache.accumulo.test.functional;
 
 import static 
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
+import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.time.Duration;
 import java.util.Iterator;
@@ -118,12 +120,25 @@ public class ScanSessionTimeOutIT extends 
AccumuloClusterHarness {
         Iterator<Entry<Key,Value>> iter = scanner.iterator();
 
         verify(iter, 0, 200);
+        // There should be a scan session open since not all data was read 
from the iterator
+        assertEquals(1L, countActiveScans(c, tableName));
 
         // sleep three times the session timeout
         sleepUninterruptibly(9, TimeUnit.SECONDS);
-
-        verify(iter, 200, 100000);
+        // The scan session should have timed out and the next read should 
create a new one
+        assertEquals(0L, countActiveScans(c, tableName));
+
+        verify(iter, 200, 50000);
+        // Reading part of the data in the range should cause a new scan 
session to be created
+        assertEquals(1L, countActiveScans(c, tableName));
+        verify(iter, 50000, 100000);
+        // Once all of the data in the range was read the scanner should 
automatically close the
+        // scan session
+        assertEquals(0L, countActiveScans(c, tableName));
       }
+
+      // Nothing should have created any new scan sessions for the table
+      assertEquals(0L, countActiveScans(c, tableName));
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
index 805b791916..57c511f6ae 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java
@@ -18,10 +18,12 @@
  */
 package org.apache.accumulo.test.functional;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.time.Duration;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.accumulo.core.client.Accumulo;
@@ -29,6 +31,7 @@ import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -36,6 +39,7 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.util.Wait;
 import org.junit.jupiter.api.Test;
 
 public class ScannerIT extends AccumuloClusterHarness {
@@ -113,4 +117,81 @@ public class ScannerIT extends AccumuloClusterHarness {
       }
     }
   }
+
+  @Test
+  public void testSessionCleanup() throws Exception { // scan session cleanup for Scanner and BatchScanner
+    final String tableName = getUniqueNames(1)[0];
+    try (AccumuloClient accumuloClient = 
Accumulo.newClient().from(getClientProps()).build()) {
+
+      accumuloClient.tableOperations().create(tableName);
+
+      try (var writer = accumuloClient.createBatchWriter(tableName)) {
+        for (int i = 0; i < 100000; i++) {
+          var m = new Mutation(String.format("%09d", i));
+          m.put("1", "1", "" + i);
+          writer.addMutation(m);
+        }
+      }
+
+      // The test assumes the session timeout is configured to 1 minute; 
validate this. Later in the
+      // test 10s is given for sessions to disappear and we want this 10s to be 
much smaller than the
+      // configured session timeout.
+      assertEquals("1m", 
accumuloClient.instanceOperations().getSystemConfiguration()
+          .get(Property.TSERV_SESSION_MAXIDLE.getKey()));
+
+      // The following verifies, for the case where not all data is read from a scanner, 
that when the scanner is
+      // closed any open sessions will be closed.
+      for (int i = 0; i < 3; i++) {
+        try (var scanner = accumuloClient.createScanner(tableName)) {
+          assertEquals(10, scanner.stream().limit(10).count());
+          assertEquals(10000, scanner.stream().limit(10000).count());
+          // Since not all data in the range was read from the scanner, it 
should leave an active
+          // scan session per scanner iterator created (two streams above, so two sessions).
+          assertEquals(2, countActiveScans(accumuloClient, tableName));
+        }
+        // When close is called on the scanner it should close the scan 
session. The session
+        // cleanup is async on the server because a task may still be running 
server side, but it
+        // should happen in less than the session timeout. Also the server 
should start working on
+        // it immediately.
+        Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0, 
10000);
+
+        try (var scanner = accumuloClient.createBatchScanner(tableName)) {
+          scanner.setRanges(List.of(new Range()));
+          assertEquals(10, scanner.stream().limit(10).count());
+          assertEquals(10000, scanner.stream().limit(10000).count());
+          assertEquals(2, countActiveScans(accumuloClient, tableName));
+        }
+        Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0, 
10000);
+      }
+
+      // Test the case where all data is read from a scanner. In this case the 
scanner should close
+      // the scan session at the end of the range even before the scanner 
itself is closed.
+      for (int i = 0; i < 3; i++) {
+        try (var scanner = accumuloClient.createScanner(tableName)) {
+          assertEquals(100000, scanner.stream().count());
+          assertEquals(100000, scanner.stream().count());
+          // The server side cleanup of the session should be able to happen 
immediately in this
+          // case because nothing should be running on the server side to 
fetch data because all
+          // data in the range was fetched.
+          assertEquals(0, countActiveScans(accumuloClient, tableName));
+        }
+
+        try (var scanner = accumuloClient.createBatchScanner(tableName)) {
+          scanner.setRanges(List.of(new Range()));
+          assertEquals(100000, scanner.stream().count());
+          assertEquals(100000, scanner.stream().count());
+          assertEquals(0, countActiveScans(accumuloClient, tableName));
+        }
+      }
+    }
+  }
+
+  public static long countActiveScans(AccumuloClient c, String tableName) 
throws Exception {
+    long count = 0; // active scans whose table equals tableName, summed over every tablet server
+    for (String tserver : c.instanceOperations().getTabletServers()) {
+      count += c.instanceOperations().getActiveScans(tserver).stream()
+          .filter(activeScan -> 
activeScan.getTable().equals(tableName)).count();
+    }
+    return count;
+  }
 }

Reply via email to