This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new e8357c4d7f fixes CloseScannerIT to work with changes in #4840 (#4846) e8357c4d7f is described below commit e8357c4d7fc84f609f27ac530ea7b3554d4ea0ff Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Aug 29 16:54:28 2024 -0700 fixes CloseScannerIT to work with changes in #4840 (#4846) After the changes in #4840 a scan session with an active thread would not be discarded. The change made CloseScannerIT start failing in 3.1. Adjusted the test to account for this by giving time for deferred session cleanup that happens when there is an active thread associated with a scan session. The test was not failing in 2.1 because the test was less strict in this branch. Applied this fix starting in 2.1 to make the test consistent across branches. --- .../org/apache/accumulo/test/CloseScannerIT.java | 72 ++++++++++++++++++---- .../apache/accumulo/test/functional/ScannerIT.java | 4 ++ 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java index 9f0bd56bb1..f6c157fd2b 100644 --- a/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/CloseScannerIT.java @@ -18,25 +18,45 @@ */ package org.apache.accumulo.test; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.apache.accumulo.test.functional.ScannerIT.countActiveScans; -import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; +import 
org.apache.accumulo.core.util.Timer; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.ReadWriteIT; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CloseScannerIT extends AccumuloClusterHarness { static final int ROWS = 1000; static final int COLS = 1000; + private static final Logger log = LoggerFactory.getLogger(CloseScannerIT.class); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> siteConfig = cfg.getSiteConfig(); + siteConfig.put(Property.TSERV_SESSION_MAXIDLE.getKey(), "20s"); + cfg.setSiteConfig(siteConfig); + } + + /** + * {@link org.apache.accumulo.test.functional.ScannerIT#testSessionCleanup()} is a similar test. + */ @Test public void testManyScans() throws Exception { @@ -49,25 +69,53 @@ public class CloseScannerIT extends AccumuloClusterHarness { client.tableOperations().flush(tableName, null, null, true); - for (int i = 0; i < 200; i++) { - try (Scanner scanner = createScanner(client, tableName, i)) { + Timer timer = Timer.startNew(); + + int count = 0; + while (count < 200 && timer.elapsed(TimeUnit.MILLISECONDS) < 3000) { + try (Scanner scanner = createScanner(client, tableName, count)) { scanner.setRange(new Range()); - scanner.setReadaheadThreshold(i % 2 == 0 ? 0 : 3); + scanner.setReadaheadThreshold(count % 2 == 0 ? 
0 : 3); - for (int j = 0; j < i % 7 + 1; j++) { + for (int j = 0; j < count % 7 + 1; j++) { // only read a little data and quit, this should leave a session open on the tserver scanner.stream().limit(10).forEach(e -> {}); } } // when the scanner is closed, all open sessions should be closed + count++; } - List<String> tservers = client.instanceOperations().getTabletServers(); - int activeScans = 0; - for (String tserver : tservers) { - activeScans += client.instanceOperations().getActiveScans(tserver).size(); - } + log.debug("Ran {} scans in {} ms", count, timer.elapsed(TimeUnit.MILLISECONDS)); + + // The goal of this test is to ensure the scanner client object closes server side scan + // sessions and not idle session cleanup. To do this the test is making the following + // assumptions about how Accumulo works to set the timings in this test: + // 1. Sessions not closed by the scanner will be cleaned up in 20s based on config set before + // starting test + // 2. This test creates readahead threads for some scans. The presence of a thread will + // prevent immediate cleanup of the server side scan session. So when the scanner sends the + // RPC to close the session if a thread is present, then cleanup will be deferred. A scheduled + // task in the tserver runs deferred cleanup every TSERV_SESSION_MAXIDLE/2 which is 10s. + // + // Putting the assumptions above together we know that if sessions are closed in less than + // 20s, then they were closed as a result of the scanner.close() method initiating an RPC to + // remove the scan session. The 13s below allows time for the 10s deferred cleanup to run in + // the case when a thread is present. The 3s cap the test puts on running scans sets the total + // time the test will allow to 3s+13s=16s which is less than the 20s when idle session cleanup + // starts. - assertTrue(activeScans < 3); + Wait.waitFor(() -> countActiveScans(client, tableName) < 1, 13000, 250, + "Found active scans after closing all scanners. 
Expected to find no scans"); + + var elasped = timer.elapsed(TimeUnit.MILLISECONDS); + if (elasped > 20000) { + log.warn( + "Total time since first scan was run {}ms. Unable to verify that scanner RPC closed " + + "sessions, could have been closed by idle session cleanup.", + elasped); + } else { + log.debug("Total time since first scan was run {}ms.", elasped); + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java index 57c511f6ae..f767df4fdf 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScannerIT.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.test.CloseScannerIT; import org.apache.accumulo.test.util.Wait; import org.junit.jupiter.api.Test; @@ -118,6 +119,9 @@ public class ScannerIT extends AccumuloClusterHarness { } } + /** + * {@link CloseScannerIT#testManyScans()} is a similar test. + */ @Test public void testSessionCleanup() throws Exception { final String tableName = getUniqueNames(1)[0];