This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new f208a886c5 avoids unneeded work when splitting tablets (#5071) f208a886c5 is described below commit f208a886c54bf1307cb7afc1af9efd8d54aadc59 Author: Keith Turner <ktur...@apache.org> AuthorDate: Sat Nov 16 18:09:29 2024 -0500 avoids unneeded work when splitting tablets (#5071) When tablets needed to split, they were queued in a thread pool to seed a fate operation. When the tablet group watcher ran again, if the tablet was still in the queue, it would add it again. This would not cause any correctness issues, but it would cause a lot of wasted work. Also, if one tablet was queued multiple times, it could prevent another tablet that was not queued at all from being added. This made splitting very inefficient when a large number of tablets needed to split. --- .../org/apache/accumulo/core/conf/Property.java | 6 ++-- .../accumulo/manager/split/SeedSplitTask.java | 4 +++ .../apache/accumulo/manager/split/Splitter.java | 42 +++++++++++++++------- 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6e2fed76af..98412a97d7 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -447,8 +447,10 @@ public enum Property { + "indefinitely. Default is 0 to block indefinitely. 
Only valid when tserver available " + "threshold is set greater than 0.", "1.10.0"), - MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size", "8", PropertyType.COUNT, - "The number of threads used to inspect tablets files to find split points.", "4.0.0"), + MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8", PropertyType.COUNT, + "The number of threads used to seed fate split task, the actual split work is done by fate" + + " threads.", + "4.0.0"), MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE( "manager.compaction.major.service.queue.initial.size", "10000", PropertyType.COUNT, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java index 95904229aa..78f08a9471 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java @@ -59,4 +59,8 @@ public class SeedSplitTask implements Runnable { log.error("Failed to split {}", extent, e); } } + + public KeyExtent getExtent() { + return extent; + } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java index 9a99465820..85b841d1cf 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java @@ -23,8 +23,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -50,6 +50,8 @@ public class 
Splitter { private static final Logger LOG = LoggerFactory.getLogger(Splitter.class); private final ThreadPoolExecutor splitExecutor; + // tracks which tablets are queued in splitExecutor + private final Set<Text> queuedTablets = ConcurrentHashMap.newKeySet(); public static class FileInfo { final Text firstRow; @@ -151,17 +153,10 @@ public class Splitter { public Splitter(ServerContext context) { int numThreads = context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS); - // Set up thread pool that constrains the amount of task it queues and when full discards task. - // The purpose of this is to avoid reading lots of data into memory if lots of tablets need to - // split. - BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000); + this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder") .numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L, TimeUnit.MILLISECONDS) - .withQueue(queue).enableThreadPoolMetrics().build(); - - // Discard task when the queue is full, this allows the TGW to continue processing task other - // than splits. - this.splitExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); + .enableThreadPoolMetrics().build(); Weigher<CacheKey, FileInfo> weigher = (key, info) -> key.tableId.canonical().length() @@ -191,6 +186,29 @@ public class Splitter { } public void initiateSplit(SeedSplitTask seedSplitTask) { - splitExecutor.execute(seedSplitTask); + // Want to avoid queuing the same tablet multiple times, it would not cause bugs but would waste + // work. Use the metadata row to identify a tablet because the KeyExtent also includes the prev + // end row which may change when splits happen. The metaRow is conceptually tableId+endRow and + // that does not change for a split. 
+ Text metaRow = seedSplitTask.getExtent().toMetaRow(); + int qsize = queuedTablets.size(); + if (qsize < 10_000 && queuedTablets.add(metaRow)) { + Runnable taskWrapper = () -> { + try { + seedSplitTask.run(); + } finally { + queuedTablets.remove(metaRow); + } + }; + + try { + splitExecutor.execute(taskWrapper); + } catch (RejectedExecutionException rje) { + queuedTablets.remove(metaRow); + throw rje; + } + } else { + LOG.trace("Did not add {} to split queue {}", metaRow, qsize); + } } }