This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit c2e4874986157af720473af46d77030716e3009b Merge: 3a79823492 2a9aa76e97 Author: Christopher L. Shannon <cshan...@apache.org> AuthorDate: Sat Sep 7 17:24:21 2024 -0400 Merge branch '3.1' .../core/clientImpl/TabletServerBatchReaderIterator.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index aae5e9a927,3225223996..a076fd0147 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@@ -260,10 -256,6 +260,10 @@@ public class TabletServerBatchReaderIte .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofMinutes(10)).backOffFactor(1.07) .logInterval(Duration.ofMinutes(1)).createFactory().createRetry(); + ScanServerData ssd; + - long startTime = System.currentTimeMillis(); ++ Timer startTime = Timer.startNew(); + while (true) { binnedRanges.clear(); @@@ -294,15 -278,8 +294,15 @@@ lastFailureSize = failures.size(); if (log.isTraceEnabled()) { - log.trace("Failed to bin {} ranges, tablet locations were null, retrying in 100ms", - failures.size()); + log.trace( + "Failed to bin {} ranges for table {}, tablet locations were null, retrying in 100ms", + failures.size(), tableId); + } + - if (System.currentTimeMillis() - startTime > retryTimeout) { ++ if (startTime.elapsed(MILLISECONDS) > retryTimeout) { + // TODO exception used for timeout is inconsistent + throw new TimedOutException( + "Failed to find servers to process scans before timeout was exceeded."); } try { @@@ -617,63 -599,18 +617,63 @@@ } private static class ScanServerData { - Map<String,Map<KeyExtent,List<Range>>> binnedRanges; - ScanServerSelections actions; - Map<String,ScanServerAttemptReporter> reporters; + final List<Range> failures; + final ScanServerSelections actions; + final Map<String,ScanServerAttemptReporter> reporters; + + public ScanServerData(List<Range> failures) { + this.failures = failures; + this.actions = null; + this.reporters = Map.of(); + } + + public ScanServerData(ScanServerSelections actions, + Map<String,ScanServerAttemptReporter> reporters) { + this.actions = actions; + this.reporters = reporters; + this.failures = List.of(); + } + + public ScanServerData() { + this.failures = List.of(); + this.actions = null; + this.reporters = Map.of(); + } + + public long getBusyTimeout() { + return actions == null ? 0L : actions.getBusyTimeout().toMillis(); + } + + public Duration getDelay() { + return actions == null ? null : actions.getDelay(); + } } - private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges, - Timer startTime) { + private ScanServerData binRangesForScanServers(ClientTabletCache clientTabletCache, - List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, long startTime) ++ List<Range> ranges, Map<String,Map<KeyExtent,List<Range>>> binnedRanges, Timer startTime) + throws AccumuloException, TableNotFoundException, AccumuloSecurityException, + InvalidTabletHostingRequestException { + ScanServerSelector ecsm = context.getScanServerSelector(); - List<TabletIdImpl> tabletIds = - binnedRanges.values().stream().flatMap(extentMap -> extentMap.keySet().stream()) - .map(TabletIdImpl::new).collect(Collectors.toList()); + Map<KeyExtent,String> extentToTserverMap = new HashMap<>(); + Map<KeyExtent,List<Range>> extentToRangesMap = new HashMap<>(); + + Set<TabletIdImpl> tabletIds = new HashSet<>(); + + List<Range> failures = clientTabletCache.findTablets(context, ranges, (cachedTablet, range) -> { + if (cachedTablet.getTserverLocation().isPresent()) { + extentToTserverMap.put(cachedTablet.getExtent(), + cachedTablet.getTserverLocation().orElseThrow()); + } + extentToRangesMap.computeIfAbsent(cachedTablet.getExtent(), k -> new ArrayList<>()) + .add(range); + tabletIds.add(new TabletIdImpl(cachedTablet.getExtent())); + }, LocationNeed.NOT_REQUIRED); + + if (!failures.isEmpty()) { + return new ScanServerData(failures); + } // get a snapshot of this once,not each time the plugin request it var scanAttemptsSnapshot = scanAttempts.snapshot();