This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new f208a886c5 avoids unneeded work when splitting tablets (#5071)
f208a886c5 is described below

commit f208a886c54bf1307cb7afc1af9efd8d54aadc59
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Nov 16 18:09:29 2024 -0500

    avoids unneeded work when splitting tablets (#5071)
    
    When tablets needed to split they were queued in a thread pool to seed
    a fate operation.  When the tablet group watcher ran again if the tablet
    was still in queue it would add it again. This would not cause any
    correctness issues, but it would cause a lot of wasted work. Also if one
    tablet was queued multiple times it could prevent another tablet that
    was not queued at all from being added.  This made splitting very
    inefficient when a large number of tablets needed to split.
---
 .../org/apache/accumulo/core/conf/Property.java    |  6 ++--
 .../accumulo/manager/split/SeedSplitTask.java      |  4 +++
 .../apache/accumulo/manager/split/Splitter.java    | 42 +++++++++++++++-------
 3 files changed, 38 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6e2fed76af..98412a97d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -447,8 +447,10 @@ public enum Property {
           + "indefinitely. Default is 0 to block indefinitely. Only valid when 
tserver available "
           + "threshold is set greater than 0.",
       "1.10.0"),
-  MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", 
"8", PropertyType.COUNT,
-      "The number of threads used to inspect tablets files to find split 
points.", "4.0.0"),
+  MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", 
PropertyType.COUNT,
+      "The number of threads used to seed fate split task, the actual split 
work is done by fate"
+          + " threads.",
+      "4.0.0"),
 
   MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
       "manager.compaction.major.service.queue.initial.size", "10000", 
PropertyType.COUNT,
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
index 95904229aa..78f08a9471 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
@@ -59,4 +59,8 @@ public class SeedSplitTask implements Runnable {
       log.error("Failed to split {}", extent, e);
     }
   }
+
+  public KeyExtent getExtent() {
+    return extent;
+  }
 }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
index 9a99465820..85b841d1cf 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
@@ -23,8 +23,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -50,6 +50,8 @@ public class Splitter {
   private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
 
   private final ThreadPoolExecutor splitExecutor;
+  // tracks which tablets are queued in splitExecutor
+  private final Set<Text> queuedTablets = ConcurrentHashMap.newKeySet();
 
   public static class FileInfo {
     final Text firstRow;
@@ -151,17 +153,10 @@ public class Splitter {
 
   public Splitter(ServerContext context) {
     int numThreads = 
context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS);
-    // Set up thread pool that constrains the amount of task it queues and 
when full discards task.
-    // The purpose of this is to avoid reading lots of data into memory if 
lots of tablets need to
-    // split.
-    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000);
+
     this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder")
         .numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, 
TimeUnit.MILLISECONDS)
-        .withQueue(queue).enableThreadPoolMetrics().build();
-
-    // Discard task when the queue is full, this allows the TGW to continue 
processing task other
-    // than splits.
-    this.splitExecutor.setRejectedExecutionHandler(new 
ThreadPoolExecutor.DiscardPolicy());
+        .enableThreadPoolMetrics().build();
 
     Weigher<CacheKey,
         FileInfo> weigher = (key, info) -> key.tableId.canonical().length()
@@ -191,6 +186,29 @@ public class Splitter {
   }
 
   public void initiateSplit(SeedSplitTask seedSplitTask) {
-    splitExecutor.execute(seedSplitTask);
+    // Want to avoid queuing the same tablet multiple times, it would not 
cause bugs but would waste
+    // work. Use the metadata row to identify a tablet because the KeyExtent 
also includes the prev
+    // end row which may change when splits happen. The metaRow is 
conceptually tableId+endRow and
+    // that does not change for a split.
+    Text metaRow = seedSplitTask.getExtent().toMetaRow();
+    int qsize = queuedTablets.size();
+    if (qsize < 10_000 && queuedTablets.add(metaRow)) {
+      Runnable taskWrapper = () -> {
+        try {
+          seedSplitTask.run();
+        } finally {
+          queuedTablets.remove(metaRow);
+        }
+      };
+
+      try {
+        splitExecutor.execute(taskWrapper);
+      } catch (RejectedExecutionException rje) {
+        queuedTablets.remove(metaRow);
+        throw rje;
+      }
+    } else {
+      LOG.trace("Did not add {} to split queue {}", metaRow, qsize);
+    }
   }
 }

Reply via email to