This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 5aca710487 lowers time to host ondemand tablets (#4581)
5aca710487 is described below

commit 5aca710487d4b40bb5a63588707bc9e35c5694b9
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Tue May 21 18:28:16 2024 -0400

    lowers time to host ondemand tablets (#4581)
    
    This change lowers the time it takes to host ondemand tablets
    by moving this functionality into TabletGroupWatcher.
    
    The client RPC thread processing the hosting request can now
    directly call a function in TGW that will immediately start on
    the work of hosting the tablets.
    
    Updated SplitMillionIT to request hosting of 200 tablets all
    at once instead of one by one.  This was done by using a
    BatchScanner instead of lots of scanners.
---
 .../java/org/apache/accumulo/manager/Manager.java  |  10 ++
 .../manager/ManagerClientServiceHandler.java       |  49 +---------
 .../accumulo/manager/TabletGroupWatcher.java       | 102 ++++++++++++++++-----
 .../accumulo/test/functional/SplitMillionIT.java   |  42 +++++----
 4 files changed, 118 insertions(+), 85 deletions(-)

diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 244a2696ab..404fe0ba7c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -549,6 +549,16 @@ public class Manager extends AbstractServer
     return compactionCoordinator;
   }
 
+  public void hostOndemand(List<KeyExtent> extents) {
+    extents.forEach(e -> Preconditions.checkArgument(DataLevel.of(e.tableId()) 
== DataLevel.USER));
+
+    for (var watcher : watchers) {
+      if (watcher.getLevel() == DataLevel.USER) {
+        watcher.hostOndemand(extents);
+      }
+    }
+  }
+
   private class MigrationCleanupThread implements Runnable {
 
     @Override
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
index e70e151ace..abfb6675af 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
@@ -33,13 +33,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.DelegationTokenConfig;
-import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.clientImpl.AuthenticationTokenIdentifier;
 import org.apache.accumulo.core.clientImpl.ClientContext;
 import org.apache.accumulo.core.clientImpl.DelegationTokenConfigSerializer;
@@ -69,7 +67,6 @@ import 
org.apache.accumulo.core.manager.thrift.TabletLoadState;
 import org.apache.accumulo.core.manager.thrift.ThriftPropertyException;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
-import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
 import org.apache.accumulo.core.metadata.schema.TabletDeletedException;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
@@ -92,15 +89,15 @@ import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 
+import com.google.common.collect.Lists;
+
 public class ManagerClientServiceHandler implements ManagerClientService.Iface 
{
 
   private static final Logger log = Manager.log;
   private final Manager manager;
-  private final Set<KeyExtent> hostingRequestInProgress;
 
   protected ManagerClientServiceHandler(Manager manager) {
     this.manager = manager;
-    this.hostingRequestInProgress = new ConcurrentSkipListSet<>();
   }
 
   @Override
@@ -611,51 +608,11 @@ public class ManagerClientServiceHandler implements 
ManagerClientService.Iface {
 
     manager.mustBeOnline(tableId);
 
-    final List<KeyExtent> success = new ArrayList<>();
-    final List<KeyExtent> inProgress = new ArrayList<>();
-    extents.forEach(e -> {
-      KeyExtent ke = KeyExtent.fromThrift(e);
-      if (hostingRequestInProgress.add(ke)) {
-        log.info("Tablet hosting requested for: {} ", KeyExtent.fromThrift(e));
-        inProgress.add(ke);
-      } else {
-        log.trace("Ignoring hosting request because another thread is 
currently processing it {}",
-            ke);
-      }
-    });
-    // Do not add any code here, it may interfere with the finally block 
removing extents from
-    // hostingRequestInProgress
-    try (var mutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
-      inProgress.forEach(ke -> {
-        mutator.mutateTablet(ke).requireAbsentOperation()
-            
.requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation()
-            .setHostingRequested().submit(TabletMetadata::getHostingRequested);
-
-      });
-
-      mutator.process().forEach((extent, result) -> {
-        if (result.getStatus() == Status.ACCEPTED) {
-          // cache this success for a bit
-          success.add(extent);
-        } else {
-          if (log.isTraceEnabled()) {
-            // only read the metadata if the logging is enabled
-            log.trace("Failed to set hosting request {}", 
result.readMetadata());
-          }
-        }
-      });
-    } finally {
-      inProgress.forEach(hostingRequestInProgress::remove);
-    }
-
-    manager.getEventCoordinator().event(success,
-        "Tablet hosting requested for %d of %d tablets in %s", success.size(), 
extents.size(),
-        tableId);
+    manager.hostOndemand(Lists.transform(extents, KeyExtent::fromThrift));
   }
 
   protected TableId getTableId(ClientContext context, String tableName)
       throws ThriftTableOperationException {
     return ClientServiceHandler.checkTableId(context, tableName, null);
   }
-
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index ee9765138c..e15b289118 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -26,6 +26,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,6 +40,7 @@ import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -46,6 +48,7 @@ import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -99,6 +102,7 @@ import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterators;
 
@@ -154,6 +158,10 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     return stats.getLast(tableId);
   }
 
+  public Ample.DataLevel getLevel() {
+    return store.getLevel();
+  }
+
   /**
    * True if the collection of live tservers specified in 'candidates' hasn't 
changed since the last
    * time an assignment scan was started.
@@ -238,29 +246,8 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
 
             rangesToProcess.drainTo(ranges);
 
-            if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) {
-              // only do full scans when trying to shutdown
-              setNeedsFullScan();
-              continue;
-            }
-
-            TabletManagementParameters tabletMgmtParams = 
createTabletManagementParameters(false);
-
-            var currentTservers = 
getCurrentTservers(tabletMgmtParams.getOnlineTsevers());
-            if (currentTservers.isEmpty()) {
+            if (!processRanges(ranges)) {
               setNeedsFullScan();
-              continue;
-            }
-
-            try (var iter = store.iterator(ranges, tabletMgmtParams)) {
-              long t1 = System.currentTimeMillis();
-              manageTablets(iter, tabletMgmtParams, currentTservers, false);
-              long t2 = System.currentTimeMillis();
-              Manager.log.debug(String.format("[%s]: partial scan time %.2f 
seconds for %,d ranges",
-                  store.name(), (t2 - t1) / 1000., ranges.size()));
-            } catch (Exception e) {
-              Manager.log.error("Error processing {} ranges for store {} ", 
ranges.size(),
-                  store.name(), e);
             }
           }
         } catch (InterruptedException e) {
@@ -322,6 +309,77 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
     }
   }
 
+  private boolean processRanges(List<Range> ranges) {
+    if (manager.getManagerGoalState() == ManagerGoalState.CLEAN_STOP) {
+      return false;
+    }
+
+    TabletManagementParameters tabletMgmtParams = 
createTabletManagementParameters(false);
+
+    var currentTservers = 
getCurrentTservers(tabletMgmtParams.getOnlineTsevers());
+    if (currentTservers.isEmpty()) {
+      return false;
+    }
+
+    try (var iter = store.iterator(ranges, tabletMgmtParams)) {
+      long t1 = System.currentTimeMillis();
+      manageTablets(iter, tabletMgmtParams, currentTservers, false);
+      long t2 = System.currentTimeMillis();
+      Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds 
for %,d ranges",
+          store.name(), (t2 - t1) / 1000., ranges.size()));
+    } catch (Exception e) {
+      Manager.log.error("Error processing {} ranges for store {} ", 
ranges.size(), store.name(), e);
+    }
+
+    return true;
+  }
+
+  private final Set<KeyExtent> hostingRequestInProgress = new 
ConcurrentSkipListSet<>();
+
+  public void hostOndemand(Collection<KeyExtent> extents) {
+    // This is only expected to be called for the user level
+    Preconditions.checkState(getLevel() == Ample.DataLevel.USER);
+
+    final List<KeyExtent> inProgress = new ArrayList<>();
+    extents.forEach(ke -> {
+      if (hostingRequestInProgress.add(ke)) {
+        LOG.info("Tablet hosting requested for: {} ", ke);
+        inProgress.add(ke);
+      } else {
+        LOG.trace("Ignoring hosting request because another thread is 
currently processing it {}",
+            ke);
+      }
+    });
+    // Do not add any code here, it may interfere with the finally block 
removing extents from
+    // hostingRequestInProgress
+    try (var mutator = 
manager.getContext().getAmple().conditionallyMutateTablets()) {
+      inProgress.forEach(ke -> {
+        mutator.mutateTablet(ke).requireAbsentOperation()
+            
.requireTabletAvailability(TabletAvailability.ONDEMAND).requireAbsentLocation()
+            .setHostingRequested().submit(TabletMetadata::getHostingRequested);
+
+      });
+
+      List<Range> ranges = new ArrayList<>();
+
+      mutator.process().forEach((extent, result) -> {
+        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
+          // cache this success for a bit
+          ranges.add(extent.toMetaRange());
+        } else {
+          if (LOG.isTraceEnabled()) {
+            // only read the metadata if the logging is enabled
+            LOG.trace("Failed to set hosting request {}", 
result.readMetadata());
+          }
+        }
+      });
+
+      processRanges(ranges);
+    } finally {
+      inProgress.forEach(hostingRequestInProgress::remove);
+    }
+  }
+
   private TabletManagementParameters
       createTabletManagementParameters(boolean 
lookForTabletsNeedingVolReplacement) {
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java
index 0953e7ccef..2094c15430 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitMillionIT.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test.functional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -108,41 +109,48 @@ public class SplitMillionIT extends ConfigurableMacBase {
               IntStream.of(0, 1, 99_999_998, 99_999_999))
           .toArray();
 
-      // read and write to a few of the 1 million tablets. The following 
should touch the first,
-      // last, and a few middle tablets.
-      for (var rowInt : rows) {
-
-        var row = String.format("%010d", rowInt);
-
-        long t1 = System.currentTimeMillis();
-        try (var scanner = c.createScanner(tableName)) {
-          scanner.setRange(new Range(row));
-          assertEquals(0, scanner.stream().count());
-        }
-
-        long t2 = System.currentTimeMillis();
+      long t1 = System.currentTimeMillis();
+      try (var scanner = c.createBatchScanner(tableName)) {
+        var ranges = Arrays.stream(rows).mapToObj(rowInt -> 
String.format("%010d", rowInt))
+            .map(Range::new).collect(Collectors.toList());
+        scanner.setRanges(ranges);
+        assertEquals(0, scanner.stream().count());
+      }
+      long t2 = System.currentTimeMillis();
+      log.info("Time to scan {} rows {}ms", rows.length, (t2 - t1));
 
-        try (var writer = c.createBatchWriter(tableName)) {
+      t1 = System.currentTimeMillis();
+      try (var writer = c.createBatchWriter(tableName)) {
+        for (var rowInt : rows) {
+          var row = String.format("%010d", rowInt);
           Mutation m = new Mutation(row);
           m.put("c", "x", "200");
           m.put("c", "y", "900");
           m.put("c", "z", "300");
           writer.addMutation(m);
         }
+      }
+      t2 = System.currentTimeMillis();
+      log.info("Time to write {} rows {}ms", rows.length, (t2 - t1));
+
+      // read and write to a few of the 1 million tablets. The following 
should touch the first,
+      // last, and a few middle tablets.
+      for (var rowInt : rows) {
+        var row = String.format("%010d", rowInt);
 
         long t3 = System.currentTimeMillis();
         verifyRow(c, tableName, row, Map.of("x", "200", "y", "900", "z", 
"300"));
         long t4 = System.currentTimeMillis();
-        log.info("Row: {} scan1: {}ms write: {}ms scan2: {}ms", row, t2 - t1, 
t3 - t2, t4 - t3);
+        log.info("Row: {}  scan: {}ms", row, t4 - t3);
       }
 
       long count;
-      long t1 = System.currentTimeMillis();
+      t1 = System.currentTimeMillis();
       try (var tabletInformation =
           c.tableOperations().getTabletInformation(tableName, new Range())) {
         count = tabletInformation.count();
       }
-      long t2 = System.currentTimeMillis();
+      t2 = System.currentTimeMillis();
       assertEquals(1_000_000, count);
       log.info("Time to scan all tablets information : {}ms", t2 - t1);
 

Reply via email to