This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
     new 5aca710487 lowers time to host ondemand tablets (#4581)
5aca710487 is described below

commit 5aca710487d4b40bb5a63588707bc9e35c5694b9
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 21 18:28:16 2024 -0400

    lowers time to host ondemand tablets (#4581)

    This change lowers the time it takes to host ondemand tablets by moving
    this functionality into TabletGroupWatcher. The client RPC thread
    processing the hosting request can now directly call a function in TGW
    that will immediately start on the work of hosting the tablets.

    Updated SplitMillionIT to request hosting of 200 tablets all at once
    instead of one by one. This was done by using a BatchScanner instead of
    lots of scanners.
---
 .../java/org/apache/accumulo/manager/Manager.java | 10 ++
 .../manager/ManagerClientServiceHandler.java | 49 +---------
 .../accumulo/manager/TabletGroupWatcher.java | 102 ++++++++++++++++-----
 .../accumulo/test/functional/SplitMillionIT.java | 42 +++++----
 4 files changed, 118 insertions(+), 85 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 244a2696ab..404fe0ba7c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -549,6 +549,16 @@ public class Manager extends AbstractServer return compactionCoordinator; } + public void hostOndemand(List<KeyExtent> extents) { + extents.forEach(e -> Preconditions.checkArgument(DataLevel.of(e.tableId()) == DataLevel.USER)); + + for (var watcher : watchers) { + if (watcher.getLevel() == DataLevel.USER) { + watcher.hostOndemand(extents); + } + } + } + private class MigrationCleanupThread implements Runnable { @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index e70e151ace..abfb6675af 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -33,13 +33,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; -import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.DelegationTokenConfigSerializer; @@ -69,7 +67,6 @@ import org.apache.accumulo.core.manager.thrift.TabletLoadState; import org.apache.accumulo.core.manager.thrift.ThriftPropertyException; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; import org.apache.accumulo.core.metadata.schema.TabletDeletedException; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; @@ -92,15 +89,15 @@ import org.apache.thrift.TException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; +import com.google.common.collect.Lists; + public class ManagerClientServiceHandler implements ManagerClientService.Iface { private static final Logger log = Manager.log; private final Manager manager; - private final Set<KeyExtent> hostingRequestInProgress; protected ManagerClientServiceHandler(Manager manager) { this.manager = 
manager; - this.hostingRequestInProgress = new ConcurrentSkipListSet<>(); } @Override @@ -611,51 +608,11 @@ public class ManagerClientServiceHandler implements ManagerClientService.Iface { manager.mustBeOnline(tableId); - final List<KeyExtent> success = new ArrayList<>(); - final List<KeyExtent> inProgress = new ArrayList<>(); - extents.forEach(e -> { - KeyExtent ke = KeyExtent.fromThrift(e); - if (hostingRequestInProgress.add(ke)) { - log.info("Tablet hosting requested for: {} ", KeyExtent.fromThrift(e)); - inProgress.add(ke); - } else { - log.trace("Ignoring hosting request because another thread is currently processing it {}", - ke); - } - }); - // Do not add any code here, it may interfere with the finally block removing extents from - // hostingRequestInProgress - try (var mutator = manager.getContext().getAmple().conditionallyMutateTablets()) { - inProgress.forEach(ke -> { - mutator.mutateTablet(ke).requireAbsentOperation() - .requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation() - .setHostingRequested().submit(TabletMetadata::getHostingRequested); - - }); - - mutator.process().forEach((extent, result) -> { - if (result.getStatus() == Status.ACCEPTED) { - // cache this success for a bit - success.add(extent); - } else { - if (log.isTraceEnabled()) { - // only read the metadata if the logging is enabled - log.trace("Failed to set hosting request {}", result.readMetadata()); - } - } - }); - } finally { - inProgress.forEach(hostingRequestInProgress::remove); - } - - manager.getEventCoordinator().event(success, - "Tablet hosting requested for %d of %d tablets in %s", success.size(), extents.size(), - tableId); + manager.hostOndemand(Lists.transform(extents, KeyExtent::fromThrift)); } protected TableId getTableId(ClientContext context, String tableName) throws ThriftTableOperationException { return ClientServiceHandler.checkTableId(context, tableName, null); } - } diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index ee9765138c..e15b289118 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -26,6 +26,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +40,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -46,6 +48,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.TabletAvailability; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -99,6 +102,7 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Iterators; @@ -154,6 +158,10 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { return stats.getLast(tableId); } + public Ample.DataLevel getLevel() { + return store.getLevel(); + } + /** * True if the collection of live tservers specified in 'candidates' hasn't changed since the last * time an assignment scan was started. 
@@ -238,29 +246,8 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { rangesToProcess.drainTo(ranges); - if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) { - // only do full scans when trying to shutdown - setNeedsFullScan(); - continue; - } - - TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(false); - - var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); - if (currentTservers.isEmpty()) { + if (!processRanges(ranges)) { setNeedsFullScan(); - continue; - } - - try (var iter = store.iterator(ranges, tabletMgmtParams)) { - long t1 = System.currentTimeMillis(); - manageTablets(iter, tabletMgmtParams, currentTservers, false); - long t2 = System.currentTimeMillis(); - Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", - store.name(), (t2 - t1) / 1000., ranges.size())); - } catch (Exception e) { - Manager.log.error("Error processing {} ranges for store {} ", ranges.size(), - store.name(), e); } } } catch (InterruptedException e) { @@ -322,6 +309,77 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { } } + private boolean processRanges(List<Range> ranges) { + if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) { + return false; + } + + TabletManagementParameters tabletMgmtParams = createTabletManagementParameters(false); + + var currentTservers = getCurrentTservers(tabletMgmtParams.getOnlineTsevers()); + if (currentTservers.isEmpty()) { + return false; + } + + try (var iter = store.iterator(ranges, tabletMgmtParams)) { + long t1 = System.currentTimeMillis(); + manageTablets(iter, tabletMgmtParams, currentTservers, false); + long t2 = System.currentTimeMillis(); + Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", + store.name(), (t2 - t1) / 1000., ranges.size())); + } catch (Exception e) { + Manager.log.error("Error processing {} ranges for store {} ", ranges.size(), store.name(), 
e); + } + + return true; + } + + private final Set<KeyExtent> hostingRequestInProgress = new ConcurrentSkipListSet<>(); + + public void hostOndemand(Collection<KeyExtent> extents) { + // This is only expected to be called for the user level + Preconditions.checkState(getLevel() == Ample.DataLevel.USER); + + final List<KeyExtent> inProgress = new ArrayList<>(); + extents.forEach(ke -> { + if (hostingRequestInProgress.add(ke)) { + LOG.info("Tablet hosting requested for: {} ", ke); + inProgress.add(ke); + } else { + LOG.trace("Ignoring hosting request because another thread is currently processing it {}", + ke); + } + }); + // Do not add any code here, it may interfere with the finally block removing extents from + // hostingRequestInProgress + try (var mutator = manager.getContext().getAmple().conditionallyMutateTablets()) { + inProgress.forEach(ke -> { + mutator.mutateTablet(ke).requireAbsentOperation() + .requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation() + .setHostingRequested().submit(TabletMetadata::getHostingRequested); + + }); + + List<Range> ranges = new ArrayList<>(); + + mutator.process().forEach((extent, result) -> { + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + // cache this success for a bit + ranges.add(extent.toMetaRange()); + } else { + if (LOG.isTraceEnabled()) { + // only read the metadata if the logging is enabled + LOG.trace("Failed to set hosting request {}", result.readMetadata()); + } + } + }); + + processRanges(ranges); + } finally { + inProgress.forEach(hostingRequestInProgress::remove); + } + } + private TabletManagementParameters createTabletManagementParameters(boolean lookForTabletsNeedingVolReplacement) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java index 0953e7ccef..2094c15430 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java 
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java @@ -20,6 +20,7 @@ package org.apache.accumulo.test.functional; import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -108,41 +109,48 @@ public class SplitMillionIT extends ConfigurableMacBase { IntStream.of(0, 1, 99_999_998, 99_999_999)) .toArray(); - // read and write to a few of the 1 million tablets. The following should touch the first, - // last, and a few middle tablets. - for (var rowInt : rows) { - - var row = String.format("%010d", rowInt); - - long t1 = System.currentTimeMillis(); - try (var scanner = c.createScanner(tableName)) { - scanner.setRange(new Range(row)); - assertEquals(0, scanner.stream().count()); - } - - long t2 = System.currentTimeMillis(); + long t1 = System.currentTimeMillis(); + try (var scanner = c.createBatchScanner(tableName)) { + var ranges = Arrays.stream(rows).mapToObj(rowInt -> String.format("%010d", rowInt)) + .map(Range::new).collect(Collectors.toList()); + scanner.setRanges(ranges); + assertEquals(0, scanner.stream().count()); + } + long t2 = System.currentTimeMillis(); + log.info("Time to scan {} rows {}ms", rows.length, (t2 - t1)); - try (var writer = c.createBatchWriter(tableName)) { + t1 = System.currentTimeMillis(); + try (var writer = c.createBatchWriter(tableName)) { + for (var rowInt : rows) { + var row = String.format("%010d", rowInt); Mutation m = new Mutation(row); m.put("c", "x", "200"); m.put("c", "y", "900"); m.put("c", "z", "300"); writer.addMutation(m); } + } + t2 = System.currentTimeMillis(); + log.info("Time to write {} rows {}ms", rows.length, (t2 - t1)); + + // read and write to a few of the 1 million tablets. The following should touch the first, + // last, and a few middle tablets. 
+ for (var rowInt : rows) { + var row = String.format("%010d", rowInt); long t3 = System.currentTimeMillis(); verifyRow(c, tableName, row, Map.of("x", "200", "y", "900", "z", "300")); long t4 = System.currentTimeMillis(); - log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, t3 - t2, t4 - t3); + log.info("Row: {} scan: {}ms", row, t4 - t3); } long count; - long t1 = System.currentTimeMillis(); + t1 = System.currentTimeMillis(); try (var tabletInformation = c.tableOperations().getTabletInformation(tableName, new Range())) { count = tabletInformation.count(); } - long t2 = System.currentTimeMillis(); + t2 = System.currentTimeMillis(); assertEquals(1_000_000, count); log.info("Time to scan all tablets information : {}ms", t2 - t1);