This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 32e4f3fda25fbe1208582f05a7f22ccbff60362d Merge: 9414041323 2cc38dff5e Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Tue Jun 18 20:38:14 2024 +0000 Merge branch '2.1' * Added Durations to ThriftScanner to solve merge issues. .../TabletServerBatchReaderIterator.java | 19 +++++- .../accumulo/core/clientImpl/ThriftScanner.java | 60 +++++++++++++++++-- .../spi/scan/ConfigurableScanServerSelector.java | 70 ++++++++++++++++++---- .../accumulo/core/spi/scan/ScanServerSelector.java | 39 +++++++++++- .../scan/ConfigurableScanServerSelectorTest.java | 70 ++++++++++++++++++++-- .../accumulo/test/ScanServerIT_NoServers.java | 52 ++++++++++++++-- 6 files changed, 282 insertions(+), 28 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index f36944daaf,c76277f79b..14027327d1 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@@ -33,6 -34,8 +34,7 @@@ import java.util.Optional import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; -import java.util.concurrent.TimeUnit; + import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@@ -40,9 -43,9 +42,10 @@@ import org.apache.accumulo.core.client. import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.TableNotFoundException; + import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.sample.SamplerConfiguration; import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Column; @@@ -66,14 -69,17 +69,15 @@@ import org.apache.accumulo.core.securit import org.apache.accumulo.core.spi.scan.ScanServerAttempt; import org.apache.accumulo.core.spi.scan.ScanServerSelections; import org.apache.accumulo.core.spi.scan.ScanServerSelector; +import org.apache.accumulo.core.tabletscan.thrift.ScanServerBusyException; +import org.apache.accumulo.core.tabletscan.thrift.TSampleNotPresentException; +import org.apache.accumulo.core.tabletscan.thrift.TabletScanClientService; +import org.apache.accumulo.core.tabletscan.thrift.TooManyFilesException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; -import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException; -import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException; -import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; -import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; import org.apache.accumulo.core.trace.TraceUtil; -import org.apache.accumulo.core.trace.thrift.TInfo; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.OpTimer; + import org.apache.accumulo.core.util.Retry; import org.apache.hadoop.io.Text; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; @@@ -262,6 -267,44 +266,44 @@@ public class ThriftScanner } } + static <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description, Duration timeoutLeft, ClientContext context, TableId tableId, + Logger log) { - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, TimeUnit.MILLISECONDS) - .incrementBy(100, TimeUnit.MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) - .logInterval(3, TimeUnit.MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + + long startTime = System.nanoTime(); + Optional<T> optional = condition.get(); + while (optional.isEmpty()) { + log.trace("For tableId {} scan server selector is waiting for '{}'", tableId, description); + + var elapsedTime = Duration.ofNanos(System.nanoTime() - startTime); + + if (elapsedTime.compareTo(timeoutLeft) > 0) { + throw new TimedOutException("While waiting for '" + description + + "' in order to select a scan server, the scan timed out. "); + } + + if (elapsedTime.compareTo(maxWaitTime) > 0) { + return Optional.empty(); + } + + context.requireNotDeleted(tableId); + + try { + retry.waitForNextAttempt(log, String.format( + "For tableId %s scan server selector is waiting for '%s'", tableId, description)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + optional = condition.get(); + } + + return optional; + } + public static class ScanTimedOutException extends IOException { private static final long serialVersionUID = 1L; @@@ -368,9 -411,9 +410,9 @@@ } Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location", - Map.of("tserver", loc.tablet_location)); + Map.of("tserver", loc.getTserverLocation())); try (Scope scanLocation = child2.makeCurrent()) { - results = scan(loc, scanState, context); + results = scan(loc, scanState, context, timeOut, startTime); } catch (AccumuloSecurityException e) { context.clearTableListCache(); context.requireNotDeleted(scanState.tableId); @@@ -555,6 -601,13 +600,13 @@@ } return scanState.executionHints; } + + @Override + public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description) { + return ThriftScanner.waitUntil(condition, maxWaitTime, description, timeoutLeft, - context, loc.tablet_extent.tableId(), log); ++ context, loc.getExtent().tableId(), log); + } }; ScanServerSelections actions = context.getScanServerSelector().selectServers(params); diff --cc core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java index bec0cd9a57,25ed62e89a..54de138520 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java @@@ -18,8 -18,8 +18,9 @@@ */ package org.apache.accumulo.core.spi.scan; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.junit.jupiter.api.Assertions.assertEquals; + import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue;