This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 47b75d3cac Executes conditional mutations in a thread pool (#5184)
47b75d3cac is described below
commit 47b75d3cac6d347d176c664ae83b4eef15026c41
Author: Keith Turner <[email protected]>
AuthorDate: Sat Dec 14 16:56:47 2024 -0500
Executes conditional mutations in a thread pool (#5184)
Conditional mutations currently execute in the thrift thread pool which
can grow unbounded in size. Having an unbounded number of conditional
mutations executing concurrently could degrade a tablet server's health
in some cases. This change adds thread pools for executing conditional
updates. This will help limit the CPU and memory used by executing
conditional mutations. Conditional mutations can also contain data as
part of the mutation that needs to be checked; if this is large, that
could still cause memory issues. This change only limits the memory
used by reading current tablet data to check the condition. Limiting
the memory used by conditional mutations waiting to execute is something
that would need to somehow be controlled by thrift. So this change
protects memory and CPU resources for conditional mutations that are
executing but does not protect memory for ones that are waiting to
execute. Protecting memory for those waiting to execute would require
ending the current practice of letting the thrift thread pool grow
unbounded, but that is a long-standing problem.
Co-authored-by: Christopher L. Shannon <[email protected]>
---
.../org/apache/accumulo/core/conf/Property.java | 9 ++++++
.../core/util/threads/ThreadPoolNames.java | 3 ++
.../accumulo/core/util/threads/ThreadPools.java | 24 +++++++++++++++
.../accumulo/tserver/TabletClientHandler.java | 34 +++++++++++++++-------
.../tserver/TabletServerResourceManager.java | 34 ++++++++++++++++++++++
5 files changed, 93 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 122c7f414b..63cbd397ac 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -767,6 +767,15 @@ public enum Property {
"Resource group name for this TabletServer. Resource groups can be
defined to dedicate resources "
+ " to specific tables (e.g. balancing tablets for table(s) within a
group, see TableLoadBalancer).",
"4.0.0"),
+
TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root",
"16",
+ PropertyType.COUNT, "Numbers of threads for executing conditional
updates on the root table.",
+ "4.0.0"),
+
TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta",
"64",
+ PropertyType.COUNT,
+ "Numbers of threads for executing conditional updates on the metadata
table.", "4.0.0"),
+
TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user",
"64",
+ PropertyType.COUNT, "Numbers of threads for executing conditional
updates on user tables.",
+ "4.0.0"),
// accumulo garbage collector properties
GC_PREFIX("gc.", null, PropertyType.PREFIX,
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index b57baf0d5d..b360e41b6b 100644
---
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -62,6 +62,9 @@ public enum ThreadPoolNames {
TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"),
TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"),
TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"),
+
TSERVER_CONDITIONAL_UPDATE_ROOT_POOL("accumulo.pool.tserver.conditionalupdate.root"),
+
TSERVER_CONDITIONAL_UPDATE_META_POOL("accumulo.pool.tserver.conditionalupdate.meta"),
+
TSERVER_CONDITIONAL_UPDATE_USER_POOL("accumulo.pool.tserver.conditionalupdate.user"),
UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"),
UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers");
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 44fd7d64e7..12e2567bdf 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -28,6 +28,9 @@ import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
@@ -343,6 +346,27 @@ public class ThreadPools {
builder.enableThreadPoolMetrics();
}
return builder.build();
+ case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT:
+ builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL)
+ .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
+ if (emitThreadPoolMetrics) {
+ builder.enableThreadPoolMetrics();
+ }
+ return builder.build();
+ case TSERV_CONDITIONAL_UPDATE_THREADS_META:
+ builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL)
+ .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
+ if (emitThreadPoolMetrics) {
+ builder.enableThreadPoolMetrics();
+ }
+ return builder.build();
+ case TSERV_CONDITIONAL_UPDATE_THREADS_USER:
+ builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL)
+ .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
+ if (emitThreadPoolMetrics) {
+ builder.enableThreadPoolMetrics();
+ }
+ return builder.build();
case GC_DELETE_THREADS:
return
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
default:
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 0b5c246647..117d87a96d 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -794,19 +794,30 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
}
}
- ArrayList<TCMResult> results = new ArrayList<>();
-
- Map<KeyExtent,List<ServerConditionalMutation>> deferred =
- conditionalUpdate(cs, updates, results, symbols);
+ var lambdaCs = cs;
+ // Conditional updates read data into memory, examine it, and then make
an update. This can be
+ // CPU, I/O, and memory intensive. Using a thread pool directly limits
CPU usage and
+ // indirectly limits memory and I/O usage.
+ Future<ArrayList<TCMResult>> future =
+ server.resourceManager.executeConditionalUpdate(cs.tableId, () -> {
+ ArrayList<TCMResult> results = new ArrayList<>();
+
+ Map<KeyExtent,List<ServerConditionalMutation>> deferred =
+ conditionalUpdate(lambdaCs, updates, results, symbols);
+
+ while (!deferred.isEmpty()) {
+ deferred = conditionalUpdate(lambdaCs, deferred, results,
symbols);
+ }
- while (!deferred.isEmpty()) {
- deferred = conditionalUpdate(cs, deferred, results, symbols);
- }
+ return results;
+ });
- return results;
- } catch (IOException ioe) {
- throw new TException(ioe);
- } catch (Exception e) {
+ return future.get();
+ } catch (ExecutionException | InterruptedException e) {
+ log.warn("Exception returned for conditionalUpdate. tableId: {}, opid:
{}",
+ cs == null ? null : cs.tableId, opid, e);
+ throw new TException(e);
+ } catch (RuntimeException e) {
log.warn("Exception returned for conditionalUpdate. tableId: {}, opid:
{}",
cs == null ? null : cs.tableId, opid, e);
throw e;
@@ -814,6 +825,7 @@ public class TabletClientHandler implements
TabletServerClientService.Iface,
if (opid != null) {
writeTracker.finishWrite(opid);
}
+
if (cs != null) {
server.sessionManager.unreserveSession(sessID);
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index ff23188a5a..f09165c450 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -27,6 +27,9 @@ import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POO
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
+import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
@@ -46,8 +49,10 @@ import java.util.Optional;
import java.util.OptionalInt;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -62,9 +67,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCacheManager;
import org.apache.accumulo.core.spi.cache.CacheType;
@@ -118,6 +125,8 @@ public class TabletServerResourceManager {
private final Map<String,ThreadPoolExecutor> scanExecutors;
private final Map<String,ScanExecutor> scanExecutorChoices;
+ private final Map<Ample.DataLevel,ThreadPoolExecutor>
conditionalMutationExecutors;
+
private final ConcurrentHashMap<KeyExtent,RunnableStartedAt>
activeAssignments;
private final FileManager fileManager;
@@ -385,6 +394,27 @@ public class TabletServerResourceManager {
memMgmt = new MemoryManagementFramework();
memMgmt.startThreads();
+ var rootConditionalPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
+ Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics);
+ modifyThreadPoolSizesAtRuntime(
+ () ->
context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT),
+ TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool);
+
+ var metaConditionalPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
+ Property.TSERV_CONDITIONAL_UPDATE_THREADS_META, enableMetrics);
+ modifyThreadPoolSizesAtRuntime(
+ () ->
context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_META),
+ TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool);
+
+ var userConditionalPool =
ThreadPools.getServerThreadPools().createExecutorService(acuConf,
+ Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics);
+ modifyThreadPoolSizesAtRuntime(
+ () ->
context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
+ TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool);
+
+ conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT,
rootConditionalPool,
+ Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER,
userConditionalPool);
+
// We can use the same map for both metadata and normal assignments since
the keyspace (extent)
// is guaranteed to be unique. Schedule the task once, the task will
reschedule itself.
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule(
@@ -835,6 +865,10 @@ public class TabletServerResourceManager {
}
}
+ public <T> Future<T> executeConditionalUpdate(TableId tableId, Callable<T>
updateTask) {
+ return
conditionalMutationExecutors.get(Ample.DataLevel.of(tableId)).submit(updateTask);
+ }
+
public BlockCache getIndexCache() {
return _iCache;
}