This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 7636a8b975 Parallelize split seeding in TableOperationImpl.addSplits() (#4700)
7636a8b975 is described below

commit 7636a8b975a24e12e1b5f96c0c83e03b59f512a6
Author: Dom G <domgargu...@apache.org>
AuthorDate: Thu Jun 27 15:30:01 2024 -0400

    Parallelize split seeding in TableOperationImpl.addSplits() (#4700)
    
    * parallelize the starting and waiting fate operation steps
---
 .../core/clientImpl/TableOperationsImpl.java       | 100 ++++++++++++---------
 1 file changed, 57 insertions(+), 43 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 213e8ba291..780177d68e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -67,6 +67,8 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
@@ -484,7 +486,14 @@ public class TableOperationsImpl extends TableOperationsHelper {
 
     ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId);
 
-    SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+    SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new TreeSet<>(splits));
+
+    final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+
+    ExecutorService startExecutor =
+        context.threadPools().getPoolBuilder("addSplitsStart").numCoreThreads(16).build();
+    ExecutorService waitExecutor =
+        context.threadPools().getPoolBuilder("addSplitsWait").numCoreThreads(16).build();
 
     while (!splitsTodo.isEmpty()) {
 
@@ -493,59 +502,64 @@ public class TableOperationsImpl extends TableOperationsHelper {
       Map<KeyExtent,List<Text>> tabletSplits =
           mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
 
-      ArrayList<Pair<TFateId,List<Text>>> opids = new ArrayList<>(tabletSplits.size());
-
-      final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+      List<CompletableFuture<Void>> futures = new ArrayList<>();
 
       // begin the fate operation for each tablet without waiting for the operation to complete
       for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) {
-        var extent = splitsForTablet.getKey();
+        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
+          var extent = splitsForTablet.getKey();
 
-        List<ByteBuffer> args = new ArrayList<>();
-        args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
-        args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
-        args.add(extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));
-        splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split)));
+          List<ByteBuffer> args = new ArrayList<>();
+          args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
+          args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow()));
+          args.add(
+              extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow()));
+          splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split)));
 
-        try {
-          handleFateOperation(() -> {
-            TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
-            TFateId opid = beginFateOperation(t);
-            executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false);
-            opids.add(new Pair<>(opid, splitsForTablet.getValue()));
-            return null;
-          }, tableName);
-        } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException e) {
-          throw new RuntimeException(e);
-        }
-      }
-
-      // after all operations have been started, wait for them to complete
-      for (Pair<TFateId,List<Text>> entry : opids) {
-        final TFateId opid = entry.getFirst();
-        final List<Text> completedSplits = entry.getSecond();
+          try {
+            return handleFateOperation(() -> {
+              TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
+              TFateId opid = beginFateOperation(t);
+              executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false);
+              return new Pair<>(opid, splitsForTablet.getValue());
+            }, tableName);
+          } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
+              | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
+            throw new RuntimeException(e);
+          }
+          // wait for the fate operation to complete in a separate thread pool
+        }, startExecutor).thenApplyAsync(pair -> {
+          final TFateId opid = pair.getFirst();
+          final List<Text> completedSplits = pair.getSecond();
 
-        try {
-          String status = handleFateOperation(() -> waitForFateOperation(opid), tableName);
+          try {
+            String status = handleFateOperation(() -> waitForFateOperation(opid), tableName);
 
-          if (SPLIT_SUCCESS_MSG.equals(status)) {
-            completedSplits.forEach(splitsTodo::remove);
-          }
-        } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException e) {
-          throw new RuntimeException(e);
-        } finally {
-          context.clearTableListCache();
-          // always finish table op, even when exception
-          if (opid != null) {
-            try {
-              finishFateOperation(opid);
-            } catch (Exception e) {
-              log.warn("Exception thrown while finishing fate table operation", e);
+            if (SPLIT_SUCCESS_MSG.equals(status)) {
+              completedSplits.forEach(splitsTodo::remove);
+            }
+          } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
+              | AccumuloSecurityException | TableNotFoundException | AccumuloException e) {
+            throw new RuntimeException(e);
+          } finally {
+            // always finish table op, even when exception
+            if (opid != null) {
+              try {
+                finishFateOperation(opid);
+              } catch (Exception e) {
+                log.warn("Exception thrown while finishing fate table operation", e);
+              }
             }
           }
-        }
+          return null;
+        }, waitExecutor);
+        futures.add(future);
       }
+
+      futures.forEach(CompletableFuture::join);
     }
+    startExecutor.shutdown();
+    waitExecutor.shutdown();
   }
 
   private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,
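
For readers skimming the patch, the sketch below is a minimal, self-contained illustration of the start/wait pattern the change introduces: seed the operations on one pool, block waiting for their completion on a second pool, then join everything. The class name, the fixed-size Executors pools, and the startOperation/waitForOperation stubs are hypothetical stand-ins invented for this example, not Accumulo APIs; the real code uses context.threadPools() and the FATE client calls shown in the diff above.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StartThenWaitSketch {

  // Hypothetical stand-in for beginFateOperation()/executeFateOperation():
  // quickly seeds a remote operation and returns an operation id.
  static long startOperation(int tablet) {
    sleep(50);
    return 1000L + tablet;
  }

  // Hypothetical stand-in for waitForFateOperation(): blocks until the operation is done.
  static String waitForOperation(long opid) {
    sleep(200);
    return "SUCCESS";
  }

  static void sleep(long ms) {
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Two pools, mirroring the patch: one seeds (starts) operations,
    // the other blocks waiting for them, so neither phase stalls the other.
    ExecutorService startExecutor = Executors.newFixedThreadPool(16);
    ExecutorService waitExecutor = Executors.newFixedThreadPool(16);

    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (int tablet = 0; tablet < 8; tablet++) {
      final int t = tablet;
      CompletableFuture<Void> future = CompletableFuture
          // start the operation without waiting for it to complete
          .supplyAsync(() -> startOperation(t), startExecutor)
          // wait for completion in the separate wait pool
          .thenApplyAsync(opid -> {
            System.out.println("tablet " + t + " finished: " + waitForOperation(opid));
            return null;
          }, waitExecutor);
      futures.add(future);
    }

    // after all operations have been started, wait for every pipeline to finish
    futures.forEach(CompletableFuture::join);

    startExecutor.shutdown();
    waitExecutor.shutdown();
  }
}

With 16 threads in each pool, the simulated operations are all seeded almost immediately and their waits overlap, instead of each split operation being started and awaited one at a time as in the code the patch removes.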
