This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit e78a8f1c7bd7887c3ada9f4dcc9027c10bd3a48c Merge: 48cdef8a3b 586b210e84 Author: Keith Turner <[email protected]> AuthorDate: Wed Apr 15 16:55:14 2026 +0000 Merge branch '2.1' .../core/clientImpl/ScanServerAttemptImpl.java | 4 + .../core/clientImpl/ScanServerAttemptsImpl.java | 22 ++++- .../TabletServerBatchReaderIterator.java | 27 +++--- .../scan/ConfigurableScanServerHostSelector.java | 43 ++++++++-- .../spi/scan/ConfigurableScanServerSelector.java | 76 ++++++++++------- .../scan/ConfigurableScanServerSelectorTest.java | 97 ++++++++++++++++++++-- 6 files changed, 218 insertions(+), 51 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 9bc090d77b,9c042c961a..372d119f05 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@@ -52,7 -52,7 +52,8 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TimedOutException; +import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed; + import org.apache.accumulo.core.clientImpl.ScanServerAttemptsImpl.BatchAttemptReporter; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.Column; import org.apache.accumulo.core.data.Key; @@@ -581,25 -568,25 +585,26 @@@ public final class TabletServerBatchRea final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation); if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) { QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns, - ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r -> {}), ssd.getDelay()); - busyTimeout, reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay); ++ ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, (t, r) -> {}), ++ ssd.getDelay()); queryTasks.add(queryTask); } else { HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>(); for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) { tabletSubset.put(entry.getKey(), entry.getValue()); if (tabletSubset.size() >= maxTabletsPerRequest) { - QueryTask queryTask = - new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout, - reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay); + QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, - columns, ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r -> {}), ++ columns, ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, (t, r) -> {}), + ssd.getDelay()); queryTasks.add(queryTask); tabletSubset = new HashMap<>(); } } if (!tabletSubset.isEmpty()) { - QueryTask queryTask = - new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout, - reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay); + QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, - ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r -> {}), ++ ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, (t, r) -> {}), + ssd.getDelay()); queryTasks.add(queryTask); } } @@@ -615,62 -602,18 +620,62 @@@ } private static class ScanServerData { - Map<String,Map<KeyExtent,List<Range>>> binnedRanges; - ScanServerSelections actions; - Map<String,BatchAttemptReporter> reporters; + final List<Range> failures; + final ScanServerSelections actions; - final Map<String,ScanServerAttemptReporter> reporters; ++ final Map<String,BatchAttemptReporter> reporters; + + public ScanServerData(List<Range> failures) { + this.failures = failures; + this.actions = null; + this.reporters = Map.of(); + } + + public ScanServerData(ScanServerSelections actions, - Map<String,ScanServerAttemptReporter> reporters) { ++ Map<String,BatchAttemptReporter> reporters) { + this.actions = actions; + this.reporters = reporters; + this.failures = List.of(); + } + + public ScanServerData() { + this.failures = List.of(); + this.actions = null; + this.reporters = Map.of(); + } + + public long getBusyTimeout() { + return actions == null ? 0L : actions.getBusyTimeout().toMillis(); + } + + public Duration getDelay() { + return actions == null ? null : actions.getDelay(); + } } - private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, - Timer startTime) { + private ScanServerData binRangesForScanServers(ClientTabletCache clientTabletCache, + List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, + CountDownTimer retryCountDownTimer) throws AccumuloException, TableNotFoundException, + AccumuloSecurityException, InvalidTabletHostingRequestException { ScanServerSelector ecsm = context.getScanServerSelector(); - List<TabletIdImpl> tabletIds = - binnedRanges.values().stream().flatMap(extentMap -> extentMap.keySet().stream()) - .map(TabletIdImpl::new).collect(Collectors.toList()); + Map<KeyExtent,String> extentToTserverMap = new HashMap<>(); + Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>(); + + Set<TabletIdImpl> tabletIds = new HashSet<>(); + + List<Range> failures = clientTabletCache.findTablets(context, ranges, (cachedTablet, range) -> { + if (cachedTablet.getTserverLocation().isPresent()) { + extentToTserverMap.put(cachedTablet.getExtent(), + cachedTablet.getTserverLocation().orElseThrow()); + } + extentToRangesMap.computeIfAbsent(cachedTablet.getExtent(), k -> new ArrayList<>()) + .add(range); + tabletIds.add(new TabletIdImpl(cachedTablet.getExtent())); + }, LocationNeed.NOT_REQUIRED); + + if (!failures.isEmpty()) { + return new ScanServerData(failures); + } // get a snapshot of this once,not each time the plugin request it var scanAttemptsSnapshot = scanAttempts.snapshot(); @@@ -702,10 -645,20 +707,10 @@@ var actions = ecsm.selectServers(params); - Map<String,ScanServerAttemptReporter> reporters = new HashMap<>(); - Map<KeyExtent,String> extentToTserverMap = new HashMap<>(); - Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>(); - - binnedRanges.forEach((server, extentMap) -> { - extentMap.forEach((extent, ranges) -> { - extentToTserverMap.put(extent, server); - extentToRangesMap.put(extent, ranges); - }); - }); - - Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>(); - + Map<String,BatchAttemptReporter> reporters = new HashMap<>(); + failures = new ArrayList<>(); + for (TabletIdImpl tabletId : tabletIds) { KeyExtent extent = tabletId.toKeyExtent(); String serverToUse = actions.getScanServer(tabletId); @@@ -725,28 -672,19 +730,28 @@@ tabletId, options.executionHints, serverToUse); } - var rangeMap = binnedRanges2.computeIfAbsent(serverToUse, k -> new HashMap<>()); - List<Range> ranges = extentToRangesMap.get(extent); - rangeMap.put(extent, ranges); + if (serverToUse != null) { + var rangeMap = binnedRanges.computeIfAbsent(serverToUse, k -> new HashMap<>()); + List<Range> extentRanges = extentToRangesMap.get(extent); + rangeMap.put(extent, extentRanges); - var server = serverToUse; - reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server)); + var server = serverToUse; - reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server, tabletId)); ++ reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server)); + } else { + failures.addAll(extentToRangesMap.get(extent)); + } } - ScanServerData ssd = new ScanServerData(); + if (!failures.isEmpty()) { + // if there are failures at this point its because tablets are not hosted, so lets attempt to + // get them hosted + clientTabletCache.findTablets(context, ranges, (cachedTablet, range) -> {}, + LocationNeed.REQUIRED); + return new ScanServerData(failures); + } + + ScanServerData ssd = new ScanServerData(actions, reporters); - ssd.binnedRanges = binnedRanges2; - ssd.actions = actions; - ssd.reporters = reporters; log.trace("Scan server selector chose delay:{} busyTimeout:{}", actions.getDelay(), actions.getBusyTimeout()); return ssd; diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java index 3585f5b2bc,a492d80af5..818c3d8f88 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java @@@ -19,9 -19,10 +19,11 @@@ package org.apache.accumulo.core.spi.scan; import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; + import java.time.Duration; import java.util.ArrayList; + import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java index 0f1bee6f11,64123308c5..1f0428dfa6 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java @@@ -475,41 -425,33 +476,33 @@@ public class ConfigurableScanServerSele }; } - Map<TabletId,String> serversToUse = new HashMap<>(); + return selectServers(params, profile, rhasher); + } - int maxAttempts = selectServers(params, profile, rhasher, serversToUse); + protected Duration computeDelay(int errorAttempts) { + if (errorAttempts == 0) { + return Duration.ZERO; + } else { + return Duration.ofMillis((long) Math.min(30_000, 100 * Math.pow(2, (errorAttempts - 1)))); + } + } - Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxAttempts)); + ScanServerSelections selectServers(ScanServerSelector.SelectorParameters params, Profile profile, + RendezvousHasher rhasher) { + int attempts = 0; + int errorAttempts = 0; - LOG.trace("Returning servers to use: {}", serversToUse); - return new ScanServerSelections() { - @Override - public String getScanServer(TabletId tabletId) { - return serversToUse.get(tabletId); - } + HashMap<TabletId,String> serversToUse = new HashMap<>(); - @Override - public Duration getDelay() { - return Duration.ZERO; - } - - @Override - public Duration getBusyTimeout() { - return busyTO; - } - }; - } - - int selectServers(ScanServerSelector.SelectorParameters params, Profile profile, - RendezvousHasher rhasher, Map<TabletId,String> serversToUse) { - int attempts = params.getTablets().stream() - .mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0); + for (TabletId tablet : params.getTablets()) { + attempts = Math.max(attempts, params.getAttempts(tablet).size()); + } int numServers = profile.getNumServers(attempts, - rhasher.getSnapshot().getServersForGroup(profile.group).size()); + rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).size()); for (TabletId tablet : params.getTablets()) { - List<String> rendezvousServers = - rhasher.rendezvous(SERVER, profile.group, tablet, profile.getSalt(attempts), numServers); + List<String> rendezvousServers = rhasher.rendezvous(SERVER, profile.getGroupId(), tablet, + profile.getSalt(attempts), numServers); var tabletAttempts = params.getAttempts(tablet); if (!tabletAttempts.isEmpty()) { @@@ -524,9 -474,28 +525,28 @@@ } // else all servers have failed, so just try any one of them again } // pick a random server from the set of rendezvous servers - String serverToUse = rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size())); + String serverToUse = rendezvousServers.get(RANDOM.get().nextInt(rendezvousServers.size())); serversToUse.put(tablet, serverToUse); } - return attempts; + + Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(attempts)); + Duration delay = computeDelay(errorAttempts); + + return new ScanServerSelections() { + @Override + public String getScanServer(TabletId tabletId) { + return serversToUse.get(tabletId); + } + + @Override + public Duration getDelay() { + return delay; + } + + @Override + public Duration getBusyTimeout() { + return busyTO; + } + }; } } diff --cc core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java index b3dfc24fe5,d815421070..f55226977f --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java @@@ -26,7 -25,9 +26,8 @@@ import static org.junit.jupiter.api.Ass import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.security.SecureRandom; import java.time.Duration; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@@ -48,9 -48,9 +50,11 @@@ import org.apache.accumulo.core.spi.com import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; + import org.junit.jupiter.params.ParameterizedTest; + import org.junit.jupiter.params.provider.EnumSource; +import com.google.common.base.Preconditions; + public class ConfigurableScanServerSelectorTest { static class InitParams implements ScanServerSelector.InitParameters { @@@ -611,11 -606,13 +622,12 @@@ /** * Test that previous failures are not used again unless all servers have failed */ - @Test - public void testPreviousFailures() { + @ParameterizedTest + @EnumSource + public void testPreviousFailures(ScanServerAttempt.Result result) { - var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; - HashMap<String,String> servers = new HashMap<>(); + HashMap<String,ResourceGroupId> servers = new HashMap<>(); for (int i = 0; i < 30; i++) { - servers.put(String.format("localhost:%d", 8000 + i), dg); + servers.put(String.format("localhost:%d", 8000 + i), ResourceGroupId.DEFAULT); } String defaultProfile = @@@ -654,4 -651,79 +666,79 @@@ .getScanServer(tabletId); assertTrue(Set.of(selected, selected2, selected3).contains(selected4)); } + + @Test + public void testErrors() { - var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME; - HashMap<String,String> servers = new HashMap<>(); ++ var dg = ResourceGroupId.DEFAULT; ++ HashMap<String,ResourceGroupId> servers = new HashMap<>(); + for (int i = 0; i < 30; i++) { + servers.put(String.format("localhost:%d", 8000 + i), dg); + } + + String defaultProfile = + "{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s'," + + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}"; + var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', '"')); + ConfigurableScanServerSelector selector = new ConfigurableScanServerSelector(); + selector.init(new InitParams(() -> servers, opts)); + + var tablet1 = nti("1", "m"); + var tablet2 = nti("1", "x"); + var selections = + selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), Map.of(), Map.of())); + // no errors so there should be no delay + assertEquals(Duration.ZERO, selections.getDelay()); + var selected1 = selections.getScanServer(tablet1); + var selected2 = selections.getScanServer(tablet2); + assertTrue(servers.containsKey(selected1)); + assertTrue(servers.containsKey(selected2)); + + Map<TabletId,Collection<? extends ScanServerAttempt>> attempts = new HashMap<>(); + List<ScanServerAttempt> tablet1Attempts = new ArrayList<>(); + attempts.put(tablet1, tablet1Attempts); + List<ScanServerAttempt> tablet2Attempts = new ArrayList<>(); + attempts.put(tablet2, tablet2Attempts); + + tablet1Attempts.add(new TestScanServerAttempt(selected1, ScanServerAttempt.Result.BUSY)); + selections = + selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), attempts, Map.of())); + // no errors, only a busy timeout, so there should be no delay + assertEquals(Duration.ZERO, selections.getDelay()); + + // add a single error to single tablet, should cause a delay + tablet2Attempts.add(new TestScanServerAttempt(selected1, ScanServerAttempt.Result.ERROR)); + selections = + selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), attempts, Map.of())); + assertEquals(Duration.ofMillis(100), selections.getDelay()); + + // add a single error to another tablet, should not increase the delay + tablet1Attempts.add(new TestScanServerAttempt(selected1, ScanServerAttempt.Result.ERROR)); + selections = + selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), attempts, Map.of())); + assertEquals(Duration.ofMillis(100), selections.getDelay()); + + // make tablet 1 have two errors, should cause a 200 ms delay + tablet1Attempts.add(new TestScanServerAttempt(selected1, ScanServerAttempt.Result.ERROR)); + selections = + selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), attempts, Map.of())); + assertEquals(Duration.ofMillis(200), selections.getDelay()); + + // make tablet 2 have three errors, should cause a 400ms delay + tablet2Attempts.add(new TestScanServerAttempt(selected1, ScanServerAttempt.Result.ERROR)); + tablet2Attempts.add(new TestScanServerAttempt(selected1, ScanServerAttempt.Result.ERROR)); + selections = + selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), attempts, Map.of())); + assertEquals(Duration.ofMillis(400), selections.getDelay()); + + // keep adding errors until max is reached + int expected = 400; + while (expected < 30_000) { + expected *= 2; + expected = Math.min(30_000, expected); + tablet2Attempts.add(new TestScanServerAttempt(selected1, ScanServerAttempt.Result.ERROR)); + selections = + selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), attempts, Map.of())); + assertEquals(Duration.ofMillis(expected), selections.getDelay()); + } + } }
