This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 0b49af8573 handles failure in compaction planner (#4586) 0b49af8573 is described below commit 0b49af8573aac77ee67405ddea6f7be91bc1d2c1 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu May 30 17:11:38 2024 -0400 handles failure in compaction planner (#4586) Updates the CompactionJobGenerator to log failures in planning compactions. This avoids causing problems for the tablet management iterator when it's trying to do other things like process tablet location updates. Added a test that ensures tables with flaky compaction planners can still be read and written. If failed compaction planning were to interfere with tablet assignment this would cause the reads and/or writes to fail. --- .../apache/accumulo/core/util/cache/Caches.java | 1 - .../server/compaction/CompactionJobGenerator.java | 69 ++++++++++++---------- .../test/compaction/CompactionExecutorIT.java | 54 +++++++++++++++++ 3 files changed, 92 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java index 8fb51f1dee..531061a872 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java +++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java @@ -41,7 +41,6 @@ public class Caches implements MetricsProducer { COMPACTION_CONFIGS, COMPACTION_DIR_CACHE, COMPACTION_DISPATCHERS, - COMPACTION_SERVICE_UNKNOWN, COMPACTOR_GROUP_ID, COMPRESSION_ALGORITHM, CRYPT_PASSWORDS, diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index 3b20c1ef99..e9f61d4cb8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.server.compaction; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -26,7 +27,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.accumulo.core.client.PluginEnvironment; @@ -35,6 +35,7 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.logging.ConditionalLogger; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.common.ServiceEnvironment; @@ -45,7 +46,6 @@ import org.apache.accumulo.core.spi.compaction.CompactionPlan; import org.apache.accumulo.core.spi.compaction.CompactionPlanner; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.spi.compaction.CompactionServices; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.cache.Caches; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.compaction.CompactionJobImpl; @@ -55,18 +55,25 @@ import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; import org.apache.accumulo.core.util.time.SteadyTime; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; import com.github.benmanes.caffeine.cache.Cache; public class CompactionJobGenerator { private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class); + private static final Logger UNKNOWN_SERVICE_ERROR_LOG = + new 
ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); + private static final Logger PLANNING_INIT_ERROR_LOG = + new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); + private static final Logger PLANNING_ERROR_LOG = + new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); + private final CompactionServicesConfig servicesConfig; private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>(); private final Cache<TableId,CompactionDispatcher> dispatchers; private final Set<CompactionServiceId> serviceIds; private final PluginEnvironment env; private final Map<FateId,Map<String,String>> allExecutionHints; - private final Cache<Pair<TableId,CompactionServiceId>,Long> unknownCompactionServiceErrorCache; private final SteadyTime steadyTime; public CompactionJobGenerator(PluginEnvironment env, @@ -87,22 +94,15 @@ public class CompactionJobGenerator { executionHints.forEach((k, v) -> allExecutionHints.put(k, v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v))); } - unknownCompactionServiceErrorCache = - Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN, false) - .expireAfterWrite(5, TimeUnit.MINUTES).build(); + this.steadyTime = steadyTime; } public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) { - - // ELASTICITY_TODO do not want user configured plugins to cause exceptions that prevents tablets - // from being - // assigned. So probably want to catch exceptions and log, but not too spammily OR some how - // report something - // back to the manager so it can log. 
- Collection<CompactionJob> systemJobs = Set.of(); + log.debug("Planning for {} {} {}", tablet.getExtent(), kinds, this.hashCode()); + if (kinds.contains(CompactionKind.SYSTEM)) { CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of()); systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of()); @@ -166,19 +166,11 @@ public class CompactionJobGenerator { CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) { if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) { - var cacheKey = new Pair<>(tablet.getTableId(), serviceId); - var last = unknownCompactionServiceErrorCache.getIfPresent(cacheKey); - if (last == null) { - // have not logged an error recently for this, so lets log one - log.error( - "Tablet {} returned non-existent compaction service {} for compaction type {}. Check" - + " the table compaction dispatcher configuration. No compactions will happen" - + " until the configuration is fixed. This log message is temporarily suppressed for the" - + " entire table.", - tablet.getExtent(), serviceId, kind); - unknownCompactionServiceErrorCache.put(cacheKey, System.currentTimeMillis()); - } - + UNKNOWN_SERVICE_ERROR_LOG.trace( + "Table {} returned non-existent compaction service {} for compaction type {}. Check" + + " the table compaction dispatcher configuration. No compactions will happen" + + " until the configuration is fixed. 
This log message is temporarily suppressed.", + tablet.getExtent().tableId(), serviceId, kind); return Set.of(); } @@ -299,8 +291,7 @@ public class CompactionJobGenerator { return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates); } }; - - return planner.makePlan(params).getJobs(); + return planCompactions(planner, params, serviceId); } private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) { @@ -317,11 +308,27 @@ public class CompactionJobGenerator { servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env); planner.init(initParameters); } catch (Exception e) { - log.error( - "Failed to create compaction planner for {} using class:{} options:{}. Compaction service will not start any new compactions until its configuration is fixed.", - serviceId, plannerClassName, options, e); + PLANNING_INIT_ERROR_LOG.trace( + "Failed to create compaction planner for service:{} tableId:{} using class:{} options:{}. Compaction " + + "service will not start any new compactions until its configuration is fixed. This log message is " + + "temporarily suppressed.", + serviceId, tableId, plannerClassName, options, e); planner = new ProvisionalCompactionPlanner(serviceId); } return planner; } + + private Collection<CompactionJob> planCompactions(CompactionPlanner planner, + CompactionPlanner.PlanningParameters params, CompactionServiceId serviceId) { + try { + return planner.makePlan(params).getJobs(); + } catch (Exception e) { + PLANNING_ERROR_LOG.trace( + "Failed to plan compactions for service:{} kind:{} tableId:{} hints:{}. Compaction service may not start any" + + " new compactions until this issue is resolved. 
Duplicates of this log message are temporarily" + " suppressed.", + serviceId, params.getKind(), params.getTableId(), params.getExecutionHints(), e); + return Set.of(); + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java index 67c01286a9..a6bd658c8a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java @@ -152,6 +152,21 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { } } + public static class ErroringPlanner implements CompactionPlanner { + @Override + public void init(InitParameters params) { + if (Boolean.parseBoolean(params.getOptions().getOrDefault("failInInit", "false"))) { + throw new IllegalStateException("error initializing"); + } + + } + + @Override + public CompactionPlan makePlan(PlanningParameters params) { + throw new IllegalStateException("error planning"); + } + } + public static class CompactionExecutorITConfig implements MiniClusterConfigurationCallback { @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) { @@ -177,6 +192,16 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11"); cfg.setProperty(csp + "cs4.planner.opts.process", "USER"); + // Set up three planners that fail to initialize or plan; these planners should not impede + tablet assignment. 
+ cfg.setProperty(csp + "cse1.planner", ErroringPlanner.class.getName()); + cfg.setProperty(csp + "cse1.planner.opts.failInInit", "true"); + + cfg.setProperty(csp + "cse2.planner", ErroringPlanner.class.getName()); + cfg.setProperty(csp + "cse2.planner.opts.failInInit", "false"); + + cfg.setProperty(csp + "cse3.planner", "NonExistentPlanner20240522"); + // this is meant to be dynamically reconfigured cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName()); cfg.setProperty(csp + "recfg.planner.opts.groups", "[{'group':'i1'},{'group':'i2'}]"); @@ -230,6 +255,35 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { } } + @Test + public void testFailingPlanners() throws Exception { + // This test ensures that a table w/ failing compaction planner can still be read and written. + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + createTable(client, "fail1", "cse1"); + createTable(client, "fail2", "cse2"); + createTable(client, "fail3", "cse3"); + + // ensure tablets can still be assigned and written w/ failing compaction services + addFiles(client, "fail1", 30); + addFiles(client, "fail2", 30); + addFiles(client, "fail3", 30); + + // ensure tablets can still be assigned and scanned w/ failing compaction services + assertEquals(30, scanTable(client, "fail1").size()); + assertEquals(30, scanTable(client, "fail2").size()); + assertEquals(30, scanTable(client, "fail3").size()); + + // compactions should never run on these tables, but sleep a bit to be sure + Thread.sleep(2000); + + // do not expect any compactions to run + assertEquals(30, getFiles(client, "fail1").size()); + assertEquals(30, getFiles(client, "fail2").size()); + assertEquals(30, getFiles(client, "fail3").size()); + } + } + @Test public void testReconfigureCompactionService() throws Exception { Stream.of("i1", "i2").forEach(g -> {