This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 07e781c80f Added runtime check for shared compaction queues (#4800)
07e781c80f is described below

commit 07e781c80f549459ce64cbc1e130dbe615db43b7
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Mon Aug 19 07:49:18 2024 -0400

    Added runtime check for shared compaction queues (#4800)
    
    Closes #4034
---
 .../server/conf/CheckCompactionConfig.java         | 28 ++++++++++++++--
 .../server/conf/CheckCompactionConfigTest.java     | 37 ++++++++++++++++------
 .../accumulo/manager/TabletGroupWatcher.java       | 13 +++++++-
 3 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
index fb278fbfe5..d9c4ebcc50 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java
@@ -22,6 +22,10 @@ import static org.apache.accumulo.core.Constants.DEFAULT_COMPACTION_SERVICE_NAME
 
 import java.io.FileNotFoundException;
 import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.accumulo.core.cli.Help;
@@ -31,6 +35,7 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactorGroupId;
 import org.apache.accumulo.core.util.ConfigurationImpl;
 import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
@@ -104,6 +109,7 @@ public class CheckCompactionConfig implements KeywordExecutable {
       return;
     }
 
+    Map<CompactorGroupId,Set<String>> groupToServices = new HashMap<>();
     for (var entry : servicesConfig.getPlanners().entrySet()) {
       String serviceId = entry.getKey();
       String plannerClassName = entry.getValue();
@@ -120,9 +126,25 @@ public class CheckCompactionConfig implements KeywordExecutable {
 
       planner.init(initParams);
 
-      initParams.getRequestedGroups().forEach(
-          (groupId -> log.info("Compaction service '{}' requested with compactor group '{}'",
-              serviceId, groupId)));
+      initParams.getRequestedGroups().forEach(groupId -> {
+        log.info("Compaction service '{}' requested with compactor group '{}'", serviceId, groupId);
+        groupToServices.computeIfAbsent(groupId, f -> new HashSet<>()).add(serviceId);
+      });
+    }
+
+    boolean dupesFound = false;
+    for (Entry<CompactorGroupId,Set<String>> e : groupToServices.entrySet()) {
+      if (e.getValue().size() > 1) {
+        log.warn("Compaction services " + e.getValue().toString()
+            + " mapped to the same compactor group: " + e.getKey());
+        dupesFound = true;
+      }
+    }
+
+    if (dupesFound) {
+      throw new IllegalStateException(
+          "Multiple compaction services configured to use the same group. This could lead"
+              + " to undesired behavior. Please fix the configuration");
     }
 
     log.info("Properties file has passed all checks.");
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
index 9bb7cc163f..7404380a78 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java
@@ -65,12 +65,12 @@ public class CheckCompactionConfigTest extends WithTestNames {
     String inputString = ("compaction.service.cs1.planner="
         + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n"
         + "compaction.service.cs1.planner.opts.groups=\\\n"
-        + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
-        + "{'group':'large'}] \ncompaction.service.cs2.planner="
+        + "[{'group':'cs1_small','maxSize':'16M'},{'group':'cs1_medium','maxSize':'128M'},\\\n"
+        + "{'group':'cs1_large'}] \ncompaction.service.cs2.planner="
         + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n"
         + "compaction.service.cs2.planner.opts.groups=\\\n"
-        + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
-        + "{'group':'large'}]").replaceAll("'", "\"");
+        + "[{'group':'cs2_small','maxSize':'16M'},{'group':'cs2_medium','maxSize':'128M'},\\\n"
+        + "{'group':'cs2_large'}]").replaceAll("'", "\"");
 
     String filePath = writeToFileAndReturnPath(inputString);
 
@@ -82,15 +82,15 @@ public class CheckCompactionConfigTest extends WithTestNames {
     String inputString = ("compaction.service.cs1.planner="
         + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n"
         + "compaction.service.cs1.planner.opts.groups=\\\n"
-        + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
-        + "{'group':'large'}] \ncompaction.service.cs2.planner="
+        + "[{'group':'cs1_small','maxSize':'16M'},{'group':'cs1_medium','maxSize':'128M'},\\\n"
+        + "{'group':'cs1_large'}] \ncompaction.service.cs2.planner="
         + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n"
         + "compaction.service.cs2.planner.opts.groups=\\\n"
-        + "[{'group':'small','maxSize':'16M'}, {'group':'medium','maxSize':'128M'},\\\n"
-        + "{'group':'large'}] \ncompaction.service.cs3.planner="
+        + "[{'group':'cs2_small','maxSize':'16M'}, {'group':'cs2_medium','maxSize':'128M'},\\\n"
+        + "{'group':'cs2_large'}] \ncompaction.service.cs3.planner="
         + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n"
         + "compaction.service.cs3.planner.opts.groups=\\\n"
-        + "[{'group':'small','maxSize':'16M'},{'group':'large'}]").replaceAll("'", "\"");
+        + "[{'group':'cs3_small','maxSize':'16M'},{'group':'cs3_large'}]").replaceAll("'", "\"");
 
     String filePath = writeToFileAndReturnPath(inputString);
     CheckCompactionConfig.main(new String[] {filePath});
@@ -178,4 +178,23 @@ public class CheckCompactionConfigTest extends WithTestNames {
     log.info("Wrote to path: {}\nWith string:\n{}", file.getAbsolutePath(), inputString);
     return file.getAbsolutePath();
   }
+
+  @Test
+  public void testGroupReuse() throws Exception {
+    String inputString = ("compaction.service.cs1.planner="
+        + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n"
+        + "compaction.service.cs1.planner.opts.groups=\\\n"
+        + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
+        + "{'group':'large'}] \ncompaction.service.cs2.planner="
+        + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n"
+        + "compaction.service.cs2.planner.opts.groups=\\\n"
+        + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n"
+        + "{'group':'large'}]").replaceAll("'", "\"");
+
+    String filePath = writeToFileAndReturnPath(inputString);
+
+    assertThrows(IllegalStateException.class,
+        () -> CheckCompactionConfig.main(new String[] {filePath}));
+  }
+
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index d621b65a46..3b77816a1e 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -81,6 +81,7 @@ import org.apache.accumulo.manager.state.TableStats;
 import org.apache.accumulo.manager.upgrade.UpgradeCoordinator;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
 import org.apache.accumulo.server.compaction.CompactionJobGenerator;
+import org.apache.accumulo.server.conf.CheckCompactionConfig;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.log.WalStateManager;
@@ -440,6 +441,16 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
         new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext()),
             tableMgmtParams.getCompactionHints(), tableMgmtParams.getSteadyTime());
 
+    try {
+      CheckCompactionConfig.validate(manager.getConfiguration());
+    } catch (SecurityException | IllegalArgumentException | IllegalStateException
+        | ReflectiveOperationException e) {
+      LOG.error(
+          "Error validating compaction configuration, all compactions are paused until the configuration is fixed.",
+          e);
+      compactionGenerator = null;
+    }
+
     Set<TServerInstance> filteredServersToShutdown =
         new HashSet<>(tableMgmtParams.getServersToShutdown());
 
@@ -588,7 +599,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread {
         manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent()));
       }
 
-      if (actions.contains(ManagementAction.NEEDS_COMPACTING)) {
+      if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) {
         var jobs = compactionGenerator.generateJobs(tm,
             TabletManagementIterator.determineCompactionKinds(actions));
         LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size());

Reply via email to