This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new f97055f6c9 cleanup scan session when batch scanner closed (#4841) f97055f6c9 is described below commit f97055f6c93b858bdc9b5d7f0a15353f62855891 Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Aug 27 14:40:55 2024 -0700 cleanup scan session when batch scanner closed (#4841) When not all data is read from a scanner it can leave server side scan sessions open. When the scanner object is closed on the client side it should close these server side scan sessions. The batch scanner was not doing this. Updated the batch scanner to close server side sessions. Added tests for the scanner and batch scanner to ensure sessions are cleaned up. Improved an existing test related to session timeout. --- .../TabletServerBatchReaderIterator.java | 20 +++++- .../test/functional/ScanSessionTimeOutIT.java | 19 ++++- .../apache/accumulo/test/functional/ScannerIT.java | 81 ++++++++++++++++++++++ 3 files changed, 117 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index ad3fedc2ff..a852531bc6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -817,6 +817,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); } + // Tracks unclosed scan session id for the case when the following try block exits with an + // exception.
+ Long scanIdToClose = null; try { Timer timer = null; @@ -850,6 +853,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value ByteBufferUtil.toByteBuffers(authorizations.getAuthorizations()), waitForWrites, SamplerConfigurationImpl.toThrift(options.getSamplerConfiguration()), options.batchTimeout, options.classLoaderContext, execHints, busyTimeout); + scanIdToClose = imsr.scanID; if (waitForWrites) { ThriftScanner.serversWaitedForWrites.get(ttype).add(server.toString()); } @@ -916,9 +920,23 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } client.closeMultiScan(TraceUtil.traceInfo(), imsr.scanID); + scanIdToClose = null; } finally { - ThriftUtil.returnClient(client, context); + try { + if (scanIdToClose != null) { + // If this code is running it is likely that an exception happened and the scan session + // was never closed. Make a best effort attempt to close the scan session which will + // clean up server side resources. When the batch scanner is closed it will interrupt + // the threads in its thread pool which could cause an interrupted exception in this + // code. 
+ client.closeMultiScan(TraceUtil.traceInfo(), scanIdToClose); + } + } catch (Exception e) { + log.trace("Failed to close scan session in finally {} {}", server, scanIdToClose, e); + } finally { + ThriftUtil.returnClient(client, context); + } } } catch (TTransportException e) { log.debug("Server : {} msg : {}", server, e.getMessage()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java index 60d36035f8..741a02a18a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanSessionTimeOutIT.java @@ -19,6 +19,8 @@ package org.apache.accumulo.test.functional; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; +import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.time.Duration; import java.util.Iterator; @@ -118,12 +120,25 @@ public class ScanSessionTimeOutIT extends AccumuloClusterHarness { Iterator<Entry<Key,Value>> iter = scanner.iterator(); verify(iter, 0, 200); + // There should be a scan session open since not all data was read from the iterator + assertEquals(1L, countActiveScans(c, tableName)); // sleep three times the session timeout sleepUninterruptibly(9, TimeUnit.SECONDS); - - verify(iter, 200, 100000); + // The scan session should have timed out and the next read should create a new one + assertEquals(0L, countActiveScans(c, tableName)); + + verify(iter, 200, 50000); + // Reading part of the data in the range should cause a new scan session to be created + assertEquals(1L, countActiveScans(c, tableName)); + verify(iter, 50000, 100000); + // Once all of the data in the range was read the scanner should automatically close the + // scan session + assertEquals(0L, countActiveScans(c, tableName)); } + + // 
Nothing should have created any ew scan sessions for the table + assertEquals(0L, countActiveScans(c, tableName)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index 805b791916..57c511f6ae 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@ -18,10 +18,12 @@ */ package org.apache.accumulo.test.functional; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Duration; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import org.apache.accumulo.core.client.Accumulo; @@ -29,6 +31,7 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -36,6 +39,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.util.Wait; import org.junit.jupiter.api.Test; public class ScannerIT extends AccumuloClusterHarness { @@ -113,4 +117,81 @@ public class ScannerIT extends AccumuloClusterHarness { } } } + + @Test + public void testSessionCleanup() throws Exception { + final String tableName = getUniqueNames(1)[0]; + try (AccumuloClient accumuloClient = Accumulo.newClient().from(getClientProps()).build()) { + + accumuloClient.tableOperations().create(tableName); + + try (var writer = accumuloClient.createBatchWriter(tableName)) { + 
for (int i = 0; i < 100000; i++) { + var m = new Mutation(String.format("%09d", i)); + m.put("1", "1", "" + i); + writer.addMutation(m); + } + } + + // The test assumes the session timeout is configured to 1 minute, validate this. Later in the + // test 10s is given for session to disappear and we want this 10s to be much smaller than the + // configured session timeout. + assertEquals("1m", accumuloClient.instanceOperations().getSystemConfiguration() + .get(Property.TSERV_SESSION_MAXIDLE.getKey())); + + // The following test that when not all data is read from scanner that when the scanner is + // closed that any open sessions will be closed. + for (int i = 0; i < 3; i++) { + try (var scanner = accumuloClient.createScanner(tableName)) { + assertEquals(10, scanner.stream().limit(10).count()); + assertEquals(10000, scanner.stream().limit(10000).count()); + // since not all data in the range was read from the scanner it should leave an active + // scan session per scanner iterator created + assertEquals(2, countActiveScans(accumuloClient, tableName)); + } + // When close is called on on the scanner it should close the scan session. The session + // cleanup is async on the server because task may still be running server side, but it + // should happen in less than the session timeout. Also the server should start working on + // it immediately. + Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0, 10000); + + try (var scanner = accumuloClient.createBatchScanner(tableName)) { + scanner.setRanges(List.of(new Range())); + assertEquals(10, scanner.stream().limit(10).count()); + assertEquals(10000, scanner.stream().limit(10000).count()); + assertEquals(2, countActiveScans(accumuloClient, tableName)); + } + Wait.waitFor(() -> countActiveScans(accumuloClient, tableName) == 0, 10000); + } + + // Test the case where all data is read from a scanner. 
In this case the scanner should close + // the scan session at the end of the range even before the scanner itself is closed. + for (int i = 0; i < 3; i++) { + try (var scanner = accumuloClient.createScanner(tableName)) { + assertEquals(100000, scanner.stream().count()); + assertEquals(100000, scanner.stream().count()); + // The server side cleanup of the session should be able to happen immediately in this + // case because nothing should be running on the server side to fetch data because all + // data in the range was fetched. + assertEquals(0, countActiveScans(accumuloClient, tableName)); + } + + try (var scanner = accumuloClient.createBatchScanner(tableName)) { + scanner.setRanges(List.of(new Range())); + assertEquals(100000, scanner.stream().count()); + assertEquals(100000, scanner.stream().count()); + assertEquals(0, countActiveScans(accumuloClient, tableName)); + } + } + } + } + + public static long countActiveScans(AccumuloClient c, String tableName) throws Exception { + long count = 0; + for (String tserver : c.instanceOperations().getTabletServers()) { + count += c.instanceOperations().getActiveScans(tserver).stream() + .filter(activeScan -> activeScan.getTable().equals(tableName)).count(); + } + return count; + } }