This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit ca80d7cb4688796f1a15c307cbcf51a385bc80e9 Merge: 627ed85555 21f04a6778 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu May 30 20:04:16 2024 +0000 Merge branch '2.1' .../apache/accumulo/cluster/ClusterControl.java | 11 ++ .../standalone/StandaloneClusterControl.java | 8 ++ .../MiniAccumuloClusterControl.java | 14 +++ .../org/apache/accumulo/tserver/ScanServer.java | 2 +- .../apache/accumulo/test/ScanServerShutdownIT.java | 138 +++++++++++++++++++++ .../accumulo/test/SelfStoppingScanServer.java | 59 +++++++++ 6 files changed, 231 insertions(+), 1 deletion(-) diff --cc minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index ff9e6d3223,69f6f64297..9e395bb47c --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@@ -181,6 -178,18 +181,20 @@@ public class MiniAccumuloClusterContro } } + @Override + public synchronized void startScanServer(Class<? 
extends ScanServer> scanServer, int limit, + String groupName) throws IOException { + synchronized (scanServerProcesses) { + int count = + Math.min(limit, cluster.getConfig().getNumScanServers() - scanServerProcesses.size()); + for (int i = 0; i < count; i++) { - scanServerProcesses.add(cluster.exec(scanServer, "-g", groupName).getProcess()); ++ scanServerProcesses.add( ++ cluster.exec(scanServer, "-o", Property.SSERV_GROUP_NAME.getKey() + "=" + groupName) ++ .getProcess()); + } + } + } + @Override public synchronized void startAllServers(ServerType server) throws IOException { start(server, null); diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 201bfada4f,2216b55489..cd02951c63 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -198,8 -200,9 +198,8 @@@ public class ScanServer extends Abstrac private final SessionManager sessionManager; private final TabletServerResourceManager resourceManager; HostAndPort clientAddress; - private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); - private volatile boolean serverStopRequested = false; + protected volatile boolean serverStopRequested = false; private ServiceLock scanServerLock; protected TabletServerScanMetrics scanMetrics; private ScanServerMetrics scanServerMetrics; diff --cc test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java index 0000000000,8391190984..73284dc8c5 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java +++ b/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java @@@ -1,0 -1,58 +1,59 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.test; + + import java.util.concurrent.atomic.AtomicInteger; + ++import org.apache.accumulo.core.cli.ConfigOpts; ++import org.apache.accumulo.core.clientImpl.thrift.TInfo; ++import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; + import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; -import org.apache.accumulo.core.trace.thrift.TInfo; + import org.apache.accumulo.tserver.ScanServer; + import org.apache.accumulo.tserver.TabletHostingServer; + import org.apache.thrift.TException; + + /** + * ScanServer implementation that will stop itself after the 3rd batch scan + * + */ + public class SelfStoppingScanServer extends ScanServer + implements TabletScanClientService.Iface, TabletHostingServer { + + private final AtomicInteger scanCount = new AtomicInteger(0); + - public SelfStoppingScanServer(ScanServerOpts opts, String[] args) { ++ public SelfStoppingScanServer(ConfigOpts opts, String[] args) { + super(opts, args); + } + + @Override + public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDException, TException { + scanCount.incrementAndGet(); + super.closeMultiScan(tinfo, scanID); + if (scanCount.get() == 3) { + serverStopRequested = true; + } + } + + public static void main(String[] args) throws Exception { - 
try (SelfStoppingScanServer tserver = new SelfStoppingScanServer(new ScanServerOpts(), args)) { ++ try (SelfStoppingScanServer tserver = new SelfStoppingScanServer(new ConfigOpts(), args)) { + tserver.runServer(); + } + } + + }