This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit e78a8f1c7bd7887c3ada9f4dcc9027c10bd3a48c
Merge: 48cdef8a3b 586b210e84
Author: Keith Turner <[email protected]>
AuthorDate: Wed Apr 15 16:55:14 2026 +0000

    Merge branch '2.1'

 .../core/clientImpl/ScanServerAttemptImpl.java     |  4 +
 .../core/clientImpl/ScanServerAttemptsImpl.java    | 22 ++++-
 .../TabletServerBatchReaderIterator.java           | 27 +++---
 .../scan/ConfigurableScanServerHostSelector.java   | 43 ++++++++--
 .../spi/scan/ConfigurableScanServerSelector.java   | 76 ++++++++++-------
 .../scan/ConfigurableScanServerSelectorTest.java   | 97 ++++++++++++++++++++--
 6 files changed, 218 insertions(+), 51 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
index 9bc090d77b,9c042c961a..372d119f05
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
@@@ -52,7 -52,7 +52,8 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.TableDeletedException;
  import org.apache.accumulo.core.client.TableNotFoundException;
  import org.apache.accumulo.core.client.TimedOutException;
 +import org.apache.accumulo.core.clientImpl.ClientTabletCache.LocationNeed;
+ import 
org.apache.accumulo.core.clientImpl.ScanServerAttemptsImpl.BatchAttemptReporter;
  import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
  import org.apache.accumulo.core.data.Column;
  import org.apache.accumulo.core.data.Key;
@@@ -581,25 -568,25 +585,26 @@@ public final class TabletServerBatchRea
        final Map<KeyExtent,List<Range>> tabletsRanges = 
binnedRanges.get(tsLocation);
        if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() 
== 1) {
          QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, 
failures, receiver, columns,
-             ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r -> 
{}), ssd.getDelay());
 -            busyTimeout, reporters.getOrDefault(tsLocation, (t, r) -> {}), 
scanServerSelectorDelay);
++            ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, (t, 
r) -> {}),
++            ssd.getDelay());
          queryTasks.add(queryTask);
        } else {
          HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
          for (Entry<KeyExtent,List<Range>> entry : tabletsRanges.entrySet()) {
            tabletSubset.put(entry.getKey(), entry.getValue());
            if (tabletSubset.size() >= maxTabletsPerRequest) {
 -            QueryTask queryTask =
 -                new QueryTask(tsLocation, tabletSubset, failures, receiver, 
columns, busyTimeout,
 -                    reporters.getOrDefault(tsLocation, (t, r) -> {}), 
scanServerSelectorDelay);
 +            QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, 
failures, receiver,
-                 columns, ssd.getBusyTimeout(), 
ssd.reporters.getOrDefault(tsLocation, r -> {}),
++                columns, ssd.getBusyTimeout(), 
ssd.reporters.getOrDefault(tsLocation, (t, r) -> {}),
 +                ssd.getDelay());
              queryTasks.add(queryTask);
              tabletSubset = new HashMap<>();
            }
          }
  
          if (!tabletSubset.isEmpty()) {
 -          QueryTask queryTask =
 -              new QueryTask(tsLocation, tabletSubset, failures, receiver, 
columns, busyTimeout,
 -                  reporters.getOrDefault(tsLocation, (t, r) -> {}), 
scanServerSelectorDelay);
 +          QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, 
failures, receiver, columns,
-               ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, r 
-> {}),
++              ssd.getBusyTimeout(), ssd.reporters.getOrDefault(tsLocation, 
(t, r) -> {}),
 +              ssd.getDelay());
            queryTasks.add(queryTask);
          }
        }
@@@ -615,62 -602,18 +620,62 @@@
    }
  
    private static class ScanServerData {
 -    Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
 -    ScanServerSelections actions;
 -    Map<String,BatchAttemptReporter> reporters;
 +    final List<Range> failures;
 +    final ScanServerSelections actions;
-     final Map<String,ScanServerAttemptReporter> reporters;
++    final Map<String,BatchAttemptReporter> reporters;
 +
 +    public ScanServerData(List<Range> failures) {
 +      this.failures = failures;
 +      this.actions = null;
 +      this.reporters = Map.of();
 +    }
 +
 +    public ScanServerData(ScanServerSelections actions,
-         Map<String,ScanServerAttemptReporter> reporters) {
++        Map<String,BatchAttemptReporter> reporters) {
 +      this.actions = actions;
 +      this.reporters = reporters;
 +      this.failures = List.of();
 +    }
 +
 +    public ScanServerData() {
 +      this.failures = List.of();
 +      this.actions = null;
 +      this.reporters = Map.of();
 +    }
 +
 +    public long getBusyTimeout() {
 +      return actions == null ? 0L : actions.getBusyTimeout().toMillis();
 +    }
 +
 +    public Duration getDelay() {
 +      return actions == null ? null : actions.getDelay();
 +    }
    }
  
 -  private ScanServerData 
rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
 -      Timer startTime) {
 +  private ScanServerData binRangesForScanServers(ClientTabletCache 
clientTabletCache,
 +      List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
 +      CountDownTimer retryCountDownTimer) throws AccumuloException, 
TableNotFoundException,
 +      AccumuloSecurityException, InvalidTabletHostingRequestException {
      ScanServerSelector ecsm = context.getScanServerSelector();
  
 -    List<TabletIdImpl> tabletIds =
 -        binnedRanges.values().stream().flatMap(extentMap -> 
extentMap.keySet().stream())
 -            .map(TabletIdImpl::new).collect(Collectors.toList());
 +    Map<KeyExtent,String> extentToTserverMap = new HashMap<>();
 +    Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>();
 +
 +    Set<TabletIdImpl> tabletIds = new HashSet<>();
 +
 +    List<Range> failures = clientTabletCache.findTablets(context, ranges, 
(cachedTablet, range) -> {
 +      if (cachedTablet.getTserverLocation().isPresent()) {
 +        extentToTserverMap.put(cachedTablet.getExtent(),
 +            cachedTablet.getTserverLocation().orElseThrow());
 +      }
 +      extentToRangesMap.computeIfAbsent(cachedTablet.getExtent(), k -> new 
ArrayList<>())
 +          .add(range);
 +      tabletIds.add(new TabletIdImpl(cachedTablet.getExtent()));
 +    }, LocationNeed.NOT_REQUIRED);
 +
 +    if (!failures.isEmpty()) {
 +      return new ScanServerData(failures);
 +    }
  
      // get a snapshot of this once,not each time the plugin request it
      var scanAttemptsSnapshot = scanAttempts.snapshot();
@@@ -702,10 -645,20 +707,10 @@@
  
      var actions = ecsm.selectServers(params);
  
-     Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
 -    Map<KeyExtent,String> extentToTserverMap = new HashMap<>();
 -    Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>();
 -
 -    binnedRanges.forEach((server, extentMap) -> {
 -      extentMap.forEach((extent, ranges) -> {
 -        extentToTserverMap.put(extent, server);
 -        extentToRangesMap.put(extent, ranges);
 -      });
 -    });
 -
 -    Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
 -
+     Map<String,BatchAttemptReporter> reporters = new HashMap<>();
  
 +    failures = new ArrayList<>();
 +
      for (TabletIdImpl tabletId : tabletIds) {
        KeyExtent extent = tabletId.toKeyExtent();
        String serverToUse = actions.getScanServer(tabletId);
@@@ -725,28 -672,19 +730,28 @@@
              tabletId, options.executionHints, serverToUse);
        }
  
 -      var rangeMap = binnedRanges2.computeIfAbsent(serverToUse, k -> new 
HashMap<>());
 -      List<Range> ranges = extentToRangesMap.get(extent);
 -      rangeMap.put(extent, ranges);
 +      if (serverToUse != null) {
 +        var rangeMap = binnedRanges.computeIfAbsent(serverToUse, k -> new 
HashMap<>());
 +        List<Range> extentRanges = extentToRangesMap.get(extent);
 +        rangeMap.put(extent, extentRanges);
  
 -      var server = serverToUse;
 -      reporters.computeIfAbsent(serverToUse, k -> 
scanAttempts.createReporter(server));
 +        var server = serverToUse;
-         reporters.computeIfAbsent(serverToUse, k -> 
scanAttempts.createReporter(server, tabletId));
++        reporters.computeIfAbsent(serverToUse, k -> 
scanAttempts.createReporter(server));
 +      } else {
 +        failures.addAll(extentToRangesMap.get(extent));
 +      }
      }
  
 -    ScanServerData ssd = new ScanServerData();
 +    if (!failures.isEmpty()) {
 +      // if there are failures at this point its because tablets are not hosted, so lets attempt to
 +      // get them hosted
 +      clientTabletCache.findTablets(context, ranges, (cachedTablet, range) -> 
{},
 +          LocationNeed.REQUIRED);
 +      return new ScanServerData(failures);
 +    }
 +
 +    ScanServerData ssd = new ScanServerData(actions, reporters);
  
 -    ssd.binnedRanges = binnedRanges2;
 -    ssd.actions = actions;
 -    ssd.reporters = reporters;
      log.trace("Scan server selector chose delay:{} busyTimeout:{}", 
actions.getDelay(),
          actions.getBusyTimeout());
      return ssd;
diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
index 3585f5b2bc,a492d80af5..818c3d8f88
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java
@@@ -19,9 -19,10 +19,11 @@@
  package org.apache.accumulo.core.spi.scan;
  
  import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST;
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  
+ import java.time.Duration;
  import java.util.ArrayList;
+ import java.util.Collection;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
diff --cc core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
index 0f1bee6f11,64123308c5..1f0428dfa6
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java
@@@ -475,41 -425,33 +476,33 @@@ public class ConfigurableScanServerSele
        };
      }
  
-     Map<TabletId,String> serversToUse = new HashMap<>();
+     return selectServers(params, profile, rhasher);
+   }
  
-     int maxAttempts = selectServers(params, profile, rhasher, serversToUse);
+   protected Duration computeDelay(int errorAttempts) {
+     if (errorAttempts == 0) {
+       return Duration.ZERO;
+     } else {
+       return Duration.ofMillis((long) Math.min(30_000, 100 * Math.pow(2, 
(errorAttempts - 1))));
+     }
+   }
  
-     Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxAttempts));
+   ScanServerSelections selectServers(ScanServerSelector.SelectorParameters 
params, Profile profile,
+       RendezvousHasher rhasher) {
+     int attempts = 0;
+     int errorAttempts = 0;
  
-     LOG.trace("Returning servers to use: {}", serversToUse);
-     return new ScanServerSelections() {
-       @Override
-       public String getScanServer(TabletId tabletId) {
-         return serversToUse.get(tabletId);
-       }
+     HashMap<TabletId,String> serversToUse = new HashMap<>();
  
-       @Override
-       public Duration getDelay() {
-         return Duration.ZERO;
-       }
- 
-       @Override
-       public Duration getBusyTimeout() {
-         return busyTO;
-       }
-     };
-   }
- 
-   int selectServers(ScanServerSelector.SelectorParameters params, Profile 
profile,
-       RendezvousHasher rhasher, Map<TabletId,String> serversToUse) {
-     int attempts = params.getTablets().stream()
-         .mapToInt(tablet -> 
params.getAttempts(tablet).size()).max().orElse(0);
+     for (TabletId tablet : params.getTablets()) {
+       attempts = Math.max(attempts, params.getAttempts(tablet).size());
+     }
  
      int numServers = profile.getNumServers(attempts,
 -        rhasher.getSnapshot().getServersForGroup(profile.group).size());
 +        
rhasher.getSnapshot().getServersForGroup(profile.getGroupId()).size());
      for (TabletId tablet : params.getTablets()) {
 -      List<String> rendezvousServers =
 -          rhasher.rendezvous(SERVER, profile.group, tablet, 
profile.getSalt(attempts), numServers);
 +      List<String> rendezvousServers = rhasher.rendezvous(SERVER, 
profile.getGroupId(), tablet,
 +          profile.getSalt(attempts), numServers);
  
        var tabletAttempts = params.getAttempts(tablet);
        if (!tabletAttempts.isEmpty()) {
@@@ -524,9 -474,28 +525,28 @@@
          } // else all servers have failed, so just try any one of them again
        }
        // pick a random server from the set of rendezvous servers
 -      String serverToUse = 
rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size()));
 +      String serverToUse = 
rendezvousServers.get(RANDOM.get().nextInt(rendezvousServers.size()));
        serversToUse.put(tablet, serverToUse);
      }
-     return attempts;
+ 
+     Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(attempts));
+     Duration delay = computeDelay(errorAttempts);
+ 
+     return new ScanServerSelections() {
+       @Override
+       public String getScanServer(TabletId tabletId) {
+         return serversToUse.get(tabletId);
+       }
+ 
+       @Override
+       public Duration getDelay() {
+         return delay;
+       }
+ 
+       @Override
+       public Duration getBusyTimeout() {
+         return busyTO;
+       }
+     };
    }
  }
diff --cc core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index b3dfc24fe5,d815421070..f55226977f
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@@ -26,7 -25,9 +26,8 @@@ import static org.junit.jupiter.api.Ass
  import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
  
 -import java.security.SecureRandom;
  import java.time.Duration;
+ import java.util.ArrayList;
  import java.util.Collection;
  import java.util.HashMap;
  import java.util.HashSet;
@@@ -48,9 -48,9 +50,11 @@@ import org.apache.accumulo.core.spi.com
  import org.apache.accumulo.core.util.UtilWaitThread;
  import org.apache.hadoop.io.Text;
  import org.junit.jupiter.api.Test;
+ import org.junit.jupiter.params.ParameterizedTest;
+ import org.junit.jupiter.params.provider.EnumSource;
  
 +import com.google.common.base.Preconditions;
 +
  public class ConfigurableScanServerSelectorTest {
  
    static class InitParams implements ScanServerSelector.InitParameters {
@@@ -611,11 -606,13 +622,12 @@@
    /**
     * Test that previous failures are not used again unless all servers have failed
     */
-   @Test
-   public void testPreviousFailures() {
+   @ParameterizedTest
+   @EnumSource
+   public void testPreviousFailures(ScanServerAttempt.Result result) {
 -    var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
 -    HashMap<String,String> servers = new HashMap<>();
 +    HashMap<String,ResourceGroupId> servers = new HashMap<>();
      for (int i = 0; i < 30; i++) {
 -      servers.put(String.format("localhost:%d", 8000 + i), dg);
 +      servers.put(String.format("localhost:%d", 8000 + i), 
ResourceGroupId.DEFAULT);
      }
  
      String defaultProfile =
@@@ -654,4 -651,79 +666,79 @@@
              .getScanServer(tabletId);
      assertTrue(Set.of(selected, selected2, selected3).contains(selected4));
    }
+ 
+   @Test
+   public void testErrors() {
 -    var dg = ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME;
 -    HashMap<String,String> servers = new HashMap<>();
++    var dg = ResourceGroupId.DEFAULT;
++    HashMap<String,ResourceGroupId> servers = new HashMap<>();
+     for (int i = 0; i < 30; i++) {
+       servers.put(String.format("localhost:%d", 8000 + i), dg);
+     }
+ 
+     String defaultProfile =
+         
"{'isDefault':true,'maxBusyTimeout':'5m','busyTimeoutMultiplier':4,'timeToWaitForScanServers':'120s',"
+             + "'attemptPlans':[{'servers':3, 'busyTimeout':'60s'}]}";
+     var opts = Map.of("profiles", "[" + defaultProfile + "]".replace('\'', 
'"'));
+     ConfigurableScanServerSelector selector = new 
ConfigurableScanServerSelector();
+     selector.init(new InitParams(() -> servers, opts));
+ 
+     var tablet1 = nti("1", "m");
+     var tablet2 = nti("1", "x");
+     var selections =
+         selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
Map.of(), Map.of()));
+     // no errors so there should be no delay
+     assertEquals(Duration.ZERO, selections.getDelay());
+     var selected1 = selections.getScanServer(tablet1);
+     var selected2 = selections.getScanServer(tablet2);
+     assertTrue(servers.containsKey(selected1));
+     assertTrue(servers.containsKey(selected2));
+ 
+     Map<TabletId,Collection<? extends ScanServerAttempt>> attempts = new 
HashMap<>();
+     List<ScanServerAttempt> tablet1Attempts = new ArrayList<>();
+     attempts.put(tablet1, tablet1Attempts);
+     List<ScanServerAttempt> tablet2Attempts = new ArrayList<>();
+     attempts.put(tablet2, tablet2Attempts);
+ 
+     tablet1Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.BUSY));
+     selections =
+         selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+     // no errors, only a busy timeout, so there should be no delay
+     assertEquals(Duration.ZERO, selections.getDelay());
+ 
+     // add a single error to single tablet, should cause a delay
+     tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+     selections =
+         selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+     assertEquals(Duration.ofMillis(100), selections.getDelay());
+ 
+     // add a single error to another tablet, should not increase the delay
+     tablet1Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+     selections =
+         selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+     assertEquals(Duration.ofMillis(100), selections.getDelay());
+ 
+     // make tablet 1 have two errors, should cause a 200 ms delay
+     tablet1Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+     selections =
+         selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+     assertEquals(Duration.ofMillis(200), selections.getDelay());
+ 
+     // make tablet 2 have three errors, should cause a 400ms delay
+     tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+     tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+     selections =
+         selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+     assertEquals(Duration.ofMillis(400), selections.getDelay());
+ 
+     // keep adding errors until max is reached
+     int expected = 400;
+     while (expected < 30_000) {
+       expected *= 2;
+       expected = Math.min(30_000, expected);
+       tablet2Attempts.add(new TestScanServerAttempt(selected1, 
ScanServerAttempt.Result.ERROR));
+       selections =
+           selector.selectServers(new SelectorParams(Set.of(tablet1, tablet2), 
attempts, Map.of()));
+       assertEquals(Duration.ofMillis(expected), selections.getDelay());
+     }
+   }
  }

Reply via email to