This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 0b49af8573 handles failure in compaction planner (#4586)
0b49af8573 is described below
commit 0b49af8573aac77ee67405ddea6f7be91bc1d2c1
Author: Keith Turner <[email protected]>
AuthorDate: Thu May 30 17:11:38 2024 -0400
handles failure in compaction planner (#4586)
Updates the CompactionJobGenerator to log failures in planning compactions.
This avoids causing problems for the tablet management iterator when it's
trying to do other things like process tablet location updates.
Added a test that ensures tables with flaky compaction planners can still
be read and written. If failed compaction planning were to interfere with
tablet assignment this would cause the reads and/or writes to fail.
---
.../apache/accumulo/core/util/cache/Caches.java | 1 -
.../server/compaction/CompactionJobGenerator.java | 69 ++++++++++++----------
.../test/compaction/CompactionExecutorIT.java | 54 +++++++++++++++++
3 files changed, 92 insertions(+), 32 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
index 8fb51f1dee..531061a872 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/cache/Caches.java
@@ -41,7 +41,6 @@ public class Caches implements MetricsProducer {
COMPACTION_CONFIGS,
COMPACTION_DIR_CACHE,
COMPACTION_DISPATCHERS,
- COMPACTION_SERVICE_UNKNOWN,
COMPACTOR_GROUP_ID,
COMPRESSION_ALGORITHM,
CRYPT_PASSWORDS,
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
index 3b20c1ef99..e9f61d4cb8 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.server.compaction;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -26,7 +27,6 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.accumulo.core.client.PluginEnvironment;
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.logging.ConditionalLogger;
import org.apache.accumulo.core.metadata.CompactableFileImpl;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
@@ -45,7 +46,6 @@ import org.apache.accumulo.core.spi.compaction.CompactionPlan;
import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
import org.apache.accumulo.core.spi.compaction.CompactionServices;
-import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
@@ -55,18 +55,25 @@ import
org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
import com.github.benmanes.caffeine.cache.Cache;
public class CompactionJobGenerator {
private static final Logger log =
LoggerFactory.getLogger(CompactionJobGenerator.class);
+ private static final Logger UNKNOWN_SERVICE_ERROR_LOG =
+ new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000,
Level.ERROR);
+ private static final Logger PLANNING_INIT_ERROR_LOG =
+ new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000,
Level.ERROR);
+ private static final Logger PLANNING_ERROR_LOG =
+ new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000,
Level.ERROR);
+
private final CompactionServicesConfig servicesConfig;
private final Map<CompactionServiceId,CompactionPlanner> planners = new
HashMap<>();
private final Cache<TableId,CompactionDispatcher> dispatchers;
private final Set<CompactionServiceId> serviceIds;
private final PluginEnvironment env;
private final Map<FateId,Map<String,String>> allExecutionHints;
- private final Cache<Pair<TableId,CompactionServiceId>,Long>
unknownCompactionServiceErrorCache;
private final SteadyTime steadyTime;
public CompactionJobGenerator(PluginEnvironment env,
@@ -87,22 +94,15 @@ public class CompactionJobGenerator {
executionHints.forEach((k, v) -> allExecutionHints.put(k,
v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
}
- unknownCompactionServiceErrorCache =
-
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_SERVICE_UNKNOWN,
false)
- .expireAfterWrite(5, TimeUnit.MINUTES).build();
+
this.steadyTime = steadyTime;
}
public Collection<CompactionJob> generateJobs(TabletMetadata tablet,
Set<CompactionKind> kinds) {
-
- // ELASTICITY_TODO do not want user configured plugins to cause exceptions
that prevents tablets
- // from being
- // assigned. So probably want to catch exceptions and log, but not too
spammily OR some how
- // report something
- // back to the manager so it can log.
-
Collection<CompactionJob> systemJobs = Set.of();
+ log.debug("Planning for {} {} {}", tablet.getExtent(), kinds,
this.hashCode());
+
if (kinds.contains(CompactionKind.SYSTEM)) {
CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet,
Map.of());
systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet,
Map.of());
@@ -166,19 +166,11 @@ public class CompactionJobGenerator {
CompactionKind kind, TabletMetadata tablet, Map<String,String>
executionHints) {
if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
- var cacheKey = new Pair<>(tablet.getTableId(), serviceId);
- var last = unknownCompactionServiceErrorCache.getIfPresent(cacheKey);
- if (last == null) {
- // have not logged an error recently for this, so lets log one
- log.error(
- "Tablet {} returned non-existent compaction service {} for
compaction type {}. Check"
- + " the table compaction dispatcher configuration. No
compactions will happen"
- + " until the configuration is fixed. This log message is
temporarily suppressed for the"
- + " entire table.",
- tablet.getExtent(), serviceId, kind);
- unknownCompactionServiceErrorCache.put(cacheKey,
System.currentTimeMillis());
- }
-
+ UNKNOWN_SERVICE_ERROR_LOG.trace(
+ "Table {} returned non-existent compaction service {} for compaction
type {}. Check"
+ + " the table compaction dispatcher configuration. No
compactions will happen"
+ + " until the configuration is fixed. This log message is
temporarily suppressed.",
+ tablet.getExtent().tableId(), serviceId, kind);
return Set.of();
}
@@ -299,8 +291,7 @@ public class CompactionJobGenerator {
return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates);
}
};
-
- return planner.makePlan(params).getJobs();
+ return planCompactions(planner, params, serviceId);
}
private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId
serviceId) {
@@ -317,11 +308,27 @@ public class CompactionJobGenerator {
servicesConfig.getOptions().get(serviceId.canonical()),
(ServiceEnvironment) env);
planner.init(initParameters);
} catch (Exception e) {
- log.error(
- "Failed to create compaction planner for {} using class:{}
options:{}. Compaction service will not start any new compactions until its
configuration is fixed.",
- serviceId, plannerClassName, options, e);
+ PLANNING_INIT_ERROR_LOG.trace(
+ "Failed to create compaction planner for service:{} tableId:{} using
class:{} options:{}. Compaction "
+ + "service will not start any new compactions until its
configuration is fixed. This log message is "
+ + "temporarily suppressed.",
+ serviceId, tableId, plannerClassName, options, e);
planner = new ProvisionalCompactionPlanner(serviceId);
}
return planner;
}
+
+ private Collection<CompactionJob> planCompactions(CompactionPlanner planner,
+ CompactionPlanner.PlanningParameters params, CompactionServiceId
serviceId) {
+ try {
+ return planner.makePlan(params).getJobs();
+ } catch (Exception e) {
+ PLANNING_ERROR_LOG.trace(
+ "Failed to plan compactions for service:{} kind:{} tableId:{}
hints:{}. Compaction service may not start any"
+ + " new compactions until this issue is resolved. Duplicates of
this log message are temporarily"
+ + " suppressed.",
+ serviceId, params.getKind(), params.getTableId(),
params.getExecutionHints(), e);
+ return Set.of();
+ }
+ }
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
index 67c01286a9..a6bd658c8a 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java
@@ -152,6 +152,21 @@ public class CompactionExecutorIT extends
SharedMiniClusterBase {
}
}
+ public static class ErroringPlanner implements CompactionPlanner {
+ @Override
+ public void init(InitParameters params) {
+ if (Boolean.parseBoolean(params.getOptions().getOrDefault("failInInit",
"false"))) {
+ throw new IllegalStateException("error initializing");
+ }
+
+ }
+
+ @Override
+ public CompactionPlan makePlan(PlanningParameters params) {
+ throw new IllegalStateException("error planning");
+ }
+ }
+
public static class CompactionExecutorITConfig implements
MiniClusterConfigurationCallback {
@Override
public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
conf) {
@@ -177,6 +192,16 @@ public class CompactionExecutorIT extends
SharedMiniClusterBase {
cfg.setProperty(csp + "cs4.planner.opts.filesPerCompaction", "11");
cfg.setProperty(csp + "cs4.planner.opts.process", "USER");
+ // Set up three planners that fail to initialize or plan; these planners
should not impede
+ // tablet assignment.
+ cfg.setProperty(csp + "cse1.planner", ErroringPlanner.class.getName());
+ cfg.setProperty(csp + "cse1.planner.opts.failInInit", "true");
+
+ cfg.setProperty(csp + "cse2.planner", ErroringPlanner.class.getName());
+ cfg.setProperty(csp + "cse2.planner.opts.failInInit", "false");
+
+ cfg.setProperty(csp + "cse3.planner", "NonExistentPlanner20240522");
+
// this is meant to be dynamically reconfigured
cfg.setProperty(csp + "recfg.planner", TestPlanner.class.getName());
cfg.setProperty(csp + "recfg.planner.opts.groups",
"[{'group':'i1'},{'group':'i2'}]");
@@ -230,6 +255,35 @@ public class CompactionExecutorIT extends
SharedMiniClusterBase {
}
}
+ @Test
+ public void testFailingPlanners() throws Exception {
+ // This test ensures that a table w/ failing compaction planner can still
be read and written.
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ createTable(client, "fail1", "cse1");
+ createTable(client, "fail2", "cse2");
+ createTable(client, "fail3", "cse3");
+
+ // ensure tablets can still be assigned and written w/ failing
compaction services
+ addFiles(client, "fail1", 30);
+ addFiles(client, "fail2", 30);
+ addFiles(client, "fail3", 30);
+
+ // ensure tablets can still be assigned and scanned w/ failing
compaction services
+ assertEquals(30, scanTable(client, "fail1").size());
+ assertEquals(30, scanTable(client, "fail2").size());
+ assertEquals(30, scanTable(client, "fail3").size());
+
+ // compactions should never run on these tables, but sleep a bit to be
sure
+ Thread.sleep(2000);
+
+ // do not expect any compactions to run
+ assertEquals(30, getFiles(client, "fail1").size());
+ assertEquals(30, getFiles(client, "fail2").size());
+ assertEquals(30, getFiles(client, "fail3").size());
+ }
+ }
+
@Test
public void testReconfigureCompactionService() throws Exception {
Stream.of("i1", "i2").forEach(g -> {