This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 632dbc2ed0 limits memory and cpu used by compaction reservation 
request (#5185)
632dbc2ed0 is described below

commit 632dbc2ed0d7bfed30a2f9da7053257b07720962
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Sun Dec 15 14:27:19 2024 -0500

    limits memory and cpu used by compaction reservation request (#5185)
    
    Added thread pools to execute compaction reservation requests in order
    to limit the memory and cpu used by executing reservations.  Requests
    queued up for the pool could still potentially use a lot of memory.  Did
    two things to control the memory used by things in the queue.  First, only
    allow a compactor process to have one reservation processing at a time.
    Second, made the data related to a reservation request a soft reference,
    which should allow it to be garbage collected if memory gets low while it
    is sitting in the queue.  Once the request starts executing it obtains a
    strong reference to the data so it can no longer be garbage collected.
    
    fixes #5177
---
 .../org/apache/accumulo/core/conf/Property.java    |  12 ++
 .../core/util/threads/ThreadPoolNames.java         |   3 +
 .../accumulo/core/util/threads/ThreadPools.java    |  24 +++
 .../coordinator/CompactionCoordinator.java         | 211 +++++++++++++++------
 4 files changed, 190 insertions(+), 60 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 63cbd397ac..77fa9c461a 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1189,6 +1189,18 @@ public enum Property {
   COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, 
PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo 
compaction coordinator server.",
       "2.1.0"),
+  
COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT("compaction.coordinator.reservation.threads.root",
+      "1", PropertyType.COUNT,
+      "The number of threads used to reserve files for compaction in a tablet 
for the root tablet.",
+      "4.0.0"),
+  
COMPACTION_COORDINATOR_RESERVATION_THREADS_META("compaction.coordinator.reservation.threads.meta",
+      "1", PropertyType.COUNT,
+      "The number of threads used to reserve files for compaction in a tablet 
for accumulo.metadata tablets.",
+      "4.0.0"),
+  
COMPACTION_COORDINATOR_RESERVATION_THREADS_USER("compaction.coordinator.reservation.threads.user",
+      "64", PropertyType.COUNT,
+      "The number of threads used to reserve files for compaction in a tablet 
for user tables.",
+      "4.0.0"),
   @Experimental
   COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
       "compaction.coordinator.compactor.dead.check.interval", "5m", 
PropertyType.TIMEDURATION,
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
index b360e41b6b..6c025a6815 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -34,6 +34,9 @@ public enum ThreadPoolNames {
   
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
   
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
   
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
+  
COORDINATOR_RESERVATION_ROOT_POOL("accumulo.pool.compaction.coordinator.reservation.root"),
+  
COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"),
+  
COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"),
   GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
   GENERAL_SERVER_POOL("accumulo.pool.general.server"),
   SERVICE_LOCK_POOL("accumulo.pool.service.lock"),
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java 
b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 12e2567bdf..08d1d82989 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -22,6 +22,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_META_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL;
+import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
 import static 
org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
@@ -369,6 +372,27 @@ public class ThreadPools {
         return builder.build();
       case GC_DELETE_THREADS:
         return 
getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
+      case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT:
+        builder = 
getPoolBuilder(COORDINATOR_RESERVATION_ROOT_POOL).numCoreThreads(conf.getCount(p))
+            .withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
+      case COMPACTION_COORDINATOR_RESERVATION_THREADS_META:
+        builder = 
getPoolBuilder(COORDINATOR_RESERVATION_META_POOL).numCoreThreads(conf.getCount(p))
+            .withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
+      case COMPACTION_COORDINATOR_RESERVATION_THREADS_USER:
+        builder = 
getPoolBuilder(COORDINATOR_RESERVATION_USER_POOL).numCoreThreads(conf.getCount(p))
+            .withTimeOut(60L, MILLISECONDS);
+        if (emitThreadPoolMetrics) {
+          builder.enableThreadPoolMetrics();
+        }
+        return builder.build();
       default:
         throw new IllegalArgumentException("Unhandled thread pool property: " 
+ p);
     }
diff --git 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 2640ed4bcc..6f2eca3756 100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -31,6 +31,7 @@ import static 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.lang.ref.SoftReference;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,13 +44,17 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.Constants;
@@ -193,6 +198,9 @@ public class CompactionCoordinator
 
   private volatile long coordinatorStartTime;
 
+  private final Map<DataLevel,ThreadPoolExecutor> reservationPools;
+  private final Set<String> activeCompactorReservationRequest = 
ConcurrentHashMap.newKeySet();
+
   public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
       AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, 
Manager manager) {
     this.ctx = ctx;
@@ -232,6 +240,18 @@ public class CompactionCoordinator
     deadCompactionDetector =
         new DeadCompactionDetector(this.ctx, this, schedExecutor, 
fateInstances);
 
+    var rootReservationPool = 
ThreadPools.getServerThreadPools().createExecutorService(
+        ctx.getConfiguration(), 
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true);
+
+    var metaReservationPool = 
ThreadPools.getServerThreadPools().createExecutorService(
+        ctx.getConfiguration(), 
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META, true);
+
+    var userReservationPool = 
ThreadPools.getServerThreadPools().createExecutorService(
+        ctx.getConfiguration(), 
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER, true);
+
+    reservationPools = Map.of(Ample.DataLevel.ROOT, rootReservationPool, 
Ample.DataLevel.METADATA,
+        metaReservationPool, Ample.DataLevel.USER, userReservationPool);
+
     compactorCounts = 
ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false)
         .expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors);
     // At this point the manager does not have its lock so no actions should 
be taken yet
@@ -250,6 +270,9 @@ public class CompactionCoordinator
 
   public void shutdown() {
     shutdown.countDown();
+
+    reservationPools.values().forEach(ExecutorService::shutdownNow);
+
     var localThread = serviceThread;
     if (localThread != null) {
       try {
@@ -528,82 +551,142 @@ public class CompactionCoordinator
 
   }
 
-  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob 
metaJob,
-      String compactorAddress, ExternalCompactionId externalCompactionId) {
+  private class ReserveCompactionTask implements Supplier<CompactionMetadata> {
+
+    // Use a soft reference for this in case free memory gets low while this 
is sitting in the queue
+    // waiting to process. This object can contain the tablets list of files 
and if there are lots
+    // of tablet with lots of files then that could start to cause memory 
problems. This hack could
+    // be removed if #5188 were implemented.
+    private final SoftReference<CompactionJobQueues.MetaJob> metaJobRef;
+    private final String compactorAddress;
+    private final ExternalCompactionId externalCompactionId;
+
+    private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String 
compactorAddress,
+        ExternalCompactionId externalCompactionId) {
+      Preconditions.checkArgument(metaJob.getJob().getKind() == 
CompactionKind.SYSTEM
+          || metaJob.getJob().getKind() == CompactionKind.USER);
+      this.metaJobRef = new SoftReference<>(Objects.requireNonNull(metaJob));
+      this.compactorAddress = Objects.requireNonNull(compactorAddress);
+      this.externalCompactionId = Objects.requireNonNull(externalCompactionId);
+      
Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress),
+          "compactor %s already on has a reservation in flight, cannot process 
%s",
+          compactorAddress, externalCompactionId);
+    }
 
-    Preconditions.checkArgument(metaJob.getJob().getKind() == 
CompactionKind.SYSTEM
-        || metaJob.getJob().getKind() == CompactionKind.USER);
+    @Override
+    public CompactionMetadata get() {
+      try {
+        var metaJob = metaJobRef.get();
+        if (metaJob == null) {
+          LOG.warn("Compaction reservation request for {} {} was garbage 
collected.",
+              compactorAddress, externalCompactionId);
+          return null;
+        }
 
-    var tabletMetadata = metaJob.getTabletMetadata();
+        var tabletMetadata = metaJob.getTabletMetadata();
 
-    var jobFiles = 
metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
-        .collect(Collectors.toSet());
+        var jobFiles = metaJob.getJob().getFiles().stream()
+            
.map(CompactableFileImpl::toStoredTabletFile).collect(Collectors.toSet());
 
-    Retry retry = 
Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
-        
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
-        .logInterval(Duration.ofMinutes(3)).createRetry();
+        Retry retry = 
Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
+            
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
+            .logInterval(Duration.ofMinutes(3)).createRetry();
 
-    while (retry.canRetry()) {
-      try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
-        var extent = metaJob.getTabletMetadata().getExtent();
+        while (retry.canRetry()) {
+          try (var tabletsMutator = 
ctx.getAmple().conditionallyMutateTablets()) {
+            var extent = metaJob.getTabletMetadata().getExtent();
 
-        if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), 
jobFiles, ctx,
-            manager.getSteadyTime())) {
-          return null;
-        }
+            if (!canReserveCompaction(tabletMetadata, 
metaJob.getJob().getKind(), jobFiles, ctx,
+                manager.getSteadyTime())) {
+              return null;
+            }
 
-        var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, 
tabletMetadata,
-            compactorAddress, externalCompactionId);
-
-        // any data that is read from the tablet to make a decision about if 
it can compact or not
-        // must be checked for changes in the conditional mutation.
-        var tabletMutator = 
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
-            .requireFiles(jobFiles).requireNotCompacting(jobFiles);
-        if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
-          // For system compactions the user compaction requested column is 
examined when deciding
-          // if a compaction can start so need to check for changes to this 
column.
-          tabletMutator.requireSame(tabletMetadata, SELECTED, 
USER_COMPACTION_REQUESTED);
-        } else {
-          tabletMutator.requireSame(tabletMetadata, SELECTED);
-        }
+            var ecm = createExternalCompactionMetadata(metaJob.getJob(), 
jobFiles, tabletMetadata,
+                compactorAddress, externalCompactionId);
+
+            // any data that is read from the tablet to make a decision about 
if it can compact or
+            // not
+            // must be checked for changes in the conditional mutation.
+            var tabletMutator = 
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
+                .requireFiles(jobFiles).requireNotCompacting(jobFiles);
+            if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
+              // For system compactions the user compaction requested column 
is examined when
+              // deciding
+              // if a compaction can start so need to check for changes to 
this column.
+              tabletMutator.requireSame(tabletMetadata, SELECTED, 
USER_COMPACTION_REQUESTED);
+            } else {
+              tabletMutator.requireSame(tabletMetadata, SELECTED);
+            }
 
-        if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
-          var selectedFiles = tabletMetadata.getSelectedFiles();
-          var reserved = getFilesReservedBySelection(tabletMetadata, 
manager.getSteadyTime(), ctx);
-
-          // If there is a selectedFiles column, and the reserved set is empty 
this means that
-          // either no user jobs were completed yet or the selection 
expiration time has passed
-          // so the column is eligible to be deleted so a system job can run 
instead
-          if (selectedFiles != null && reserved.isEmpty()
-              && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
-            LOG.debug("Deleting user compaction selected files for {} {}", 
extent,
-                externalCompactionId);
-            tabletMutator.deleteSelectedFiles();
-          }
-        }
+            if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
+              var selectedFiles = tabletMetadata.getSelectedFiles();
+              var reserved =
+                  getFilesReservedBySelection(tabletMetadata, 
manager.getSteadyTime(), ctx);
+
+              // If there is a selectedFiles column, and the reserved set is 
empty this means that
+              // either no user jobs were completed yet or the selection 
expiration time has passed
+              // so the column is eligible to be deleted so a system job can 
run instead
+              if (selectedFiles != null && reserved.isEmpty()
+                  && !Collections.disjoint(jobFiles, 
selectedFiles.getFiles())) {
+                LOG.debug("Deleting user compaction selected files for {} {}", 
extent,
+                    externalCompactionId);
+                tabletMutator.deleteSelectedFiles();
+              }
+            }
 
-        tabletMutator.putExternalCompaction(externalCompactionId, ecm);
-        tabletMutator.submit(tm -> 
tm.getExternalCompactions().containsKey(externalCompactionId));
+            tabletMutator.putExternalCompaction(externalCompactionId, ecm);
+            tabletMutator
+                .submit(tm -> 
tm.getExternalCompactions().containsKey(externalCompactionId));
 
-        var result = tabletsMutator.process().get(extent);
+            var result = tabletsMutator.process().get(extent);
 
-        if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
-          return ecm;
-        } else {
-          tabletMetadata = result.readMetadata();
+            if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) 
{
+              return ecm;
+            } else {
+              tabletMetadata = result.readMetadata();
+            }
+          }
+
+          retry.useRetry();
+          try {
+            retry.waitForNextAttempt(LOG,
+                "Reserved compaction for " + 
metaJob.getTabletMetadata().getExtent());
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
         }
-      }
 
-      retry.useRetry();
-      try {
-        retry.waitForNextAttempt(LOG,
-            "Reserved compaction for " + 
metaJob.getTabletMetadata().getExtent());
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
+        return null;
+      } finally {
+        
Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress),
+            "compactorAddress:%s", compactorAddress);
       }
     }
+  }
+
+  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob 
metaJob,
+      String compactorAddress, ExternalCompactionId externalCompactionId) {
+
+    if (activeCompactorReservationRequest.contains(compactorAddress)) {
+      // In this case the compactor has a previously submitted reservation 
request that is still
+      // processing. Do not want to let it queue up another reservation 
request. One possible cause
+      // of this is that compactor timed out waiting for its last request to 
process and is now
+      // making another request. The previously submitted request can not be 
used because the
+      // compactor generates a new uuid for each request it makes. So the best 
thing to do is to
+      // return null and wait for this situation to resolve. This will likely 
happen when some part
+      // of the distributed system is not working well, so at this point want 
to avoid making
+      // problems worse instead of trying to reserve a job.
+      LOG.warn(
+          "Ignoring request from {} to reserve compaction job because it has a 
reservation request in progress.",
+          compactorAddress);
+      return null;
+    }
 
-    return null;
+    var dataLevel = DataLevel.of(metaJob.getTabletMetadata().getTableId());
+    var future = CompletableFuture.supplyAsync(
+        new ReserveCompactionTask(metaJob, compactorAddress, 
externalCompactionId),
+        reservationPools.get(dataLevel));
+    return future.join();
   }
 
   protected TExternalCompactionJob createThriftJob(String externalCompactionId,
@@ -1123,6 +1206,14 @@ public class CompactionCoordinator
     // 5. Log compactors with no groups
     // 6. Log groups with compactors and queued jos that have not checked in
 
+    var config = ctx.getConfiguration();
+    ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config,
+        Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT);
+    ThreadPools.resizePool(reservationPools.get(DataLevel.METADATA), config,
+        Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META);
+    ThreadPools.resizePool(reservationPools.get(DataLevel.USER), config,
+        Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER);
+
     // grab a snapshot of the ids in the set before reading the metadata 
table. This is done to
     // avoid removing things that are added while reading the metadata.
     final Set<ExternalCompactionId> idsSnapshot = 
Set.copyOf(RUNNING_CACHE.keySet());

Reply via email to