This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 770d949bfd Added ScanServer tests for different tablet hosting goals (#3388) 770d949bfd is described below commit 770d949bfdc592a641f406cf3e73460ff4d34474 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon May 15 10:06:47 2023 -0400 Added ScanServer tests for different tablet hosting goals (#3388) Co-authored-by: DomGarguilo <dominic.gargu...@gmail.com> --- .../org/apache/accumulo/test/ScanServerIT.java | 113 +++++++++++++++++++++ .../accumulo/test/ScanServerIT_NoServers.java | 63 ++++++++++++ 2 files changed, 176 insertions(+) diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 7576413129..b68f311871 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -24,10 +24,16 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Objects; import java.util.Properties; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; @@ -38,10 +44,13 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletHostingGoal; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -49,6 +58,8 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.functional.ReadWriteIT; import org.apache.accumulo.test.functional.SlowIterator; +import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; @@ -74,6 +85,10 @@ public class ScanServerIT extends SharedMiniClusterBase { // Configure the scan server to only have 1 scan executor thread. This means // that the scan server will run scans serially, not concurrently. cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1"); + + cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5"); + cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10"); + cfg.setProperty("table.custom.ondemand.unloader.inactivity.threshold.seconds", "15"); } } @@ -205,6 +220,96 @@ public class ScanServerIT extends SharedMiniClusterBase { } } + @Test + public void testScanWithTabletHostingMix() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + final int ingestedEntryCount = setupTableWithHostingMix(client, tableName); + + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + // Throws an exception because of the tablets with the NEVER hosting goal + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + + // Test that hosted ranges work + scanner.setRange(new Range(null, "row_0000000003")); + assertEquals(40, Iterables.size(scanner)); + + scanner.setRange(new Range("row_0000000008", null)); + assertEquals(20, Iterables.size(scanner)); + } // when the scanner is closed, all open sessions should be closed + } + } + + @Test + public void testBatchScanWithTabletHostingMix() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + final int ingestedEntryCount = setupTableWithHostingMix(client, tableName); + + try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) { + scanner.setRanges(Collections.singleton(new Range())); + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + // Throws an exception because of the tablets with the NEVER hosting goal + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + + // Test that hosted ranges work + Collection<Range> ranges = new ArrayList<>(); + ranges.add(new Range(null, "row_0000000003")); + ranges.add(new Range("row_0000000008", null)); + scanner.setRanges(ranges); + assertEquals(60, Iterables.size(scanner)); + } // when the scanner is closed, all open sessions should be closed + } + } + + /** + * Sets up a table with a mix of tablet hosting goals. Specific ranges of rows are set to ALWAYS, + * NEVER, and ONDEMAND hosting goals. The method waits for the NEVER and ONDEMAND tablets to be + * unloaded due to inactivity before returning. + * + * @param client The AccumuloClient to use for the operation + * @param tableName The name of the table to be created and set up + * @return The count of ingested entries + */ + protected static int setupTableWithHostingMix(AccumuloClient client, String tableName) + throws Exception { + SortedSet<Text> splits = + IntStream.rangeClosed(1, 9).mapToObj(i -> new Text("row_000000000" + i)) + .collect(Collectors.toCollection(TreeSet::new)); + + NewTableConfiguration ntc = new NewTableConfiguration(); + ntc.withSplits(splits); + ntc.withInitialHostingGoal(TabletHostingGoal.ALWAYS); // speed up ingest + final int ingestedEntryCount = createTableAndIngest(client, tableName, ntc, 10, 10, "colf"); + + String tableId = client.tableOperations().tableIdMap().get(tableName); + + // row 1 -> 3 are always + client.tableOperations().setTabletHostingGoal(tableName, + new Range(null, true, "row_0000000003", true), TabletHostingGoal.ALWAYS); + // row 4 -> 7 are never + client.tableOperations().setTabletHostingGoal(tableName, + new Range("row_0000000004", true, "row_0000000007", true), TabletHostingGoal.NEVER); + // row 8 and 9 are ondemand + client.tableOperations().setTabletHostingGoal(tableName, + new Range("row_0000000008", true, null, true), TabletHostingGoal.ONDEMAND); + + // Wait for the NEVER and ONDEMAND tablets to be unloaded due to inactivity + Wait.waitFor(() -> ScanServerIT.getNumHostedTablets(client, tableId) == 3, 30_000, 1_000); + + return ingestedEntryCount; + } + /** * Create a table with the given name and the given client. Then, ingest into the table using * {@link #ingest(AccumuloClient, String, int, int, int, String, boolean)} @@ -253,4 +358,12 @@ public class ScanServerIT extends SharedMiniClusterBase { return ingestedEntriesCount; } + + protected static int getNumHostedTablets(AccumuloClient client, String tableId) throws Exception { + try (Scanner scanner = client.createScanner(MetadataTable.NAME)) { + scanner.setRange(new Range(tableId, tableId + "<")); + scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME); + return Iterables.size(scanner); + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java index 782cb56739..3e3bd7a183 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT_NoServers.java @@ -21,9 +21,12 @@ package org.apache.accumulo.test; import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; import static org.apache.accumulo.test.ScanServerIT.createTableAndIngest; import static org.apache.accumulo.test.ScanServerIT.ingest; +import static org.apache.accumulo.test.ScanServerIT.setupTableWithHostingMix; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -68,6 +71,10 @@ public class ScanServerIT_NoServers extends SharedMiniClusterBase { // Configure the scan server to only have 1 scan executor thread. This means // that the scan server will run scans serially, not concurrently. cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1"); + + cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5"); + cfg.setProperty(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL, "10"); + cfg.setProperty("table.custom.ondemand.unloader.inactivity.threshold.seconds", "15"); } } @@ -181,4 +188,60 @@ public class ScanServerIT_NoServers extends SharedMiniClusterBase { }); } } + + @Test + public void testScanWithTabletHostingMix() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0]; + + setupTableWithHostingMix(client, tableName); + + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range()); + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + // Throws an exception because no scan servers and falls back to tablet server with tablets + // with the NEVER hosting goal + assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + // Throws an exception because of the tablets with the NEVER hosting goal + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + + // Test that hosted ranges work + scanner.setRange(new Range(null, "row_0000000003")); + assertEquals(40, Iterables.size(scanner)); + + scanner.setRange(new Range("row_0000000008", null)); + assertEquals(20, Iterables.size(scanner)); + + } // when the scanner is closed, all open sessions should be closed + } + } + + @Test + public void testBatchScanWithTabletHostingMix() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + final String tableName = getUniqueNames(1)[0]; + + setupTableWithHostingMix(client, tableName); + + try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) { + scanner.setRanges(Collections.singleton(new Range())); + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + // Throws an exception because no scan servers and falls back to tablet server with tablets + // with the NEVER hosting goal + assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + // Throws an exception because of the tablets with the NEVER hosting goal + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + + // Test that hosted ranges work + Collection<Range> ranges = new ArrayList<>(); + ranges.add(new Range(null, "row_0000000003")); + ranges.add(new Range("row_0000000008", null)); + scanner.setRanges(ranges); + assertEquals(60, Iterables.size(scanner)); + } // when the scanner is closed, all open sessions should be closed + } + } + }