This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 7636a8b975 Parallelize split seeding in TableOperationImpl.addSplits() (#4700) 7636a8b975 is described below commit 7636a8b975a24e12e1b5f96c0c83e03b59f512a6 Author: Dom G <domgargu...@apache.org> AuthorDate: Thu Jun 27 15:30:01 2024 -0400 Parallelize split seeding in TableOperationImpl.addSplits() (#4700) * parallelize the starting and waiting fate operation steps --- .../core/clientImpl/TableOperationsImpl.java | 100 ++++++++++++--------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 213e8ba291..780177d68e 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@ -67,6 +67,8 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -484,7 +486,14 @@ public class TableOperationsImpl extends TableOperationsHelper { ClientTabletCache tabLocator = ClientTabletCache.getInstance(context, tableId); - SortedSet<Text> splitsTodo = new TreeSet<>(splits); + SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new TreeSet<>(splits)); + + final ByteBuffer EMPTY = ByteBuffer.allocate(0); + + ExecutorService startExecutor = + context.threadPools().getPoolBuilder("addSplitsStart").numCoreThreads(16).build(); + ExecutorService waitExecutor = + context.threadPools().getPoolBuilder("addSplitsWait").numCoreThreads(16).build(); while (!splitsTodo.isEmpty()) { @@ -493,59 +502,64 @@ public class TableOperationsImpl extends TableOperationsHelper { Map<KeyExtent,List<Text>> tabletSplits = mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo); - ArrayList<Pair<TFateId,List<Text>>> opids = new ArrayList<>(tabletSplits.size()); - - final ByteBuffer EMPTY = ByteBuffer.allocate(0); + List<CompletableFuture<Void>> futures = new ArrayList<>(); // begin the fate operation for each tablet without waiting for the operation to complete for (Entry<KeyExtent,List<Text>> splitsForTablet : tabletSplits.entrySet()) { - var extent = splitsForTablet.getKey(); + CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { + var extent = splitsForTablet.getKey(); - List<ByteBuffer> args = new ArrayList<>(); - args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8))); - args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow())); - args.add(extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow())); - splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split))); + List<ByteBuffer> args = new ArrayList<>(); + args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8))); + args.add(extent.endRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.endRow())); + args.add( + extent.prevEndRow() == null ? EMPTY : TextUtil.getByteBuffer(extent.prevEndRow())); + splitsForTablet.getValue().forEach(split -> args.add(TextUtil.getByteBuffer(split))); - try { - handleFateOperation(() -> { - TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift(); - TFateId opid = beginFateOperation(t); - executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false); - opids.add(new Pair<>(opid, splitsForTablet.getValue())); - return null; - }, tableName); - } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException e) { - throw new RuntimeException(e); - } - } - - // after all operations have been started, wait for them to complete - for (Pair<TFateId,List<Text>> entry : opids) { - final TFateId opid = entry.getFirst(); - final List<Text> completedSplits = entry.getSecond(); + try { + return handleFateOperation(() -> { + TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift(); + TFateId opid = beginFateOperation(t); + executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false); + return new Pair<>(opid, splitsForTablet.getValue()); + }, tableName); + } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException + | AccumuloSecurityException | TableNotFoundException | AccumuloException e) { + throw new RuntimeException(e); + } + // wait for the fate operation to complete in a separate thread pool + }, startExecutor).thenApplyAsync(pair -> { + final TFateId opid = pair.getFirst(); + final List<Text> completedSplits = pair.getSecond(); - try { - String status = handleFateOperation(() -> waitForFateOperation(opid), tableName); + try { + String status = handleFateOperation(() -> waitForFateOperation(opid), tableName); - if (SPLIT_SUCCESS_MSG.equals(status)) { - completedSplits.forEach(splitsTodo::remove); - } - } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException e) { - throw new RuntimeException(e); - } finally { - context.clearTableListCache(); - // always finish table op, even when exception - if (opid != null) { - try { - finishFateOperation(opid); - } catch (Exception e) { - log.warn("Exception thrown while finishing fate table operation", e); + if (SPLIT_SUCCESS_MSG.equals(status)) { + completedSplits.forEach(splitsTodo::remove); + } + } catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException + | AccumuloSecurityException | TableNotFoundException | AccumuloException e) { + throw new RuntimeException(e); + } finally { + // always finish table op, even when exception + if (opid != null) { + try { + finishFateOperation(opid); + } catch (Exception e) { + log.warn("Exception thrown while finishing fate table operation", e); + } } } - } + return null; + }, waitExecutor); + futures.add(future); } + + futures.forEach(CompletableFuture::join); } + startExecutor.shutdown(); + waitExecutor.shutdown(); } private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName, TableId tableId,