This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 4575c2f6557519eff8dcae67177fcfee62e16a72
Merge: 79337889f7 ca80d7cb46
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu May 30 20:29:54 2024 +0000

    Merge branch 'main' into elasticity

 .../org/apache/accumulo/tserver/ScanServer.java    |   2 +-
 .../apache/accumulo/test/ScanServerShutdownIT.java | 139 +++++++++++++++++++++
 .../accumulo/test/SelfStoppingScanServer.java      |  59 +++++++++
 3 files changed, 199 insertions(+), 1 deletion(-)

diff --cc test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
index 0000000000,6c2d7aac6b..247742ebd3
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
@@@ -1,0 -1,138 +1,139 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertNotNull;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.util.Collections;
+ import java.util.Iterator;
+ import java.util.Map.Entry;
+ 
+ import org.apache.accumulo.core.Constants;
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.BatchScanner;
+ import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+ import org.apache.accumulo.core.clientImpl.ClientContext;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Range;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+ import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.spi.scan.ScanServerSelector;
+ import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+ import org.apache.accumulo.harness.SharedMiniClusterBase;
+ import org.apache.accumulo.minicluster.ServerType;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.accumulo.server.ServerContext;
+ import org.apache.accumulo.test.util.Wait;
+ import org.junit.jupiter.api.AfterAll;
+ import org.junit.jupiter.api.BeforeAll;
+ import org.junit.jupiter.api.Test;
+ 
+ public class ScanServerShutdownIT extends SharedMiniClusterBase {
+ 
+   private static class ScanServerShutdownITConfiguration
+       implements MiniClusterConfigurationCallback {
+ 
+     @Override
+     public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+         org.apache.hadoop.conf.Configuration coreSite) {
 -      cfg.setNumScanServers(1);
++
++      cfg.getClusterServerConfiguration().setNumDefaultScanServers(0);
+ 
+       // Timeout scan sessions after being idle for 3 seconds
+       cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+ 
+       // Configure the scan server to only have 1 scan executor thread. This 
means
+       // that the scan server will run scans serially, not concurrently.
+       cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+     }
+   }
+ 
+   @BeforeAll
+   public static void start() throws Exception {
+     ScanServerShutdownITConfiguration c = new 
ScanServerShutdownITConfiguration();
+     SharedMiniClusterBase.startMiniClusterWithConfig(c);
+   }
+ 
+   @AfterAll
+   public static void stop() throws Exception {
+     SharedMiniClusterBase.stopMiniCluster();
+   }
+ 
+   @Test
+   public void testRefRemovalOnShutdown() throws Exception {
+ 
+     ServerContext ctx = getCluster().getServerContext();
+     String zooRoot = ctx.getZooKeeperRoot();
+     ZooReaderWriter zrw = ctx.getZooReaderWriter();
+     String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+ 
+     Wait.waitFor(() -> zrw.getChildren(scanServerRoot).size() == 0);
+ 
+     // Stop normal ScanServers so that we can start our custom implementation
+     // that shuts down after 3 batch scans
 -    
getCluster().getClusterControl().startScanServer(SelfStoppingScanServer.class, 
1,
 -        ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME);
++    
getCluster().getConfig().getClusterServerConfiguration().setNumDefaultScanServers(1);
++    getCluster().getClusterControl().start(ServerType.SCAN_SERVER, null, 1,
++        SelfStoppingScanServer.class);
+ 
+     // Wait for the ScanServer to register in ZK
+     Wait.waitFor(() -> zrw.getChildren(scanServerRoot).size() == 1);
+ 
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+       final String tableName = getUniqueNames(1)[0];
+ 
+       client.tableOperations().create(tableName);
+ 
+       // Make multiple files
+       final int fileCount = 3;
+       for (int i = 0; i < fileCount; i++) {
+         ScanServerIT.ingest(client, tableName, 10, 10, 0, "colf", true);
+       }
+       assertEquals(0, ctx.getAmple().getScanServerFileReferences().count());
+ 
+       for (int i = 0; i < 3; i++) {
+         try (BatchScanner scanner = client.createBatchScanner(tableName, 
Authorizations.EMPTY)) {
+           scanner.setRanges(Collections.singletonList(new Range()));
+           scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ 
+           Iterator<Entry<Key,Value>> iter = scanner.iterator();
+           assertTrue(iter.hasNext());
+           assertNotNull(iter.next());
+ 
+           assertEquals(fileCount, 
ctx.getAmple().getScanServerFileReferences().count());
+ 
+         }
+       }
+ 
+       // ScanServer should stop after the 3rd batch scan closes
+       Wait.waitFor(() -> ((ClientContext) client).getScanServers().size() == 
0);
+ 
+       // The ScanServer should clean up the references on normal shutdown
+       Wait.waitFor(() -> ctx.getAmple().getScanServerFileReferences().count() 
== 0);
+ 
+     } finally {
+       getCluster().getClusterControl().stopAllServers(ServerType.SCAN_SERVER);
+     }
+ 
+   }
+ 
+ }

Reply via email to