This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 0b49af8573 handles failure in compaction planner (#4586)
0b49af8573 is described below

commit 0b49af8573aac77ee67405ddea6f7be91bc1d2c1
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu May 30 17:11:38 2024 -0400

    handles failure in compaction planner (#4586)
    
    Updates the CompactionJobGenerator to log failures in planning compactions.
    This avoids causing problems for the tablet management iterator when it is
    trying to do other things, like processing tablet location updates.
    
    Added a test that ensures tables with flaky compaction planners can still
    be read and written. If failed compaction planning were to interfere with
    tablet assignment, it would cause the reads and/or writes to fail.
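
To make the failure-handling pattern concrete without reading the full diff below: the planner call is wrapped in a try/catch, failures are logged through a rate-limited ConditionalLogger.EscalatingLogger, and an empty job set is returned so the caller keeps making progress. The sketch below is illustrative only; the helper class name SafePlanRunner and the standalone packaging are assumptions, not code from this commit, while the EscalatingLogger constructor arguments mirror those used in the diff.

import java.time.Duration;
import java.util.Collection;
import java.util.Set;

import org.apache.accumulo.core.logging.ConditionalLogger;
import org.apache.accumulo.core.spi.compaction.CompactionJob;
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

// Illustrative sketch of the pattern this commit applies; SafePlanRunner is a
// hypothetical name, not a class added by the commit.
public class SafePlanRunner {
  private static final Logger log = LoggerFactory.getLogger(SafePlanRunner.class);

  // Rate-limited logger; the constructor arguments mirror the ones used in the
  // diff, where duplicate messages are temporarily suppressed.
  private static final Logger PLAN_ERROR_LOG =
      new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);

  public static Collection<CompactionJob> planSafely(CompactionPlanner planner,
      CompactionPlanner.PlanningParameters params) {
    try {
      return planner.makePlan(params).getJobs();
    } catch (Exception e) {
      // Log and return no jobs so the caller (e.g. the tablet management
      // iterator) keeps making progress instead of failing on a bad plugin.
      PLAN_ERROR_LOG.trace("Failed to plan compactions for table {}", params.getTableId(), e);
      return Set.of();
    }
  }
}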
---
 .../apache/accumulo/core/util/cache/Caches.java    |  1 -
 .../server/compaction/CompactionJobGenerator.java  | 69 ++++++++++++----------
 .../test/compaction/CompactionExecutorIT.java      | 54 +++++++++++++++++
 3 files changed, 92 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
index 8fb51f1dee..531061a872 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
@@ -41,7 +41,6 @@ public class Caches implements MetricsProducer {
     COMPACTION_CONFIGS,
     COMPACTION_DIR_CACHE,
     COMPACTION_DISPATCHERS,
-    COMPACTION_SERVICE_UNKNOWN,
     COMPACTOR_GROUP_ID,
     COMPRESSION_ALGORITHM,
     CRYPT_PASSWORDS,
diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
index 3b20c1ef99..e9f61d4cb8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.server.compaction;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,7 +27,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.PluginEnvironment;
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.logging.ConditionalLogger;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
@@ -45,7 +46,6 @@ import org.apache.accumulo.core.spi.compaction.CompactionPlan;
 import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.spi.compaction.CompactionServices;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.cache.Caches;
 import org.apache.accumulo.core.util.cache.Caches.CacheName;
 import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
@@ -55,18 +55,25 @@ import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
 import org.apache.accumulo.core.util.time.SteadyTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import com.github.benmanes.caffeine.cache.Cache;
 
 public class CompactionJobGenerator {
   private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class);
+  private static final Logger UNKNOWN_SERVICE_ERROR_LOG =
+      new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
+  private static final Logger PLANNING_INIT_ERROR_LOG =
+      new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
+  private static final Logger PLANNING_ERROR_LOG =
+      new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR);
+
   private final CompactionServicesConfig servicesConfig;
   private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>();
   private final Cache<TableId,CompactionDispatcher> dispatchers;
   private final Set<CompactionServiceId> serviceIds;
   private final PluginEnvironment env;
   private final Map<FateId,Map<String,String>> allExecutionHints;
-  private final Cache<Pair<TableId,CompactionServiceId>,Long> unknownCompactionServiceErrorCache;
   private final SteadyTime steadyTime;
 
   public CompactionJobGenerator(PluginEnvironment env,
@@ -87,22 +94,15 @@ public class CompactionJobGenerator {
       executionHints.forEach((k, v) -> allExecutionHints.put(k,
           v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
     }
-    unknownCompactionServiceErrorCache =
-        Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, false)
-            .expireAfterWrite(5, TimeUnit.MINUTES).build();
+
     this.steadyTime = steadyTime;
   }
 
   public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) {
-
-    // ELASTICITY_TODO do not want user configured plugins to cause exceptions that prevents tablets
-    // from being
-    // assigned. So probably want to catch exceptions and log, but not too spammily OR some how
-    // report something
-    // back to the manager so it can log.
-
     Collection<CompactionJob> systemJobs = Set.of();
 
+    log.debug("Planning for {} {} {}", tablet.getExtent(), kinds, 
this.hashCode());
+
     if (kinds.contains(CompactionKind.SYSTEM)) {
       CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of());
       systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of());
@@ -166,19 +166,11 @@ public class CompactionJobGenerator {
       CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) {
 
     if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
-      var cacheKey = new Pair<>(tablet.getTableId(), serviceId);
-      var last = unknownCompactionServiceErrorCache.getIfPresent(cacheKey);
-      if (last == null) {
-        // have not logged an error recently for this, so lets log one
-        log.error(
-            "Tablet {} returned non-existent compaction service {} for 
compaction type {}.  Check"
-                + " the table compaction dispatcher configuration. No 
compactions will happen"
-                + " until the configuration is fixed. This log message is 
temporarily suppressed for the"
-                + " entire table.",
-            tablet.getExtent(), serviceId, kind);
-        unknownCompactionServiceErrorCache.put(cacheKey, System.currentTimeMillis());
-      }
-
+      UNKNOWN_SERVICE_ERROR_LOG.trace(
+          "Table {} returned non-existent compaction service {} for compaction 
type {}.  Check"
+              + " the table compaction dispatcher configuration. No 
compactions will happen"
+              + " until the configuration is fixed. This log message is 
temporarily suppressed.",
+          tablet.getExtent().tableId(), serviceId, kind);
       return Set.of();
     }
 
@@ -299,8 +291,7 @@ public class CompactionJobGenerator {
         return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates);
       }
     };
-
-    return planner.makePlan(params).getJobs();
+    return planCompactions(planner, params, serviceId);
   }
 
   private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) {
@@ -317,11 +308,27 @@ public class CompactionJobGenerator {
           servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env);
       planner.init(initParameters);
     } catch (Exception e) {
-      log.error(
-          "Failed to create compaction planner for {} using class:{} 
options:{}.  Compaction service will not start any new compactions until its 
configuration is fixed.",
-          serviceId, plannerClassName, options, e);
+      PLANNING_INIT_ERROR_LOG.trace(
+          "Failed to create compaction planner for service:{} tableId:{} using 
class:{} options:{}.  Compaction "
+              + "service will not start any new compactions until its 
configuration is fixed. This log message is "
+              + "temporarily suppressed.",
+          serviceId, tableId, plannerClassName, options, e);
       planner = new ProvisionalCompactionPlanner(serviceId);
     }
     return planner;
   }
+
+  private Collection<CompactionJob> planCompactions(CompactionPlanner planner,
+      CompactionPlanner.PlanningParameters params, CompactionServiceId serviceId) {
+    try {
+      return planner.makePlan(params).getJobs();
+    } catch (Exception e) {
+      PLANNING_ERROR_LOG.trace(
+          "Failed to plan compactions for service:{} kind:{} tableId:{} 
hints:{}.  Compaction service may not start any"
+              + " new compactions until this issue is resolved. Duplicates of 
this log message are temporarily"
+              + " suppressed.",
+          serviceId, params.getKind(), params.getTableId(), params.getExecutionHints(), e);
+      return Set.of();
+    }
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
index 67c01286a9..a6bd658c8a 100644
--- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
@@ -152,6 +152,21 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
     }
   }
 
+  public static class ErroringPlanner implements CompactionPlanner {
+    @Override
+    public void init(InitParameters params) {
+      if (Boolean.parseBoolean(params.getOptions().getOrDefault("failInInit", "false"))) {
+        throw new IllegalStateException("error initializing");
+      }
+
+    }
+
+    @Override
+    public CompactionPlan makePlan(PlanningParameters params) {
+      throw new IllegalStateException("error planning");
+    }
+  }
+
   public static class CompactionExecutorITConfig implements MiniClusterConfigurationCallback {
     @Override
     public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) {
@@ -177,6 +192,16 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
       cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11");
       cfg.setProperty(csp + "cs4.planner.opts.process", "USER");
 
+      // Set up three planners that fail to initialize or plan; these planners should not impede
+      // tablet assignment.
+      cfg.setProperty(csp + "cse1.planner", ErroringPlanner.class.getName());
+      cfg.setProperty(csp + "cse1.planner.opts.failInInit", "true");
+
+      cfg.setProperty(csp + "cse2.planner", ErroringPlanner.class.getName());
+      cfg.setProperty(csp + "cse2.planner.opts.failInInit", "false");
+
+      cfg.setProperty(csp + "cse3.planner", "NonExistentPlanner20240522");
+
       // this is meant to be dynamically reconfigured
       cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName());
       cfg.setProperty(csp + "recfg.planner.opts.groups", 
"[{'group':'i1'},{'group':'i2'}]");
@@ -230,6 +255,35 @@ public class CompactionExecutorIT extends SharedMiniClusterBase {
     }
   }
 
+  @Test
+  public void testFailingPlanners() throws Exception {
+    // This test ensures that a table w/ failing compaction planner can still be read and written.
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      createTable(client, "fail1", "cse1");
+      createTable(client, "fail2", "cse2");
+      createTable(client, "fail3", "cse3");
+
+      // ensure tablets can still be assigned and written w/ failing compaction services
+      addFiles(client, "fail1", 30);
+      addFiles(client, "fail2", 30);
+      addFiles(client, "fail3", 30);
+
+      // ensure tablets can still be assigned and scanned w/ failing compaction services
+      assertEquals(30, scanTable(client, "fail1").size());
+      assertEquals(30, scanTable(client, "fail2").size());
+      assertEquals(30, scanTable(client, "fail3").size());
+
+      // compactions should never run on these tables, but sleep a bit to be sure
+      Thread.sleep(2000);
+
+      // do not expect any compactions to run
+      assertEquals(30, getFiles(client, "fail1").size());
+      assertEquals(30, getFiles(client, "fail2").size());
+      assertEquals(30, getFiles(client, "fail3").size());
+    }
+  }
+
   @Test
   public void testReconfigureCompactionService() throws Exception {
     Stream.of("i1", "i2").forEach(g -> {

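As a usage note for the new test: a failing compaction service only matters once a table's dispatcher routes compactions to it. The hedged sketch below shows that table-side wiring using the standard SimpleCompactionDispatcher properties; the IT's createTable helper is not part of this diff, so whether it does exactly this is an assumption, and the client properties path is a placeholder.

import java.util.Map;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;

// Hypothetical standalone setup that points a table at the "cse1" service
// configured in the test above; this mirrors what the IT's createTable helper
// presumably does, but that helper is not shown in this diff.
public class FailingServiceTableSetup {
  public static void main(String[] args) throws Exception {
    try (AccumuloClient client =
        Accumulo.newClient().from("/path/to/accumulo-client.properties").build()) {
      NewTableConfiguration ntc = new NewTableConfiguration().setProperties(Map.of(
          "table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
          "table.compaction.dispatcher.opts.service", "cse1"));
      client.tableOperations().create("fail1", ntc);
    }
  }
}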