This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 47b75d3cac Executes conditional mutations in a thread pool (#5184)
47b75d3cac is described below

commit 47b75d3cac6d347d176c664ae83b4eef15026c41
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sat Dec 14 16:56:47 2024 -0500

    Executes conditional mutations in a thread pool (#5184)
    
    Conditional mutations currently execute in the thrift thread pool, which
    can grow unbounded in size.  Having an unbounded number of conditional
    mutations executing concurrently could degrade a tablet server's health
    in some cases.  This change adds thread pools for executing conditional
    updates.  This will help limit the CPU and memory used by executing
    conditional mutations.  Conditional mutations can also contain data as
    part of the mutation that needs to be checked; if this data is large, it
    could still cause memory issues.  This change mainly limits the memory
    used when reading current tablet data to check the condition.  Limiting
    the memory used by conditional mutations waiting to execute is something
    that would need to somehow be controlled by thrift.  So this change
    protects memory and CPU resources for conditional mutations that are
    executing, but does not protect memory for ones that are waiting to
    execute.  Protecting memory for those waiting to execute would require
    ending the current practice of letting the thrift thread pool grow
    unbounded, but that is a long-standing problem.
    
    
    Co-authored-by: Christopher L. Shannon <cshan...@apache.org>
---
 .../org/apache/accumulo/core/conf/Property.java    |  9 ++++++
 .../core/util/threads/ThreadPoolNames.java         |  3 ++
 .../accumulo/core/util/threads/ThreadPools.java    | 24 +++++++++++++++
 .../accumulo/tserver/TabletClientHandler.java      | 34 +++++++++++++++-------
 .../tserver/TabletServerResourceManager.java       | 34 ++++++++++++++++++++++
 5 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 122c7f414b..63cbd397ac 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -767,6 +767,15 @@ public enum Property {
       "Resource group name for this TabletServer. Resource groups can be 
defined to dedicate resources "
           + " to specific tables (e.g. balancing tablets for table(s) within a 
group, see TableLoadBalancer).",
       "4.0.0"),
+  TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root", "16",
+      PropertyType.COUNT, "Number of threads for executing conditional updates on the root table.",
+      "4.0.0"),
+  TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "64",
+      PropertyType.COUNT,
+      "Number of threads for executing conditional updates on the metadata table.", "4.0.0"),
+  TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "64",
+      PropertyType.COUNT, "Number of threads for executing conditional updates on user tables.",
+      "4.0.0"),
 
   // accumulo garbage collector properties
   GC_PREFIX("gc.", null, PropertyType.PREFIX,
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index b57baf0d5d..b360e41b6b 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -62,6 +62,9 @@ public enum ThreadPoolNames {
   TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"),
   TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"),
   
TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"),
+  
TSERVER_CONDITIONAL_UPDATE_ROOT_POOL("accumulo.pool.tserver.conditionalupdate.root"),
+  
TSERVER_CONDITIONAL_UPDATE_META_POOL("accumulo.pool.tserver.conditionalupdate.meta"),
+  
TSERVER_CONDITIONAL_UPDATE_USER_POOL("accumulo.pool.tserver.conditionalupdate.user"),
   UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"),
   UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers");
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 44fd7d64e7..12e2567bdf 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -28,6 +28,9 @@ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
@@ -343,6 +346,27 @@ public class ThreadPools {
           builder.enableThreadPoolMetrics();
         }
         return builder.build();
+      case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT:
+        builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL)
+            .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
+      case TSERV_CONDITIONAL_UPDATE_THREADS_META:
+        builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL)
+            .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
+      case TSERV_CONDITIONAL_UPDATE_THREADS_USER:
+        builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL)
+            .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       case GC_DELETE_THREADS:
         return 
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
       default:
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 0b5c246647..117d87a96d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -794,19 +794,30 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
         }
       }
 
-      ArrayList<TCMResult> results = new ArrayList<>();
-
-      Map<KeyExtent,List<ServerConditionalMutation>> deferred =
-          conditionalUpdate(cs, updates, results, symbols);
+      var lambdaCs = cs;
+      // Conditional updates read data into memory, examine it, and then make 
an update. This can be
+      // CPU, I/O, and memory intensive. Using a thread pool directly limits 
CPU usage and
+      // indirectly limits memory and I/O usage.
+      Future<ArrayList<TCMResult>> future =
+          server.resourceManager.executeConditionalUpdate(cs.tableId, () -> {
+            ArrayList<TCMResult> results = new ArrayList<>();
+
+            Map<KeyExtent,List<ServerConditionalMutation>> deferred =
+                conditionalUpdate(lambdaCs, updates, results, symbols);
+
+            while (!deferred.isEmpty()) {
+              deferred = conditionalUpdate(lambdaCs, deferred, results, 
symbols);
+            }
 
-      while (!deferred.isEmpty()) {
-        deferred = conditionalUpdate(cs, deferred, results, symbols);
-      }
+            return results;
+          });
 
-      return results;
-    } catch (IOException ioe) {
-      throw new TException(ioe);
-    } catch (Exception e) {
+      return future.get();
+    } catch (ExecutionException | InterruptedException e) {
+      log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: 
{}",
+          cs == null ? null : cs.tableId, opid, e);
+      throw new TException(e);
+    } catch (RuntimeException e) {
       log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: 
{}",
           cs == null ? null : cs.tableId, opid, e);
       throw e;
@@ -814,6 +825,7 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
       if (opid != null) {
         writeTracker.finishWrite(opid);
       }
+
       if (cs != null) {
         server.sessionManager.unreserveSession(sessID);
       }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index ff23188a5a..f09165c450 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -27,6 +27,9 @@ import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POO
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
@@ -46,8 +49,10 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -62,9 +67,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import 
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
 import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.spi.cache.BlockCache;
 import org.apache.accumulo.core.spi.cache.BlockCacheManager;
 import org.apache.accumulo.core.spi.cache.CacheType;
@@ -118,6 +125,8 @@ public class TabletServerResourceManager {
   private final Map<String,ThreadPoolExecutor> scanExecutors;
   private final Map<String,ScanExecutor> scanExecutorChoices;
 
+  private final Map<Ample.DataLevel,ThreadPoolExecutor> 
conditionalMutationExecutors;
+
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> 
activeAssignments;
 
   private final FileManager fileManager;
@@ -385,6 +394,27 @@ public class TabletServerResourceManager {
     memMgmt = new MemoryManagementFramework();
     memMgmt.startThreads();
 
+    var rootConditionalPool = 
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
+        Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics);
+    modifyThreadPoolSizesAtRuntime(
+        () -> 
context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT),
+        TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool);
+
+    var metaConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
+        Property.TSERV_CONDITIONAL_UPDATE_THREADS_META, enableMetrics);
+    modifyThreadPoolSizesAtRuntime(
+        () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_META),
+        TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool);
+
+    var userConditionalPool = 
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
+        Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics);
+    modifyThreadPoolSizesAtRuntime(
+        () -> 
context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
+        TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool);
+
+    conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, 
rootConditionalPool,
+        Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER, 
userConditionalPool);
+
     // We can use the same map for both metadata and normal assignments since 
the keyspace (extent)
     // is guaranteed to be unique. Schedule the task once, the task will 
reschedule itself.
     
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule(
@@ -835,6 +865,10 @@ public class TabletServerResourceManager {
     }
   }
 
+  public <T> Future<T> executeConditionalUpdate(TableId tableId, Callable<T> 
updateTask) {
+    return 
conditionalMutationExecutors.get(Ample.DataLevel.of(tableId)).submit(updateTask);
+  }
+
   public BlockCache getIndexCache() {
     return _iCache;
   }

Reply via email to