This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new c53bebc3e9 Added Scan Server Group Configuration IT (#4506)
c53bebc3e9 is described below

commit c53bebc3e9accf63de0c41dac3e2da5ce9597605
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon May 20 10:43:28 2024 -0400

    Added Scan Server Group Configuration IT (#4506)
    
    Closes #4504
---
 .../test/ScanServerGroupConfigurationIT.java       | 197 +++++++++++++++++++++
 .../org/apache/accumulo/test/ScanServerIT.java     |   4 +-
 2 files changed, 199 insertions(+), 2 deletions(-)

diff --git 
a/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
 
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
new file mode 100644
index 0000000000..c18e6e1aff
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/ScanServerGroupConfigurationIT.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Iterables;
+
+public class ScanServerGroupConfigurationIT extends SharedMiniClusterBase {
+
+  // @formatter:off
+  private static final String clientConfiguration =
+     "["+
+     " {"+
+     "   \"isDefault\": true,"+
+     "   \"maxBusyTimeout\": \"5m\","+
+     "   \"busyTimeoutMultiplier\": 8,"+
+     "   \"scanTypeActivations\": [],"+
+     "   \"attemptPlans\": ["+
+     "     {"+
+     "       \"servers\": \"3\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"one\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"13\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"two\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"100%\","+
+     "       \"busyTimeout\": \"33ms\""+
+     "     }"+
+     "   ]"+
+     "  },"+
+     " {"+
+     "   \"isDefault\": false,"+
+     "   \"maxBusyTimeout\": \"5m\","+
+     "   \"busyTimeoutMultiplier\": 8,"+
+     "   \"group\": \"GROUP1\","+
+     "   \"scanTypeActivations\": [\"use_group1\"],"+
+     "   \"attemptPlans\": ["+
+     "     {"+
+     "       \"servers\": \"3\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"one\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"13\","+
+     "       \"busyTimeout\": \"33ms\","+
+     "       \"salt\": \"two\""+
+     "     },"+
+     "     {"+
+     "       \"servers\": \"100%\","+
+     "       \"busyTimeout\": \"33ms\""+
+     "     }"+
+     "   ]"+
+     "  }"+
+     "]";
+  // @formatter:on
+
+  private static class Config implements MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
coreSite) {
+      cfg.setNumScanServers(0); // start with no scan servers
+      cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s");
+
+      
cfg.setClientProperty(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() 
+ "profiles",
+          clientConfiguration);
+    }
+  }
+
+  @BeforeAll
+  public static void start() throws Exception {
+    SharedMiniClusterBase.startMiniClusterWithConfig(new Config());
+  }
+
+  @AfterAll
+  public static void after() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testClientConfiguration() throws Exception {
+
+    final String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    final ZooKeeper zk = 
getCluster().getServerContext().getZooReaderWriter().getZooKeeper();
+    final String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    // Ensure no scan servers running
+    Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 0);
+
+    try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+      final String tableName = getUniqueNames(1)[0];
+
+      final int ingestedEntryCount =
+          ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, 
"colf");
+      assertEquals(100, ingestedEntryCount);
+
+      try (Scanner scanner = client.createScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scanner should fall back to the tserver and should have seen 
all ingested and flushed entries");
+
+        // Allow one scan server to be started at this time
+        getCluster().getConfig().setNumScanServers(1);
+
+        // Start a ScanServer. No group specified, should be in the default 
group.
+        getCluster().getClusterControl().start(ServerType.SCAN_SERVER, 
"localhost");
+        Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 1, 
30_000);
+        Wait.waitFor(() -> ((ClientContext) 
client).getScanServers().values().stream().anyMatch(
+            (p) -> 
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
+            == true);
+
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed 
entries");
+
+        // if scanning against tserver would see the following, but should not 
on scan server
+        final int additionalIngest1 =
+            ScanServerIT.ingest(client, tableName, 10, 10, 10, "colf", true);
+        assertEquals(100, additionalIngest1);
+
+        // Bump the number of scan servers that can run to start the GROUP1 
scan server
+        getCluster().getConfig().setNumScanServers(2);
+        getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(),
+            new String[] {"-g", "GROUP1"});
+        Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 2);
+        Wait.waitFor(() -> ((ClientContext) 
client).getScanServers().values().stream().anyMatch(
+            (p) -> 
p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME))
+            == true);
+        Wait.waitFor(() -> ((ClientContext) 
client).getScanServers().values().stream()
+            .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true);
+
+        scanner.setExecutionHints(Map.of("scan_type", "use_group1"));
+        assertEquals(ingestedEntryCount + additionalIngest1, 
Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed 
entries");
+
+        // A scan against the tserver would see the following entries, but a 
scan server should not
+        final int additionalIngest2 =
+            ScanServerIT.ingest(client, tableName, 10, 10, 20, "colf", false);
+        assertEquals(100, additionalIngest2);
+
+        assertEquals(ingestedEntryCount + additionalIngest1, 
Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed 
entries");
+
+        scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE);
+        assertEquals(ingestedEntryCount + additionalIngest1 + 
additionalIngest2,
+            Iterables.size(scanner),
+            "Scanning against tserver should have resulted in seeing all 
ingested entries");
+      }
+    }
+
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java 
b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index ad6a38ea72..77cdc8c647 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -206,7 +206,7 @@ public class ScanServerIT extends SharedMiniClusterBase {
    * @param colf column family to use for ingest
    * @return the number of ingested entries
    */
-  protected static int createTableAndIngest(AccumuloClient client, String 
tableName,
+  public static int createTableAndIngest(AccumuloClient client, String 
tableName,
       NewTableConfiguration ntc, int rowCount, int colCount, String colf) 
throws Exception {
 
     if (Objects.isNull(ntc)) {
@@ -230,7 +230,7 @@ public class ScanServerIT extends SharedMiniClusterBase {
    * @param shouldFlush if true, the entries will be flushed after ingest
    * @return the number of ingested entries
    */
-  protected static int ingest(AccumuloClient client, String tableName, int 
rowCount, int colCount,
+  public static int ingest(AccumuloClient client, String tableName, int 
rowCount, int colCount,
       int offset, String colf, boolean shouldFlush) throws Exception {
     ReadWriteIT.ingest(client, colCount, rowCount, 50, offset, colf, 
tableName);
 

Reply via email to