This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 32e4f3fda25fbe1208582f05a7f22ccbff60362d
Merge: 9414041323 2cc38dff5e
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Tue Jun 18 20:38:14 2024 +0000

    Merge branch '2.1'
    
    * Added Durations to ThriftScanner to solve merge issues.

 .../TabletServerBatchReaderIterator.java           | 19 +++++-
 .../accumulo/core/clientImpl/ThriftScanner.java    | 60 +++++++++++++++++--
 .../spi/scan/ConfigurableScanServerSelector.java   | 70 ++++++++++++++++++----
 .../accumulo/core/spi/scan/ScanServerSelector.java | 39 +++++++++++-
 .../scan/ConfigurableScanServerSelectorTest.java   | 70 ++++++++++++++++++++--
 .../accumulo/test/ScanServerIT_NoServers.java      | 52 ++++++++++++++--
 6 files changed, 282 insertions(+), 28 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
index f36944daaf,c76277f79b..14027327d1
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java
@@@ -33,6 -34,8 +34,7 @@@ import java.util.Optional
  import java.util.Set;
  import java.util.SortedMap;
  import java.util.SortedSet;
 -import java.util.concurrent.TimeUnit;
+ import java.util.function.Supplier;
  import java.util.stream.Collectors;
  
  import org.apache.accumulo.core.Constants;
@@@ -40,9 -43,9 +42,10 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.AccumuloSecurityException;
  import org.apache.accumulo.core.client.SampleNotPresentException;
  import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.client.TimedOutException;
  import org.apache.accumulo.core.client.sample.SamplerConfiguration;
  import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation;
 +import org.apache.accumulo.core.clientImpl.thrift.TInfo;
  import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
  import org.apache.accumulo.core.conf.Property;
  import org.apache.accumulo.core.data.Column;
@@@ -66,14 -69,17 +69,15 @@@ import org.apache.accumulo.core.securit
  import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
  import org.apache.accumulo.core.spi.scan.ScanServerSelections;
  import org.apache.accumulo.core.spi.scan.ScanServerSelector;
 +import org.apache.accumulo.core.tabletscan.thrift.ScanServerBusyException;
 +import org.apache.accumulo.core.tabletscan.thrift.TSampleNotPresentException;
 +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService;
 +import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException;
  import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
  import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
 -import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException;
 -import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
 -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService;
 -import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
  import org.apache.accumulo.core.trace.TraceUtil;
 -import org.apache.accumulo.core.trace.thrift.TInfo;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.core.util.OpTimer;
+ import org.apache.accumulo.core.util.Retry;
  import org.apache.hadoop.io.Text;
  import org.apache.thrift.TApplicationException;
  import org.apache.thrift.TException;
@@@ -262,6 -267,44 +266,44 @@@ public class ThriftScanner 
      }
    }
  
+   static <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime,
+       String description, Duration timeoutLeft, ClientContext context, TableId tableId,
+       Logger log) {
 -    Retry retry = Retry.builder().infiniteRetries().retryAfter(100, TimeUnit.MILLISECONDS)
 -        .incrementBy(100, TimeUnit.MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5)
 -        .logInterval(3, TimeUnit.MINUTES).createRetry();
++    Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
++        .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
++        .logInterval(Duration.ofMinutes(3)).createRetry();
+ 
+     long startTime = System.nanoTime();
+     Optional<T> optional = condition.get();
+     while (optional.isEmpty()) {
+       log.trace("For tableId {} scan server selector is waiting for '{}'", tableId, description);
+ 
+       var elapsedTime = Duration.ofNanos(System.nanoTime() - startTime);
+ 
+       if (elapsedTime.compareTo(timeoutLeft) > 0) {
+         throw new TimedOutException("While waiting for '" + description
+             + "' in order to select a scan server, the scan timed out. ");
+       }
+ 
+       if (elapsedTime.compareTo(maxWaitTime) > 0) {
+         return Optional.empty();
+       }
+ 
+       context.requireNotDeleted(tableId);
+ 
+       try {
+         retry.waitForNextAttempt(log, String.format(
+             "For tableId %s scan server selector is waiting for '%s'", tableId, description));
+       } catch (InterruptedException e) {
+         throw new RuntimeException(e);
+       }
+ 
+       optional = condition.get();
+     }
+ 
+     return optional;
+   }
+ 
    public static class ScanTimedOutException extends IOException {
  
      private static final long serialVersionUID = 1L;
@@@ -368,9 -411,9 +410,9 @@@
          }
  
          Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location",
 -            Map.of("tserver", loc.tablet_location));
 +            Map.of("tserver", loc.getTserverLocation()));
          try (Scope scanLocation = child2.makeCurrent()) {
-           results = scan(loc, scanState, context);
+           results = scan(loc, scanState, context, timeOut, startTime);
          } catch (AccumuloSecurityException e) {
            context.clearTableListCache();
            context.requireNotDeleted(scanState.tableId);
@@@ -555,6 -601,13 +600,13 @@@
              }
              return scanState.executionHints;
            }
+ 
+           @Override
+           public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime,
+               String description) {
+             return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft,
 -                context, loc.tablet_extent.tableId(), log);
++                context, loc.getExtent().tableId(), log);
+           }
          };
  
          ScanServerSelections actions = context.getScanServerSelector().selectServers(params);
diff --cc core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
index bec0cd9a57,25ed62e89a..54de138520
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java
@@@ -18,8 -18,8 +18,9 @@@
   */
  package org.apache.accumulo.core.spi.scan;
  
 +import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
  import static org.junit.jupiter.api.Assertions.assertEquals;
+ import static org.junit.jupiter.api.Assertions.assertFalse;
  import static org.junit.jupiter.api.Assertions.assertNull;
  import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;

Reply via email to