This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 1b7861b063 Made tablet refresh thread pool size configurable (#4405)
1b7861b063 is described below

commit 1b7861b0633d93e3f4dc0a7c8a177dfce296ed9b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Mar 22 08:34:49 2024 -0400

    Made tablet refresh thread pool size configurable (#4405)
---
 .../java/org/apache/accumulo/core/conf/Property.java | 12 ++++++++++++
 .../java/org/apache/accumulo/manager/Manager.java    | 12 ++++++++++++
 .../manager/tableOps/bulkVer2/RefreshTablets.java    |  9 ++-------
 .../manager/tableOps/bulkVer2/TabletRefresher.java   | 19 ++++++-------------
 .../manager/tableOps/compact/CompactionDriver.java   |  3 +--
 .../manager/tableOps/compact/RefreshTablets.java     |  3 +--
 6 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a3057f9d36..d5a9504f28 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -367,6 +367,18 @@ public enum Property {
       "Maximum number of threads the TabletGroupWatcher will use in its BatchScanner to"
           + " look for tablets that need maintenance.",
       "4.0.0"),
+  MANAGER_TABLET_REFRESH_MINTHREADS("manager.tablet.refresh.threads.mininum", "10",
+      PropertyType.COUNT,
+      "The Manager will notify TabletServers that a Tablet needs to be refreshed after certain operations"
+          + " are performed (e.g. Bulk Import). This property specifies the number of core threads in a"
+          + " ThreadPool in the Manager that will be used to request these refresh operations.",
+      "4.0.0"),
+  MANAGER_TABLET_REFRESH_MAXTHREADS("manager.tablet.refresh.threads.maximum", "10",
+      PropertyType.COUNT,
+      "The Manager will notify TabletServers that a Tablet needs to be refreshed after certain operations"
+          + " are performed (e.g. Bulk Import). This property specifies the maximum number of threads in a"
+          + " ThreadPool in the Manager that will be used to request these refresh operations.",
+      "4.0.0"),
   MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION,
       "The time to wait for a tablet server to process a bulk import request.", "1.4.3"),
   MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", PropertyType.COUNT,
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 174edd3dcf..d17f5f570c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -236,6 +237,7 @@ public class Manager extends AbstractServer
 
   private final long timeToCacheRecoveryWalExistence;
   private ExecutorService tableInformationStatusPool = null;
+  private ThreadPoolExecutor tabletRefreshThreadPool;
 
   private final TabletStateStore rootTabletStore;
   private final TabletStateStore metadataTabletStore;
@@ -436,6 +438,10 @@ public class Manager extends AbstractServer
     return getContext().getTableManager();
   }
 
+  public ThreadPoolExecutor getTabletRefreshThreadPool() {
+    return tabletRefreshThreadPool;
+  }
+
   public static void main(String[] args) throws Exception {
     try (Manager manager = new Manager(new ConfigOpts(), args)) {
       manager.runServer();
@@ -991,6 +997,11 @@ public class Manager extends AbstractServer
     tableInformationStatusPool = ThreadPools.getServerThreadPools()
         .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
 
+    tabletRefreshThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
+        .numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
+        .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
+        .build();
+
     Thread statusThread = Threads.createThread("Status Thread", new StatusThread());
     statusThread.start();
 
@@ -1155,6 +1166,7 @@ public class Manager extends AbstractServer
     }
 
     tableInformationStatusPool.shutdownNow();
+    tabletRefreshThreadPool.shutdownNow();
 
     compactionCoordinator.shutdown();
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
index 95816691a6..d166eda3c3 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
@@ -22,8 +22,6 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This Repo asks hosted tablets that were bulk loaded into to refresh their metadata. It works by
@@ -34,8 +32,6 @@ import org.slf4j.LoggerFactory;
  */
 public class RefreshTablets extends ManagerRepo {
 
-  private static final Logger log = LoggerFactory.getLogger(RefreshTablets.class);
-
   private static final long serialVersionUID = 1L;
 
   private final BulkInfo bulkInfo;
@@ -52,9 +48,8 @@ public class RefreshTablets extends ManagerRepo {
   @Override
   public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
 
-    TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId,
-        bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit,
-        tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId));
+    TabletRefresher.refresh(manager, fateId, bulkInfo.tableId, bulkInfo.firstSplit,
+        bulkInfo.lastSplit, tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId));
 
     return new CleanUpBulkImport(bulkInfo);
   }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index a5cdbe847f..a3d341a12b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
@@ -46,6 +45,7 @@ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Retry;
+import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -57,15 +57,10 @@ public class TabletRefresher {
 
   private static final Logger log = LoggerFactory.getLogger(TabletRefresher.class);
 
-  public static void refresh(ServerContext context,
-      Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId, TableId tableId,
-      byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) {
+  public static void refresh(Manager manager, FateId fateId, TableId tableId, byte[] startRow,
+      byte[] endRow, Predicate<TabletMetadata> needsRefresh) {
 
-    // ELASTICITY_TODO should this thread pool be configurable?
-    ThreadPoolExecutor threadPool =
-        context.threadPools().getPoolBuilder("Tablet refresh " + fateId).numCoreThreads(10).build();
-
-    try (var tablets = context.getAmple().readTablets().forTable(tableId)
+    try (var tablets = manager.getContext().getAmple().readTablets().forTable(tableId)
         .overlapping(startRow, endRow).checkConsistency()
         .fetch(ColumnType.LOADED, ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
 
@@ -84,12 +79,10 @@ public class TabletRefresher {
         var refreshesNeeded = batch.stream().collect(groupingBy(TabletMetadata::getLocation,
             mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), toList())));
 
-        refreshTablets(threadPool, fateId.canonical(), context, onlineTserversSupplier,
-            refreshesNeeded);
+        refreshTablets(manager.getTabletRefreshThreadPool(), fateId.canonical(),
+            manager.getContext(), () -> manager.onlineTabletServers(), refreshesNeeded);
       });
 
-    } finally {
-      threadPool.shutdownNow();
     }
   }
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 6167ca05cf..0f224736a0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -329,8 +329,7 @@ class CompactionDriver extends ManagerRepo {
 
     // For any compactions that may have happened before this operation failed, attempt to refresh
     // tablets.
-    TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId, tableId, startRow,
-        endRow, tabletMetadata -> true);
+    TabletRefresher.refresh(env, fateId, tableId, startRow, endRow, tabletMetadata -> true);
   }
 
   /**
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
index fd4daf0c4c..f7dc869c9e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
@@ -45,8 +45,7 @@ public class RefreshTablets extends ManagerRepo {
 
   @Override
   public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
-    TabletRefresher.refresh(manager.getContext(), manager::onlineTabletServers, fateId, tableId,
-        startRow, endRow, tabletMetadata -> true);
+    TabletRefresher.refresh(manager, fateId, tableId, startRow, endRow, tabletMetadata -> true);
 
     return new CleanUp(tableId, namespaceId, startRow, endRow);
   }