This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 4cc8e3bbe1 ScanServerSelector - Break nested interfaces into their own files (#2896) 4cc8e3bbe1 is described below commit 4cc8e3bbe142b26f44ca54a38743f979c6c41af6 Author: Dom G <dominic.gargu...@gmail.com> AuthorDate: Fri Sep 2 13:19:26 2022 -0400 ScanServerSelector - Break nested interfaces into their own files (#2896) * Break nested interfaces into their own classes * Rename some ScanServer classes --- .../accumulo/core/clientImpl/ClientContext.java | 6 +- ...temptsImpl.java => ScanServerAttemptsImpl.java} | 42 +++++---- .../TabletServerBatchReaderIterator.java | 24 ++--- .../accumulo/core/clientImpl/ThriftScanner.java | 19 ++-- .../spi/scan/ConfigurableScanServerSelector.java | 10 +-- .../accumulo/core/spi/scan/ScanServerAttempt.java | 40 +++++++++ .../accumulo/core/spi/scan/ScanServerInfo.java | 40 +++++++++ .../core/spi/scan/ScanServerSelections.java | 48 ++++++++++ .../accumulo/core/spi/scan/ScanServerSelector.java | 100 ++++----------------- .../core/clientImpl/ScanAttemptsImplTest.java | 14 +-- .../scan/ConfigurableScanServerSelectorTest.java | 45 +++++----- .../accumulo/tserver/TabletHostingServer.java | 5 +- 12 files changed, 231 insertions(+), 162 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 3d2949a7ca..93a31b4d9c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -88,8 +88,8 @@ import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.singletons.SingletonManager; import org.apache.accumulo.core.singletons.SingletonReservation; import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.scan.ScanServerInfo; import 
org.apache.accumulo.core.spi.scan.ScanServerSelector; -import org.apache.accumulo.core.spi.scan.ScanServerSelector.ScanServer; import org.apache.accumulo.core.util.OpTimer; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.tables.TableZooHelper; @@ -193,9 +193,9 @@ public class ClientContext implements AccumuloClient { } @Override - public Supplier<Collection<ScanServer>> getScanServers() { + public Supplier<Collection<ScanServerInfo>> getScanServers() { return () -> ClientContext.this.getScanServers().entrySet().stream() - .map(entry -> new ScanServer() { + .map(entry -> new ScanServerInfo() { @Override public String getAddress() { return entry.getKey(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java similarity index 70% rename from core/src/main/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImpl.java rename to core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java index 8ca3a44720..fc0896aeb2 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.spi.scan.ScanServerSelector.ScanAttempt; +import org.apache.accumulo.core.spi.scan.ScanServerAttempt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,18 +37,18 @@ import org.slf4j.LoggerFactory; * Snapshots are useful for ensuring that authors of ScanServerSelector plugins do not have to * consider strange concurrency issues when writing a plugin. 
*/ -public class ScanAttemptsImpl { +public class ScanServerAttemptsImpl { - private static final Logger LOG = LoggerFactory.getLogger(ScanAttemptsImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(ScanServerAttemptsImpl.class); - static class ScanAttemptImpl implements ScanAttempt { + static class ScanServerAttemptImpl implements ScanServerAttempt { private final String server; private final long time; private final Result result; private volatile long mutationCount = Long.MAX_VALUE; - ScanAttemptImpl(Result result, String server, long time) { + ScanServerAttemptImpl(Result result, String server, long time) { this.result = result; this.server = Objects.requireNonNull(server); this.time = time; @@ -78,12 +78,13 @@ public class ScanAttemptsImpl { } } - private final Map<TabletId,Collection<ScanAttemptImpl>> attempts = new ConcurrentHashMap<>(); + private final Map<TabletId,Collection<ScanServerAttemptImpl>> attempts = + new ConcurrentHashMap<>(); private long mutationCounter = 0; - private void add(TabletId tablet, ScanAttempt.Result result, String server, long endTime) { + private void add(TabletId tablet, ScanServerAttempt.Result result, String server, long endTime) { - ScanAttemptImpl sa = new ScanAttemptImpl(result, server, endTime); + ScanServerAttemptImpl sa = new ScanServerAttemptImpl(result, server, endTime); attempts.computeIfAbsent(tablet, k -> ConcurrentHashMap.newKeySet()).add(sa); @@ -95,14 +96,14 @@ public class ScanAttemptsImpl { } - public static interface ScanAttemptReporter { - void report(ScanAttempt.Result result); + public interface ScanAttemptReporter { + void report(ScanServerAttempt.Result result); } ScanAttemptReporter createReporter(String server, TabletId tablet) { return new ScanAttemptReporter() { @Override - public void report(ScanAttempt.Result result) { + public void report(ScanServerAttempt.Result result) { LOG.trace("Received result: {}", result); add(tablet, result, server, System.currentTimeMillis()); } @@ 
-110,27 +111,30 @@ public class ScanAttemptsImpl { } /** - * Creates and returns a snapshot of ScanAttempt objects that were added before this call + * Creates and returns a snapshot of {@link ScanServerAttempt} objects that were added before this + * call * - * @return a map of TabletId to a collection ScanAttempt objects associated with that TabletId + * @return TabletIds mapped to a collection of {@link ScanServerAttempt} objects associated with + * that TabletId */ - Map<TabletId,Collection<ScanAttemptImpl>> snapshot() { + Map<TabletId,Collection<ScanServerAttemptImpl>> snapshot() { final long mutationCounterSnapshot; - synchronized (ScanAttemptsImpl.this) { + synchronized (ScanServerAttemptsImpl.this) { mutationCounterSnapshot = mutationCounter; } - Map<TabletId,Collection<ScanAttemptImpl>> result = new ConcurrentHashMap<>(); + Map<TabletId,Collection<ScanServerAttemptImpl>> result = new ConcurrentHashMap<>(); attempts.forEach((tabletId, scanAttempts) -> { - // filter out ScanAttempt objects that were added after this call - List<ScanAttemptImpl> filteredScanAttempts = scanAttempts.stream() + // filter out ScanServerScanAttempt objects that were added after this call + List<ScanServerAttemptImpl> filteredScanAttempts = scanAttempts.stream() .filter(scanAttempt -> scanAttempt.getMutationCount() < mutationCounterSnapshot) .collect(Collectors.toList()); - // only add an entry to the map if there are ScanAttempt objects for the current TabletId + // only add an entry to the map if there are ScanServerScanAttempt objects for the current + // TabletId if (!filteredScanAttempts.isEmpty()) result.put(tabletId, filteredScanAttempts); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index e470602372..63eb186a49 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -66,6 +66,8 @@ import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.scan.ScanServerAttempt; +import org.apache.accumulo.core.spi.scan.ScanServerSelections; import org.apache.accumulo.core.spi.scan.ScanServerSelector; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException; @@ -109,7 +111,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private TabletLocator locator; - private ScanAttemptsImpl scanAttempts = new ScanAttemptsImpl(); + private ScanServerAttemptsImpl scanAttempts = new ScanServerAttemptsImpl(); public interface ResultReceiver { void receive(List<Entry<Key,Value>> entries); @@ -342,12 +344,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private List<Column> columns; private int semaphoreSize; private final long busyTimeout; - private final ScanAttemptsImpl.ScanAttemptReporter reporter; + private final ScanServerAttemptsImpl.ScanAttemptReporter reporter; private final Duration scanServerSelectorDelay; QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns, - long busyTimeout, ScanAttemptsImpl.ScanAttemptReporter reporter, + long busyTimeout, ScanServerAttemptsImpl.ScanAttemptReporter reporter, Duration scanServerSelectorDelay) { this.tsLocation = tsLocation; this.tabletsRanges = tabletsRanges; @@ -399,9 +401,9 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } log.debug("IOException thrown", e); - ScanServerSelector.ScanAttempt.Result result 
= ScanServerSelector.ScanAttempt.Result.ERROR; + ScanServerAttempt.Result result = ScanServerAttempt.Result.ERROR; if (e.getCause() instanceof ScanServerBusyException) { - result = ScanServerSelector.ScanAttempt.Result.BUSY; + result = ScanServerAttempt.Result.BUSY; } reporter.report(result); } catch (AccumuloSecurityException e) { @@ -485,7 +487,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value long busyTimeout = 0; Duration scanServerSelectorDelay = null; - Map<String,ScanAttemptsImpl.ScanAttemptReporter> reporters = Map.of(); + Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters = Map.of(); if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) { var scanServerData = rebinToScanServers(binnedRanges); @@ -577,8 +579,8 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private static class ScanServerData { Map<String,Map<KeyExtent,List<Range>>> binnedRanges; - ScanServerSelector.Actions actions; - Map<String,ScanAttemptsImpl.ScanAttemptReporter> reporters; + ScanServerSelections actions; + Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters; } private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) { @@ -598,7 +600,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } @Override - public Collection<? extends ScanServerSelector.ScanAttempt> getAttempts(TabletId tabletId) { + public Collection<? 
extends ScanServerAttempt> getAttempts(TabletId tabletId) { return scanAttemptsSnapshot.getOrDefault(tabletId, Set.of()); } @@ -608,7 +610,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value } }; - var actions = ecsm.determineActions(params); + var actions = ecsm.selectServers(params); Map<KeyExtent,String> extentToTserverMap = new HashMap<>(); Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>(); @@ -622,7 +624,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>(); - Map<String,ScanAttemptsImpl.ScanAttemptReporter> reporters = new HashMap<>(); + Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters = new HashMap<>(); for (TabletIdImpl tabletId : tabletIds) { KeyExtent extent = tabletId.toKeyExtent(); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index 306cd70a7d..a47310e2d0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@ -62,6 +62,8 @@ import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.scan.ScanServerAttempt; +import org.apache.accumulo.core.spi.scan.ScanServerSelections; import org.apache.accumulo.core.spi.scan.ScanServerSelector; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; @@ -186,7 +188,7 @@ public class ThriftScanner { SamplerConfiguration samplerConfig; Map<String,String> executionHints; - ScanAttemptsImpl scanAttempts; + 
ScanServerAttemptsImpl scanAttempts; Duration busyTimeout; @@ -240,7 +242,7 @@ public class ThriftScanner { this.runOnScanServer = useScanServer; if (useScanServer) { - scanAttempts = new ScanAttemptsImpl(); + scanAttempts = new ScanServerAttemptsImpl(); } } } @@ -517,8 +519,7 @@ public class ThriftScanner { } @Override - public Collection<? extends ScanServerSelector.ScanAttempt> - getAttempts(TabletId tabletId) { + public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) { return attempts.getOrDefault(tabletId, Set.of()); } @@ -530,8 +531,7 @@ public class ThriftScanner { } }; - ScanServerSelector.Actions actions = - context.getScanServerSelector().determineActions(params); + ScanServerSelections actions = context.getScanServerSelector().selectServers(params); Duration delay = null; @@ -563,13 +563,12 @@ public class ThriftScanner { var reporter = scanState.scanAttempts.createReporter(newLoc.tablet_location, tabletId); try { - var ret = scanRpc(newLoc, scanState, context, scanState.busyTimeout.toMillis()); - return ret; + return scanRpc(newLoc, scanState, context, scanState.busyTimeout.toMillis()); } catch (ScanServerBusyException ssbe) { - reporter.report(ScanServerSelector.ScanAttempt.Result.BUSY); + reporter.report(ScanServerAttempt.Result.BUSY); throw ssbe; } catch (Exception e) { - reporter.report(ScanServerSelector.ScanAttempt.Result.ERROR); + reporter.report(ScanServerAttempt.Result.ERROR); throw e; } } else { diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java index 79a4a9679b..e1c9fe1212 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java @@ -290,10 +290,10 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { } @Override - 
public void init(InitParameters params) { + public void init(ScanServerSelector.InitParameters params) { // avoid constantly resorting the scan servers, just do it periodically in case they change orderedScanServersSupplier = Suppliers.memoizeWithExpiration(() -> { - Collection<ScanServer> scanServers = params.getScanServers().get(); + Collection<ScanServerInfo> scanServers = params.getScanServers().get(); Map<String,List<String>> groupedServers = new HashMap<>(); scanServers.forEach(sserver -> groupedServers .computeIfAbsent(sserver.getGroup(), k -> new ArrayList<>()).add(sserver.getAddress())); @@ -311,7 +311,7 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { } @Override - public Actions determineActions(SelectorParameters params) { + public ScanServerSelections selectServers(ScanServerSelector.SelectorParameters params) { String scanType = params.getHints().get("scan_type"); @@ -329,7 +329,7 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { orderedScanServersSupplier.get().getOrDefault(profile.group, List.of()); if (orderedScanServers.isEmpty()) { - return new Actions() { + return new ScanServerSelections() { @Override public String getScanServer(TabletId tabletId) { return null; @@ -371,7 +371,7 @@ public class ConfigurableScanServerSelector implements ScanServerSelector { Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(attempts)); - return new Actions() { + return new ScanServerSelections() { @Override public String getScanServer(TabletId tabletId) { return serversToUse.get(tabletId); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java new file mode 100644 index 0000000000..dde2ec4021 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.spi.scan; + +/** + * This object is used to communicate what previous actions were attempted, when they were + * attempted, and the result of those attempts + * + * @since 2.1.0 + */ +public interface ScanServerAttempt { + + // represents reasons that previous attempts to scan failed + enum Result { + BUSY, ERROR + } + + String getServer(); + + long getEndTime(); + + ScanServerAttempt.Result getResult(); + +} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerInfo.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerInfo.java new file mode 100644 index 0000000000..5ef9344d1b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerInfo.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.spi.scan; + +/** + * Information about a scan server. + * + * @since 2.1.0 + */ +public interface ScanServerInfo { + + /** + * @return the address in the form of {@code <host>:<port>} where the scan server is running. + */ + String getAddress(); + + /** + * @return the group name set when the scan server was started. If a group name was not set for + * the scan server, then the string + * {@value ScanServerSelector#DEFAULT_SCAN_SERVER_GROUP_NAME} is returned. + */ + String getGroup(); + +} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelections.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelections.java new file mode 100644 index 0000000000..c5dbab2f2c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelections.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.spi.scan; + +import java.time.Duration; + +import org.apache.accumulo.core.data.TabletId; + +public interface ScanServerSelections { + + /** + * @return what scan server to use for a given tablet. Returning null indicates the tablet server + * should be used for this tablet. + */ + String getScanServer(TabletId tabletId); + + /** + * @return The amount of time to wait on the client side before starting to contact servers. + * Return {@link Duration#ZERO} if no client side wait is desired. + */ + Duration getDelay(); + + /** + * @return The amount of time to wait for a scan to start on the server side before reporting + * busy. For example if a scan request is sent to scan server with a busy timeout of 50ms + * and the scan has not started running within that time then the scan server will not + * ever run the scan and it will report back busy. If the scan starts running, then it + * will never report back busy. Setting a busy timeout that is ≤ 0 means that it will + * wait indefinitely on the server side for the task to start. 
+ */ + Duration getBusyTimeout(); +} diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java index 9a7aa63f06..8504cc5ceb 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerSelector.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.core.spi.scan; -import java.time.Duration; import java.util.Collection; import java.util.Map; import java.util.function.Supplier; @@ -35,7 +34,7 @@ import com.google.common.base.Preconditions; * {@link org.apache.accumulo.core.client.ScannerBase#setConsistencyLevel(ScannerBase.ConsistencyLevel)} * to {@link org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel#EVENTUAL} then this plugin * is used to determine which scan servers to use for a given tablet. To configure a class to use - * for this plugin set its name using the client config {@code scan.server.selector.impl} + * for this plugin, set its name using the client config {@code scan.server.selector.impl} * * @since 2.1.0 */ @@ -44,34 +43,22 @@ public interface ScanServerSelector { /** * The scan server group name that will be used when one is not specified. */ - public static final String DEFAULT_SCAN_SERVER_GROUP_NAME = "default"; + String DEFAULT_SCAN_SERVER_GROUP_NAME = "default"; /** - * Information about a scan server. - * - * @since 2.1.0 + * This method is called once after a {@link ScanServerSelector} is instantiated. */ - public interface ScanServer { - /** - * @return the address in the form of {@code <host>:<port>} where the scan server is running. - */ - String getAddress(); - - /** - * @return the group name set when the scan server was started. If a group name was not set for - * the scan server, then the string {@value #DEFAULT_SCAN_SERVER_GROUP_NAME} is - * returned. 
- */ - String getGroup(); + default void init(InitParameters params) { + Preconditions.checkArgument(params.getOptions().isEmpty(), "No options expected"); } /** * This interface exists so that is easier to evolve what is passed to - * {@link #init(InitParameters)} without having to make breaking changes. + * {@link ScanServerSelector#init(InitParameters)} without having to make breaking changes. * * @since 2.1.0 */ - public interface InitParameters { + interface InitParameters { /** * @return Options that were set in the client config using the prefix @@ -86,46 +73,21 @@ public interface ScanServerSelector { /** * @return the set of live ScanServers. Each time the supplier is called it may return something * different. A good practice would be to call this no more than once per a call to - * {@link #determineActions(SelectorParameters)} so that decisions are made using a - * consistent set of scan servers. + * {@link ScanServerSelector#selectServers(SelectorParameters)} so that decisions are + * made using a consistent set of scan servers. */ - Supplier<Collection<ScanServer>> getScanServers(); - } - - /** - * This method is called once after a ScanSelector is instantiated. - */ - default void init(InitParameters params) { - Preconditions.checkArgument(params.getOptions().isEmpty(), "No options expected"); - } - - /** - * this object is used to communicate what the previous actions were attempted, when they were - * attempted, and the result of the attempt - * - * @since 2.1.0 - */ - interface ScanAttempt { - - // represents reasons that previous attempts to scan failed - enum Result { - BUSY, ERROR - } - - String getServer(); + Supplier<Collection<ScanServerInfo>> getScanServers(); - long getEndTime(); - - Result getResult(); } /** * This interface exists so that is easier to evolve what is passed to - * {@link #determineActions(SelectorParameters)} without having to make breaking changes. 
+ * {@link ScanServerSelector#selectServers(SelectorParameters)} without having to make breaking + * changes. * * @since 2.1.0 */ - public interface SelectorParameters { + interface SelectorParameters { /** * @return the set of tablets to be scanned @@ -135,48 +97,24 @@ public interface ScanServerSelector { /** * @return scan attempt information for the tablet */ - Collection<? extends ScanAttempt> getAttempts(TabletId tabletId); + Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId); /** * @return any hints set on a scanner using * {@link org.apache.accumulo.core.client.ScannerBase#setExecutionHints(Map)}. If none - * were set an empty map is returned. + * were set, an empty map is returned. */ Map<String,String> getHints(); } - public interface Actions { - - /** - * @return what scan server to use for a given tablet. Returning null indicates the tablet - * server should be used for this tablet. - */ - String getScanServer(TabletId tabletId); - - /** - * @return The amount of time to wait on the client side before starting to contact servers. - * Return {@link Duration#ZERO} if no client side wait is desired. - */ - public Duration getDelay(); - - /** - * @return The amount of time to wait for a scan to start on the server side before reporting - * busy. For example if a scan request is sent to scan server with a busy timeout of - * 50ms and the scan has not started running within that time then the scan server will - * not ever run the scan and it will report back busy. If the scan starts running, then - * it will never report back busy. Setting a busy timeout that is ≤ 0 means that it - * will wait indefinitely on the server side for the task to start. - */ - public Duration getBusyTimeout(); - } - /** - * Uses the SelectorParameters to determine which, if any, ScanServer should be used for scanning - * a tablet. + * Uses the {@link SelectorParameters} to determine which, if any, ScanServer should be used for + * scanning a tablet. 
* * @param params * parameters for the calculation * @return results */ - Actions determineActions(SelectorParameters params); + ScanServerSelections selectServers(SelectorParameters params); + } diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java index 201620e5d2..829f2d3f5a 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java @@ -28,13 +28,13 @@ import java.util.Map; import java.util.Set; import org.apache.accumulo.core.data.TabletId; -import org.apache.accumulo.core.spi.scan.ScanServerSelector; +import org.apache.accumulo.core.spi.scan.ScanServerAttempt; import org.junit.jupiter.api.Test; public class ScanAttemptsImplTest { private Map<TabletId,Collection<String>> - simplify(Map<TabletId,Collection<ScanAttemptsImpl.ScanAttemptImpl>> map) { + simplify(Map<TabletId,Collection<ScanServerAttemptsImpl.ScanServerAttemptImpl>> map) { Map<TabletId,Collection<String>> ret = new HashMap<>(); map.forEach((tabletId, scanAttempts) -> { @@ -49,7 +49,7 @@ public class ScanAttemptsImplTest { @Test public void testBasic() { - ScanAttemptsImpl sai = new ScanAttemptsImpl(); + ScanServerAttemptsImpl sai = new ScanServerAttemptsImpl(); var snap1 = sai.snapshot(); @@ -59,7 +59,7 @@ public class ScanAttemptsImplTest { var reporter1 = sai.createReporter("ss1:1", tablet1); - reporter1.report(ScanServerSelector.ScanAttempt.Result.BUSY); + reporter1.report(ScanServerAttempt.Result.BUSY); assertEquals(Map.of(), snap1); @@ -67,7 +67,7 @@ public class ScanAttemptsImplTest { assertEquals(Map.of(tablet1, Set.of("ss1:1_BUSY")), simplify(snap2)); - reporter1.report(ScanServerSelector.ScanAttempt.Result.ERROR); + reporter1.report(ScanServerAttempt.Result.ERROR); assertEquals(Map.of(), snap1); assertEquals(Map.of(tablet1, Set.of("ss1:1_BUSY")), 
simplify(snap2)); @@ -81,8 +81,8 @@ public class ScanAttemptsImplTest { var tablet3 = nti("2", "r"); var reporter3 = sai.createReporter("ss2:2", tablet3); - reporter2.report(ScanServerSelector.ScanAttempt.Result.BUSY); - reporter3.report(ScanServerSelector.ScanAttempt.Result.ERROR); + reporter2.report(ScanServerAttempt.Result.BUSY); + reporter3.report(ScanServerAttempt.Result.ERROR); var snap4 = sai.snapshot(); diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java index 744bf8a5d6..dd22c2913a 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java @@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.spi.common.ServiceEnvironment; -import org.apache.accumulo.core.spi.scan.ScanServerSelector.ScanServer; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; @@ -77,8 +76,8 @@ public class ConfigurableScanServerSelectorTest { } @Override - public Supplier<Collection<ScanServer>> getScanServers() { - return () -> scanServers.entrySet().stream().map(entry -> new ScanServer() { + public Supplier<Collection<ScanServerInfo>> getScanServers() { + return () -> scanServers.entrySet().stream().map(entry -> new ScanServerInfo() { @Override public String getAddress() { @@ -97,7 +96,7 @@ public class ConfigurableScanServerSelectorTest { static class DaParams implements ScanServerSelector.SelectorParameters { private final Collection<TabletId> tablets; - private final Map<TabletId,Collection<? extends ScanServerSelector.ScanAttempt>> attempts; + private final Map<TabletId,Collection<? 
extends ScanServerAttempt>> attempts; private final Map<String,String> hints; DaParams(TabletId tablet) { @@ -106,8 +105,7 @@ public class ConfigurableScanServerSelectorTest { this.hints = Map.of(); } - DaParams(TabletId tablet, - Map<TabletId,Collection<? extends ScanServerSelector.ScanAttempt>> attempts, + DaParams(TabletId tablet, Map<TabletId,Collection<? extends ScanServerAttempt>> attempts, Map<String,String> hints) { this.tablets = Set.of(tablet); this.attempts = attempts; @@ -120,7 +118,7 @@ public class ConfigurableScanServerSelectorTest { } @Override - public Collection<? extends ScanServerSelector.ScanAttempt> getAttempts(TabletId tabletId) { + public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) { return attempts.getOrDefault(tabletId, Set.of()); } @@ -130,13 +128,13 @@ public class ConfigurableScanServerSelectorTest { } } - static class TestScanAttempt implements ScanServerSelector.ScanAttempt { + static class TestScanServerAttempt implements ScanServerAttempt { private final String server; private final long endTime; private final Result result; - TestScanAttempt(String server, long endTime, Result result) { + TestScanServerAttempt(String server, long endTime, Result result) { this.server = server; this.endTime = endTime; this.result = result; @@ -174,7 +172,7 @@ public class ConfigurableScanServerSelectorTest { for (int i = 0; i < 100; i++) { var tabletId = nti("1", "m"); - ScanServerSelector.Actions actions = selector.determineActions(new DaParams(tabletId)); + ScanServerSelections actions = selector.selectServers(new DaParams(tabletId)); servers.add(actions.getScanServer(tabletId)); } @@ -206,16 +204,15 @@ public class ConfigurableScanServerSelectorTest { var tabletId = nti("1", "m"); var tabletAttempts = Stream.iterate(1, i -> i <= busyAttempts, i -> i + 1) - .map(i -> (new TestScanAttempt("ss" + i + ":" + i, i, - ScanServerSelector.ScanAttempt.Result.BUSY))) + .map(i -> (new TestScanServerAttempt("ss" + i + ":" + i, i, 
ScanServerAttempt.Result.BUSY))) .collect(Collectors.toList()); - Map<TabletId,Collection<? extends ScanServerSelector.ScanAttempt>> attempts = new HashMap<>(); + Map<TabletId,Collection<? extends ScanServerAttempt>> attempts = new HashMap<>(); attempts.put(tabletId, tabletAttempts); for (int i = 0; i < 100 * numServers; i++) { - ScanServerSelector.Actions actions = - selector.determineActions(new DaParams(tabletId, attempts, hints)); + ScanServerSelections actions = + selector.selectServers(new DaParams(tabletId, attempts, hints)); assertEquals(expectedBusyTimeout, actions.getBusyTimeout().toMillis()); assertEquals(0, actions.getDelay().toMillis()); @@ -273,7 +270,7 @@ public class ConfigurableScanServerSelectorTest { var tabletId = t % 1000 == 0 ? nti("" + t, null) : nti("" + t, endRow); for (int i = 0; i < 100; i++) { - ScanServerSelector.Actions actions = selector.determineActions(new DaParams(tabletId)); + ScanServerSelections actions = selector.selectServers(new DaParams(tabletId)); serversSeen.add(actions.getScanServer(tabletId)); allServersSeen.merge(actions.getScanServer(tabletId), 1L, Long::sum); } @@ -387,7 +384,7 @@ public class ConfigurableScanServerSelectorTest { selector.init(new InitParams(Set.of())); var tabletId = nti("1", "m"); - ScanServerSelector.Actions actions = selector.determineActions(new DaParams(tabletId)); + ScanServerSelections actions = selector.selectServers(new DaParams(tabletId)); assertNull(actions.getScanServer(tabletId)); assertEquals(Duration.ZERO, actions.getDelay()); assertEquals(Duration.ZERO, actions.getBusyTimeout()); @@ -420,7 +417,7 @@ public class ConfigurableScanServerSelectorTest { for (int i = 0; i < 1000; i++) { var tabletId = nti("1", "m" + i); - ScanServerSelector.Actions actions = selector.determineActions(new DaParams(tabletId)); + ScanServerSelections actions = selector.selectServers(new DaParams(tabletId)); servers.add(actions.getScanServer(tabletId)); } @@ -435,8 +432,8 @@ public class 
ConfigurableScanServerSelectorTest { for (int i = 0; i < 1000; i++) { var tabletId = nti("1", "m" + i); - ScanServerSelector.Actions actions = - selector.determineActions(new DaParams(tabletId, Map.of(), hints)); + ScanServerSelections actions = + selector.selectServers(new DaParams(tabletId, Map.of(), hints)); servers.add(actions.getScanServer(tabletId)); } @@ -451,8 +448,8 @@ public class ConfigurableScanServerSelectorTest { for (int i = 0; i < 1000; i++) { var tabletId = nti("1", "m" + i); - ScanServerSelector.Actions actions = - selector.determineActions(new DaParams(tabletId, Map.of(), hints)); + ScanServerSelections actions = + selector.selectServers(new DaParams(tabletId, Map.of(), hints)); servers.add(actions.getScanServer(tabletId)); } @@ -467,8 +464,8 @@ public class ConfigurableScanServerSelectorTest { for (int i = 0; i < 1000; i++) { var tabletId = nti("1", "m" + i); - ScanServerSelector.Actions actions = - selector.determineActions(new DaParams(tabletId, Map.of(), hints)); + ScanServerSelections actions = + selector.selectServers(new DaParams(tabletId, Map.of(), hints)); servers.add(actions.getScanServer(tabletId)); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java index 55810ba847..07c54e3911 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java @@ -21,6 +21,7 @@ package org.apache.accumulo.tserver; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.spi.cache.BlockCacheManager; +import org.apache.accumulo.core.spi.scan.ScanServerInfo; import org.apache.accumulo.fate.zookeeper.ServiceLock; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.server.GarbageCollectionLogger; @@ 
-32,8 +33,8 @@ import org.apache.accumulo.tserver.session.SessionManager; import org.apache.accumulo.tserver.tablet.Tablet; /** - * This interface exist to support passing a {@link TabletServer} or {@link ScanServer} to a method - * that can take either. + * This interface exist to support passing a {@link TabletServer} or {@link ScanServerInfo} to a + * method that can take either. */ public interface TabletHostingServer {