This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 1b7861b063 Made tablet refresh thread pool size configurable (#4405)
1b7861b063 is described below

commit 1b7861b0633d93e3f4dc0a7c8a177dfce296ed9b
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Fri Mar 22 08:34:49 2024 -0400

    Made tablet refresh thread pool size configurable (#4405)
---
 .../java/org/apache/accumulo/core/conf/Property.java  | 12 ++++++++++++
 .../java/org/apache/accumulo/manager/Manager.java     | 12 ++++++++++++
 .../manager/tableOps/bulkVer2/RefreshTablets.java     |  9 ++-------
 .../manager/tableOps/bulkVer2/TabletRefresher.java    | 19 ++++++-------------
 .../manager/tableOps/compact/CompactionDriver.java    |  3 +--
 .../manager/tableOps/compact/RefreshTablets.java      |  3 +--
 6 files changed, 34 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a3057f9d36..d5a9504f28 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -367,6 +367,18 @@ public enum Property {
       "Maximum number of threads the TabletGroupWatcher will use in its 
BatchScanner to"
           + " look for tablets that need maintenance.",
       "4.0.0"),
+  MANAGER_TABLET_REFRESH_MINTHREADS("manager.tablet.refresh.threads.minimum", 
"10",
+      PropertyType.COUNT,
+      "The Manager will notify TabletServers that a Tablet needs to be 
refreshed after certain operations"
+          + " are performed (e.g. Bulk Import). This property specifies the 
number of core threads in a"
+          + " ThreadPool in the Manager that will be used to request these 
refresh operations.",
+      "4.0.0"),
+  MANAGER_TABLET_REFRESH_MAXTHREADS("manager.tablet.refresh.threads.maximum", 
"10",
+      PropertyType.COUNT,
+      "The Manager will notify TabletServers that a Tablet needs to be 
refreshed after certain operations"
+          + " are performed (e.g. Bulk Import). This property specifies the 
maximum number of threads in a"
+          + " ThreadPool in the Manager that will be used to request these 
refresh operations.",
+      "4.0.0"),
   MANAGER_BULK_TIMEOUT("manager.bulk.timeout", "5m", PropertyType.TIMEDURATION,
       "The time to wait for a tablet server to process a bulk import 
request.", "1.4.3"),
   MANAGER_RENAME_THREADS("manager.rename.threadpool.size", "20", 
PropertyType.COUNT,
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java 
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 174edd3dcf..d17f5f570c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -236,6 +237,7 @@ public class Manager extends AbstractServer
 
   private final long timeToCacheRecoveryWalExistence;
   private ExecutorService tableInformationStatusPool = null;
+  private ThreadPoolExecutor tabletRefreshThreadPool;
 
   private final TabletStateStore rootTabletStore;
   private final TabletStateStore metadataTabletStore;
@@ -436,6 +438,10 @@ public class Manager extends AbstractServer
     return getContext().getTableManager();
   }
 
+  public ThreadPoolExecutor getTabletRefreshThreadPool() {
+    return tabletRefreshThreadPool;
+  }
+
   public static void main(String[] args) throws Exception {
     try (Manager manager = new Manager(new ConfigOpts(), args)) {
       manager.runServer();
@@ -991,6 +997,11 @@ public class Manager extends AbstractServer
     tableInformationStatusPool = ThreadPools.getServerThreadPools()
         .createExecutorService(getConfiguration(), 
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
 
+    tabletRefreshThreadPool = 
ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
+        
.numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
+        
.numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
+        .build();
+
     Thread statusThread = Threads.createThread("Status Thread", new 
StatusThread());
     statusThread.start();
 
@@ -1155,6 +1166,7 @@ public class Manager extends AbstractServer
     }
 
     tableInformationStatusPool.shutdownNow();
+    tabletRefreshThreadPool.shutdownNow();
 
     compactionCoordinator.shutdown();
 
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
index 95816691a6..d166eda3c3 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/RefreshTablets.java
@@ -22,8 +22,6 @@ import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This Repo asks hosted tablets that were bulk loaded into to refresh their 
metadata. It works by
@@ -34,8 +32,6 @@ import org.slf4j.LoggerFactory;
  */
 public class RefreshTablets extends ManagerRepo {
 
-  private static final Logger log = 
LoggerFactory.getLogger(RefreshTablets.class);
-
   private static final long serialVersionUID = 1L;
 
   private final BulkInfo bulkInfo;
@@ -52,9 +48,8 @@ public class RefreshTablets extends ManagerRepo {
   @Override
   public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
 
-    TabletRefresher.refresh(manager.getContext(), 
manager::onlineTabletServers, fateId,
-        bulkInfo.tableId, bulkInfo.firstSplit, bulkInfo.lastSplit,
-        tabletMetadata -> tabletMetadata.getLoaded().containsValue(fateId));
+    TabletRefresher.refresh(manager, fateId, bulkInfo.tableId, 
bulkInfo.firstSplit,
+        bulkInfo.lastSplit, tabletMetadata -> 
tabletMetadata.getLoaded().containsValue(fateId));
 
     return new CleanUpBulkImport(bulkInfo);
   }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index a5cdbe847f..a3d341a12b 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
@@ -46,6 +45,7 @@ import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
 import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.util.Retry;
+import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -57,15 +57,10 @@ public class TabletRefresher {
 
   private static final Logger log = 
LoggerFactory.getLogger(TabletRefresher.class);
 
-  public static void refresh(ServerContext context,
-      Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId, 
TableId tableId,
-      byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) {
+  public static void refresh(Manager manager, FateId fateId, TableId tableId, 
byte[] startRow,
+      byte[] endRow, Predicate<TabletMetadata> needsRefresh) {
 
-    // ELASTICITY_TODO should this thread pool be configurable?
-    ThreadPoolExecutor threadPool =
-        context.threadPools().getPoolBuilder("Tablet refresh " + 
fateId).numCoreThreads(10).build();
-
-    try (var tablets = context.getAmple().readTablets().forTable(tableId)
+    try (var tablets = 
manager.getContext().getAmple().readTablets().forTable(tableId)
         .overlapping(startRow, endRow).checkConsistency()
         .fetch(ColumnType.LOADED, ColumnType.LOCATION, 
ColumnType.PREV_ROW).build()) {
 
@@ -84,12 +79,10 @@ public class TabletRefresher {
         var refreshesNeeded = 
batch.stream().collect(groupingBy(TabletMetadata::getLocation,
             mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), 
toList())));
 
-        refreshTablets(threadPool, fateId.canonical(), context, 
onlineTserversSupplier,
-            refreshesNeeded);
+        refreshTablets(manager.getTabletRefreshThreadPool(), 
fateId.canonical(),
+            manager.getContext(), () -> manager.onlineTabletServers(), 
refreshesNeeded);
       });
 
-    } finally {
-      threadPool.shutdownNow();
     }
 
   }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index 6167ca05cf..0f224736a0 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -329,8 +329,7 @@ class CompactionDriver extends ManagerRepo {
 
     // For any compactions that may have happened before this operation 
failed, attempt to refresh
     // tablets.
-    TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, 
fateId, tableId, startRow,
-        endRow, tabletMetadata -> true);
+    TabletRefresher.refresh(env, fateId, tableId, startRow, endRow, 
tabletMetadata -> true);
   }
 
   /**
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
index fd4daf0c4c..f7dc869c9e 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/RefreshTablets.java
@@ -45,8 +45,7 @@ public class RefreshTablets extends ManagerRepo {
 
   @Override
   public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
-    TabletRefresher.refresh(manager.getContext(), 
manager::onlineTabletServers, fateId, tableId,
-        startRow, endRow, tabletMetadata -> true);
+    TabletRefresher.refresh(manager, fateId, tableId, startRow, endRow, 
tabletMetadata -> true);
 
     return new CleanUp(tableId, namespaceId, startRow, endRow);
   }

Reply via email to