This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 47b75d3cac Executes conditional mutations in a thread pool (#5184) 47b75d3cac is described below commit 47b75d3cac6d347d176c664ae83b4eef15026c41 Author: Keith Turner <ktur...@apache.org> AuthorDate: Sat Dec 14 16:56:47 2024 -0500 Executes conditional mutations in a thread pool (#5184) Conditional mutations currently execute in the thrift thread pool, which can grow unbounded in size. Having an unbounded number of conditional mutations executing concurrently could degrade a tablet server's health in some cases. This change adds thread pools for executing conditional updates. This will help limit the CPU and memory used by executing conditional mutations. Conditional mutations can also contain data, as part of the mutation, that needs to be checked; if this data is large, it could still cause memory issues. This change primarily limits the memory used when reading current tablet data to check the condition. Limiting the memory used by conditional mutations that are waiting to execute is something that would need to be controlled by thrift somehow. So this change protects memory and CPU resources for conditional mutations that are executing, but does not protect memory for ones that are waiting to execute. Protecting memory for those waiting to execute would require ending the current practice of letting the thrift thread pool grow unbounded, but that is a long-standing problem. Co-authored-by: Christopher L. 
Shannon <cshan...@apache.org> --- .../org/apache/accumulo/core/conf/Property.java | 9 ++++++ .../core/util/threads/ThreadPoolNames.java | 3 ++ .../accumulo/core/util/threads/ThreadPools.java | 24 +++++++++++++++ .../accumulo/tserver/TabletClientHandler.java | 34 +++++++++++++++------- .../tserver/TabletServerResourceManager.java | 34 ++++++++++++++++++++++ 5 files changed, 93 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 122c7f414b..63cbd397ac 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -767,6 +767,15 @@ public enum Property { "Resource group name for this TabletServer. Resource groups can be defined to dedicate resources " + " to specific tables (e.g. balancing tablets for table(s) within a group, see TableLoadBalancer).", "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root", "16", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", + "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "64", + PropertyType.COUNT, + "Numbers of threads for executing conditional updates on the metadata table.", "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "64", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on user tables.", + "4.0.0"), // accumulo garbage collector properties GC_PREFIX("gc.", null, PropertyType.PREFIX, diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index b57baf0d5d..b360e41b6b 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -62,6 +62,9 @@ public enum ThreadPoolNames { TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"), TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"), TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"), + TSERVER_CONDITIONAL_UPDATE_ROOT_POOL("accumulo.pool.tserver.conditionalupdate.root"), + TSERVER_CONDITIONAL_UPDATE_META_POOL("accumulo.pool.tserver.conditionalupdate.meta"), + TSERVER_CONDITIONAL_UPDATE_USER_POOL("accumulo.pool.tserver.conditionalupdate.user"), UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"), UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers"); diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 44fd7d64e7..12e2567bdf 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -28,6 +28,9 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; 
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; @@ -343,6 +346,27 @@ public class ThreadPools { builder.enableThreadPoolMetrics(); } return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_META: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_USER: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case GC_DELETE_THREADS: return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); default: diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 0b5c246647..117d87a96d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -794,19 +794,30 @@ public class TabletClientHandler implements TabletServerClientService.Iface, } } - ArrayList<TCMResult> results = new ArrayList<>(); - - Map<KeyExtent,List<ServerConditionalMutation>> deferred = - conditionalUpdate(cs, updates, results, symbols); + var lambdaCs = cs; + // Conditional updates read data into memory, examine it, and then make an update. This can be + // CPU, I/O, and memory intensive. 
Using a thread pool directly limits CPU usage and + // indirectly limits memory and I/O usage. + Future<ArrayList<TCMResult>> future = + server.resourceManager.executeConditionalUpdate(cs.tableId, () -> { + ArrayList<TCMResult> results = new ArrayList<>(); + + Map<KeyExtent,List<ServerConditionalMutation>> deferred = + conditionalUpdate(lambdaCs, updates, results, symbols); + + while (!deferred.isEmpty()) { + deferred = conditionalUpdate(lambdaCs, deferred, results, symbols); + } - while (!deferred.isEmpty()) { - deferred = conditionalUpdate(cs, deferred, results, symbols); - } + return results; + }); - return results; - } catch (IOException ioe) { - throw new TException(ioe); - } catch (Exception e) { + return future.get(); + } catch (ExecutionException | InterruptedException e) { + log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", + cs == null ? null : cs.tableId, opid, e); + throw new TException(e); + } catch (RuntimeException e) { log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", cs == null ? 
null : cs.tableId, opid, e); throw e; @@ -814,6 +825,7 @@ public class TabletClientHandler implements TabletServerClientService.Iface, if (opid != null) { writeTracker.finishWrite(opid); } + if (cs != null) { server.sessionManager.unreserveSession(sessID); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index ff23188a5a..f09165c450 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -27,6 +27,9 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POO import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; @@ -46,8 +49,10 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -62,9 +67,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.cache.CacheType; @@ -118,6 +125,8 @@ public class TabletServerResourceManager { private final Map<String,ThreadPoolExecutor> scanExecutors; private final Map<String,ScanExecutor> scanExecutorChoices; + private final Map<Ample.DataLevel,ThreadPoolExecutor> conditionalMutationExecutors; + private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments; private final FileManager fileManager; @@ -385,6 +394,27 @@ public class TabletServerResourceManager { memMgmt = new MemoryManagementFramework(); memMgmt.startThreads(); + var rootConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT), + TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool); + + var metaConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics); + modifyThreadPoolSizesAtRuntime( + 
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER), + TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool); + + var userConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER), + TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool); + + conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, rootConditionalPool, + Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER, userConditionalPool); + // We can use the same map for both metadata and normal assignments since the keyspace (extent) // is guaranteed to be unique. Schedule the task once, the task will reschedule itself. ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule( @@ -835,6 +865,10 @@ public class TabletServerResourceManager { } } + public <T> Future<T> executeConditionalUpdate(TableId tableId, Callable<T> updateTask) { + return conditionalMutationExecutors.get(Ample.DataLevel.of(tableId)).submit(updateTask); + } + public BlockCache getIndexCache() { return _iCache; }