This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 359712e526 Process seeding of split fate operations in batches (#5404)
359712e526 is described below

commit 359712e526b4760862574adcaa6c178c853662dc
Author: Christopher L. Shannon <cshan...@apache.org>
AuthorDate: Fri Mar 21 09:18:40 2025 -0400

    Process seeding of split fate operations in batches (#5404)
    
    Updates the Seeder in the Manager that handles seeding split fate ops to
    use a single thread and to submit multiple outstanding operations to
    be seeded together instead of individually in order to improve
    performance. The user fate store will now track outstanding fate
    operations and return a future for each pending operation that will be
    completed when the batch is submitted.
    
    This closes #5160
---
 .../org/apache/accumulo/core/conf/Property.java    |   4 -
 .../java/org/apache/accumulo/core/fate/Fate.java   |   5 +
 .../org/apache/accumulo/core/fate/FateStore.java   |  11 +-
 .../accumulo/core/fate/user/FateMutator.java       |   3 +
 .../accumulo/core/fate/user/FateMutatorImpl.java   |   5 +
 .../accumulo/core/fate/user/UserFateStore.java     | 165 +++++++++++++++++----
 .../java/org/apache/accumulo/manager/Manager.java  |   2 +-
 .../accumulo/manager/TabletGroupWatcher.java       |   3 +-
 .../accumulo/manager/split/SeedSplitTask.java      |  55 -------
 .../apache/accumulo/manager/split/Splitter.java    | 102 +++++++++----
 .../apache/accumulo/test/fate/FateStoreUtil.java   |   3 -
 11 files changed, 225 insertions(+), 133 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 9687d2b7f6..62ae03cf69 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -466,10 +466,6 @@ public enum Property {
           + "indefinitely. Default is 0 to block indefinitely. Only valid when 
tserver available "
           + "threshold is set greater than 0.",
       "1.10.0"),
-  MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", 
PropertyType.COUNT,
-      "The number of threads used to seed fate split task, the actual split 
work is done by fate"
-          + " threads.",
-      "4.0.0"),
   
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
       "1M", PropertyType.MEMORY,
       "The data size of each resource groups compaction job priority queue.  
The memory size of "
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 93f4218119..60d3f427c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -55,6 +55,7 @@ import 
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationExcepti
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.FateStore.Seeder;
 import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 import org.apache.accumulo.core.logging.FateLogger;
 import org.apache.accumulo.core.manager.thrift.TFateOperation;
@@ -539,6 +540,10 @@ public class Fate<T> {
     return store.create();
   }
 
+  public Seeder<T> beginSeeding() {
+    return store.beginSeeding();
+  }
+
   public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> 
repo,
       boolean autoCleanUp) {
     try (var seeder = store.beginSeeding()) {
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index c7ec3b4e4c..3f5a8ec040 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -56,10 +56,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
      * Attempts to seed a transaction with the given repo if it does not 
exist. A fateId will be
      * derived from the fateKey. If seeded, sets the following data for the 
fateId in the store.
      *
-     * TODO: Support completing futures later in close method The current 
version will always return
-     * with a CompleteableFuture that is already completed. Future version 
will process will
-     * complete in the close() method for the User store.
-     *
      * <ul>
      * <li>Set the fate op</li>
      * <li>Set the status to SUBMITTED</li>
@@ -76,15 +72,12 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
     CompletableFuture<Optional<FateId>> 
attemptToSeedTransaction(Fate.FateOperation fateOp,
         FateKey fateKey, Repo<T> repo, boolean autoCleanUp);
 
-    // TODO: Right now all implementations do nothing
-    // Eventually this would check the status of all added conditional 
mutations,
-    // retry unknown, and then close the conditional writer.
     @Override
     void close();
   }
 
-  // Creates a conditional writer for the user fate store. For Zookeeper all 
this code will probably
-  // do the same thing its currently doing as zookeeper does not support 
multi-node operations.
+  // Creates a conditional writer for the user fate store. For Zookeeper this 
will be a no-op
+  // because currently zookeeper does not support multi-node operations.
   Seeder<T> beginSeeding();
 
   /**
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index 25fc8fdd47..0280dbf749 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.core.fate.user;
 
+import org.apache.accumulo.core.data.ConditionalMutation;
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.fate.FateStore;
@@ -101,4 +102,6 @@ public interface FateMutator<T> {
 
   Status tryMutate();
 
+  ConditionalMutation getMutation();
+
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index b742361ccf..bb33f6ea81 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -260,4 +260,9 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
       throw new RuntimeException(e);
     }
   }
+
+  @Override
+  public ConditionalMutation getMutation() {
+    return mutation;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index 466c771d1e..2a6efbed0d 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -21,19 +21,26 @@ package org.apache.accumulo.core.fate.user;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.SortedMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ConditionalWriter;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -47,9 +54,11 @@ import org.apache.accumulo.core.fate.Fate.TxInfo;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.fate.FateKey.FateKeyType;
 import org.apache.accumulo.core.fate.ReadOnlyRepo;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.StackOverflowException;
+import org.apache.accumulo.core.fate.user.FateMutator.Status;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
 import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
@@ -57,6 +66,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.iterators.user.WholeRowIterator;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
@@ -136,36 +146,14 @@ public class UserFateStore<T> extends 
AbstractFateStore<T> {
 
   @Override
   public Seeder<T> beginSeeding() {
-    // TODO: For now can handle seeding 1 transaction at a time so just process
-    // everything in attemptToSeedTransaction
-    // Part 2 of the changes in #5160 will allow multiple seeding attempts to 
be combined
-    // into one conditional mutation and we will need to track the pending 
operations
-    // and futures in a map
-    return new Seeder<T>() {
-      @Override
-      public CompletableFuture<Optional<FateId>> 
attemptToSeedTransaction(FateOperation fateOp,
-          FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
-        return CompletableFuture
-            .completedFuture(seedTransaction(fateOp, fateKey, repo, 
autoCleanUp));
-      }
-
-      @Override
-      public void close() {
-        // TODO: This will be used in Part 2 of #5160
-      }
-    };
+    return new BatchSeeder();
   }
 
-  private Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey 
fateKey, Repo<T> repo,
-      boolean autoCleanUp) {
-    final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+  private FateMutator<T> seedTransaction(Fate.FateOperation fateOp, FateKey 
fateKey, FateId fateId,
+      Repo<T> repo, boolean autoCleanUp) {
     Supplier<FateMutator<T>> mutatorFactory = () -> 
newMutator(fateId).requireAbsent()
         .putKey(fateKey).putCreateTime(System.currentTimeMillis());
-    if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, 
autoCleanUp)) {
-      return Optional.of(fateId);
-    } else {
-      return Optional.empty();
-    }
+    return buildMutator(mutatorFactory, fateOp, repo, autoCleanUp);
   }
 
   @Override
@@ -176,16 +164,22 @@ public class UserFateStore<T> extends 
AbstractFateStore<T> {
     return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, 
autoCleanUp);
   }
 
+  private FateMutator<T> buildMutator(Supplier<FateMutator<T>> mutatorFactory,
+      Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) {
+    var mutator = mutatorFactory.get();
+    mutator =
+        mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, 
repo).putStatus(TStatus.SUBMITTED);
+    if (autoCleanUp) {
+      mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
+    }
+    return mutator;
+  }
+
   private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, 
String logId,
       Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) {
+    var mutator = buildMutator(mutatorFactory, fateOp, repo, autoCleanUp);
     int maxAttempts = 5;
     for (int attempt = 0; attempt < maxAttempts; attempt++) {
-      var mutator = mutatorFactory.get();
-      mutator =
-          mutator.putFateOp(serializeTxInfo(fateOp)).putRepo(1, 
repo).putStatus(TStatus.SUBMITTED);
-      if (autoCleanUp) {
-        mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
-      }
       var status = mutator.tryMutate();
       if (status == FateMutator.Status.ACCEPTED) {
         // signal to the super class that a new fate transaction was seeded 
and is ready to run
@@ -393,6 +387,113 @@ public class UserFateStore<T> extends 
AbstractFateStore<T> {
     return fateInstanceType;
   }
 
+  private class BatchSeeder implements Seeder<T> {
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final 
Map<FateId,Pair<FateMutator<T>,CompletableFuture<Optional<FateId>>>> pending =
+        new HashMap<>();
+
+    @Override
+    public CompletableFuture<Optional<FateId>> 
attemptToSeedTransaction(FateOperation fateOp,
+        FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
+      Preconditions.checkState(!closed.get(), "Can't attempt to seed with a 
closed seeder.");
+
+      final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+      // If not already submitted, add to the pending map and return the 
future
+      // or the existing future if duplicate. The pending map will store the 
mutator
+      // to be processed on close in one batch.
+      return pending.computeIfAbsent(fateId, id -> {
+        FateMutator<T> mutator = seedTransaction(fateOp, fateKey, fateId, 
repo, autoCleanUp);
+        CompletableFuture<Optional<FateId>> future = new CompletableFuture<>();
+        return new Pair<>(mutator, future);
+      }).getSecond();
+    }
+
+    @Override
+    public void close() {
+      closed.set(true);
+
+      int maxAttempts = 5;
+
+      // This loop will submit all the pending mutations as one batch
+      // to a conditional writer and any known results will be removed
+      // from the pending map. Unknown results will be re-attempted up
+      // to the maxAttempts count
+      for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); 
attempt++) {
+        var currentResults = tryMutateBatch();
+        for (Entry<FateId,ConditionalWriter.Status> result : 
currentResults.entrySet()) {
+          var fateId = result.getKey();
+          var status = result.getValue();
+          var future = pending.get(fateId).getSecond();
+          switch (result.getValue()) {
+            case ACCEPTED:
+              seededTx();
+              log.trace("Attempt to seed {} returned {}", fateId.canonical(), 
status);
+              // Complete the future with the fateId and remove from pending
+              future.complete(Optional.of(fateId));
+              pending.remove(fateId);
+              break;
+            case REJECTED:
+              log.debug("Attempt to seed {} returned {}", fateId.canonical(), 
status);
+              // Rejected so complete with an empty optional and remove from 
pending
+              future.complete(Optional.empty());
+              pending.remove(fateId);
+              break;
+            case UNKNOWN:
+              log.debug("Attempt to seed {} returned {} status, retrying", 
fateId.canonical(),
+                  status);
+              // unknown, so don't remove from map so that we try again if 
still under
+              // max attempts
+              break;
+            default:
+              // do not expect other statuses
+              throw new IllegalStateException("Unhandled status for mutation " 
+ status);
+          }
+        }
+
+        if (!pending.isEmpty()) {
+          // At this point can not reliably determine if the unknown pending 
mutations were
+          // successful or not because no reservation was acquired. For 
example since no
+          // reservation was acquired it is possible that seeding was a 
success and something
+          // immediately picked it up and started operating on it and changing 
it.
+          // If scanning after that point can not conclude success or failure. 
Another situation
+          // is that maybe the fateId already existed in a seeded form prior 
to getting this
+          // unknown.
+          UtilWaitThread.sleep(250);
+        }
+      }
+
+      // Any remaining will be UNKNOWN status, so complete the futures with an 
empty optional
+      pending.forEach((fateId, pair) -> {
+        pair.getSecond().complete(Optional.empty());
+        log.warn("Repeatedly received unknown status when attempting to seed 
{}",
+            fateId.canonical());
+      });
+    }
+
+    // Submit all the pending mutations to a single conditional writer
+    // as one batch and return the results for each mutation
+    private Map<FateId,ConditionalWriter.Status> tryMutateBatch() {
+      if (pending.isEmpty()) {
+        return Map.of();
+      }
+
+      final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>();
+      try (ConditionalWriter writer = 
context.createConditionalWriter(tableName)) {
+        Iterator<ConditionalWriter.Result> results = writer
+            .write(pending.values().stream().map(pair -> 
pair.getFirst().getMutation()).iterator());
+        while (results.hasNext()) {
+          var result = results.next();
+          var row = new Text(result.getMutation().getRow());
+          resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()), 
result.getStatus());
+        }
+      } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
+        throw new IllegalStateException(e);
+      }
+      return resultsMap;
+    }
+  }
+
   private class FateTxStoreImpl extends AbstractFateTxStoreImpl {
 
     private FateTxStoreImpl(FateId fateId) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 5da833ee84..94ac0e76b5 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1337,7 +1337,7 @@ public class Manager extends AbstractServer implements 
LiveTServerSet.Listener,
     // Don't call start the CompactionCoordinator until we have tservers and 
upgrade is complete.
     compactionCoordinator.start();
 
-    this.splitter = new Splitter(context);
+    this.splitter = new Splitter(this);
     this.splitter.start();
 
     try {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index e29413e82a..fe39e60381 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -75,7 +75,6 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.core.util.threads.Threads.AccumuloDaemonThread;
 import org.apache.accumulo.manager.metrics.ManagerMetrics;
-import org.apache.accumulo.manager.split.SeedSplitTask;
 import org.apache.accumulo.manager.state.TableCounts;
 import org.apache.accumulo.manager.state.TableStats;
 import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
@@ -607,7 +606,7 @@ abstract class TabletGroupWatcher extends 
AccumuloDaemonThread {
       final boolean needsSplit = 
actions.contains(ManagementAction.NEEDS_SPLITTING);
       if (needsSplit) {
         LOG.debug("{} may need splitting.", tm.getExtent());
-        manager.getSplitter().initiateSplit(new SeedSplitTask(manager, 
tm.getExtent()));
+        manager.getSplitter().initiateSplit(tm.getExtent());
       }
 
       if (actions.contains(ManagementAction.NEEDS_COMPACTING) && 
compactionGenerator != null) {
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
deleted file mode 100644
index 8270bc423f..0000000000
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.manager.split;
-
-import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.Fate;
-import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.FateKey;
-import org.apache.accumulo.manager.Manager;
-import org.apache.accumulo.manager.tableOps.split.FindSplits;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SeedSplitTask implements Runnable {
-
-  private static final Logger log = 
LoggerFactory.getLogger(SeedSplitTask.class);
-  private final Manager manager;
-  private final KeyExtent extent;
-
-  public SeedSplitTask(Manager manager, KeyExtent extent) {
-    this.manager = manager;
-    this.extent = extent;
-  }
-
-  @Override
-  public void run() {
-    try {
-      var fateInstanceType = FateInstanceType.fromTableId((extent.tableId()));
-      
manager.fate(fateInstanceType).seedTransaction(Fate.FateOperation.SYSTEM_SPLIT,
-          FateKey.forSplit(extent), new FindSplits(extent), true);
-    } catch (Exception e) {
-      log.error("Failed to split {}", extent, e);
-    }
-  }
-
-  public KeyExtent getExtent() {
-    return extent;
-  }
-}
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
index 85b841d1cf..d88e52ed66 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
@@ -18,22 +18,28 @@
  */
 package org.apache.accumulo.manager.split;
 
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.util.cache.Caches.CacheName;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.split.FindSplits;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,9 +55,64 @@ public class Splitter {
 
   private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
 
+  private final Manager manager;
   private final ThreadPoolExecutor splitExecutor;
   // tracks which tablets are queued in splitExecutor
-  private final Set<Text> queuedTablets = ConcurrentHashMap.newKeySet();
+  private final Map<Text,KeyExtent> queuedTablets = new ConcurrentHashMap<>();
+
+  class SplitWorker implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        while (manager.stillManager()) {
+          if (queuedTablets.isEmpty()) {
+            sleepUninterruptibly(10, TimeUnit.MILLISECONDS);
+            continue;
+          }
+
+          final Map<Text,KeyExtent> userSplits = new HashMap<>();
+          final Map<Text,KeyExtent> metaSplits = new HashMap<>();
+
+          // Go through all the queued up splits and partition
+          // into the different store types to be submitted.
+          queuedTablets.forEach((metaRow, extent) -> {
+            switch (FateInstanceType.fromTableId((extent.tableId()))) {
+              case USER:
+                userSplits.put(metaRow, extent);
+                break;
+              case META:
+                metaSplits.put(metaRow, extent);
+                break;
+              default:
+                throw new IllegalStateException("Unexpected FateInstanceType");
+            }
+          });
+
+          // seed the user and then meta splits
+          // The meta splits (zk) will be processed one at a time but there 
will not be
+          // many of those splits. The user splits are processed as a batch.
+          seedSplits(FateInstanceType.USER, userSplits);
+          seedSplits(FateInstanceType.META, metaSplits);
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to split", e);
+      }
+    }
+  }
+
+  private void seedSplits(FateInstanceType instanceType, Map<Text,KeyExtent> 
splits) {
+    if (!splits.isEmpty()) {
+      try (var seeder = manager.fate(instanceType).beginSeeding()) {
+        for (KeyExtent extent : splits.values()) {
+          var unused = 
seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT,
+              FateKey.forSplit(extent), new FindSplits(extent), true);
+        }
+      } finally {
+        queuedTablets.keySet().removeAll(splits.keySet());
+      }
+    }
+  }
 
   public static class FileInfo {
     final Text firstRow;
@@ -151,12 +212,12 @@ public class Splitter {
 
   final LoadingCache<CacheKey,FileInfo> splitFileCache;
 
-  public Splitter(ServerContext context) {
-    int numThreads = 
context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS);
+  public Splitter(Manager manager) {
+    this.manager = manager;
+    ServerContext context = manager.getContext();
 
-    this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder")
-        .numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, 
TimeUnit.MILLISECONDS)
-        .enableThreadPoolMetrics().build();
+    this.splitExecutor = 
context.threadPools().getPoolBuilder("split_seeder").numCoreThreads(1)
+        .numMaxThreads(1).withTimeOut(0L, 
TimeUnit.MILLISECONDS).enableThreadPoolMetrics().build();
 
     Weigher<CacheKey,
         FileInfo> weigher = (key, info) -> key.tableId.canonical().length()
@@ -175,7 +236,9 @@ public class Splitter {
 
   }
 
-  public synchronized void start() {}
+  public synchronized void start() {
+    splitExecutor.execute(new SplitWorker());
+  }
 
   public synchronized void stop() {
     splitExecutor.shutdownNow();
@@ -185,29 +248,14 @@ public class Splitter {
     return splitFileCache.get(new CacheKey(tableId, tabletFile));
   }
 
-  public void initiateSplit(SeedSplitTask seedSplitTask) {
+  public void initiateSplit(KeyExtent extent) {
     // Want to avoid queuing the same tablet multiple times, it would not 
cause bugs but would waste
     // work. Use the metadata row to identify a tablet because the KeyExtent 
also includes the prev
     // end row which may change when splits happen. The metaRow is 
conceptually tableId+endRow and
     // that does not change for a split.
-    Text metaRow = seedSplitTask.getExtent().toMetaRow();
+    Text metaRow = extent.toMetaRow();
     int qsize = queuedTablets.size();
-    if (qsize < 10_000 && queuedTablets.add(metaRow)) {
-      Runnable taskWrapper = () -> {
-        try {
-          seedSplitTask.run();
-        } finally {
-          queuedTablets.remove(metaRow);
-        }
-      };
-
-      try {
-        splitExecutor.execute(taskWrapper);
-      } catch (RejectedExecutionException rje) {
-        queuedTablets.remove(metaRow);
-        throw rje;
-      }
-    } else {
+    if (qsize >= 10_000 || queuedTablets.putIfAbsent(metaRow, extent) != null) 
{
       LOG.trace("Did not add {} to split queue {}", metaRow, qsize);
     }
   }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java 
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
index 61ea073a6f..5b12b0f3cd 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
@@ -77,9 +77,6 @@ public class FateStoreUtil {
     assertEquals(fateTableProps, testFateTableProps);
   }
 
-  // For now just process one at a time as the current impl completes
-  // each seed transaction individually. In future versions we can test
-  // batching multiple seeding atempts together.
   public static <T> Optional<FateId> seedTransaction(FateStore<T> store, 
Fate.FateOperation fateOp,
       FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
     CompletableFuture<Optional<FateId>> fateIdFuture;

Reply via email to