This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 07e781c80f Added runtime check for shared compaction queues (#4800) 07e781c80f is described below commit 07e781c80f549459ce64cbc1e130dbe615db43b7 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Mon Aug 19 07:49:18 2024 -0400 Added runtime check for shared compaction queues (#4800) Closes #4034 --- .../server/conf/CheckCompactionConfig.java | 28 ++++++++++++++-- .../server/conf/CheckCompactionConfigTest.java | 37 ++++++++++++++++------ .../accumulo/manager/TabletGroupWatcher.java | 13 +++++++- 3 files changed, 65 insertions(+), 13 deletions(-) diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java index fb278fbfe5..d9c4ebcc50 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java @@ -22,6 +22,10 @@ import static org.apache.accumulo.core.Constants.DEFAULT_COMPACTION_SERVICE_NAME import java.io.FileNotFoundException; import java.nio.file.Path; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.accumulo.core.cli.Help; @@ -31,6 +35,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionPlanner; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; import org.apache.accumulo.core.util.ConfigurationImpl; import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; @@ -104,6 +109,7 @@ public class CheckCompactionConfig implements KeywordExecutable { return; } + Map<CompactorGroupId,Set<String>> groupToServices = new HashMap<>(); for (var entry : servicesConfig.getPlanners().entrySet()) { String serviceId = entry.getKey(); String plannerClassName = entry.getValue(); @@ -120,9 +126,25 @@ public class CheckCompactionConfig implements KeywordExecutable { planner.init(initParams); - initParams.getRequestedGroups().forEach( - (groupId -> log.info("Compaction service '{}' requested with compactor group '{}'", - serviceId, groupId))); + initParams.getRequestedGroups().forEach(groupId -> { + log.info("Compaction service '{}' requested with compactor group '{}'", serviceId, groupId); + groupToServices.computeIfAbsent(groupId, f -> new HashSet<>()).add(serviceId); + }); + } + + boolean dupesFound = false; + for (Entry<CompactorGroupId,Set<String>> e : groupToServices.entrySet()) { + if (e.getValue().size() > 1) { + log.warn("Compaction services " + e.getValue().toString() + + " mapped to the same compactor group: " + e.getKey()); + dupesFound = true; + } + } + + if (dupesFound) { + throw new IllegalStateException( + "Multiple compaction services configured to use the same group. This could lead" + + " to undesired behavior. Please fix the configuration"); } log.info("Properties file has passed all checks."); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java index 9bb7cc163f..7404380a78 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java @@ -65,12 +65,12 @@ public class CheckCompactionConfigTest extends WithTestNames { String inputString = ("compaction.service.cs1.planner=" + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n" + "compaction.service.cs1.planner.opts.groups=\\\n" - + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n" - + "{'group':'large'}] \ncompaction.service.cs2.planner=" + + "[{'group':'cs1_small','maxSize':'16M'},{'group':'cs1_medium','maxSize':'128M'},\\\n" + + "{'group':'cs1_large'}] \ncompaction.service.cs2.planner=" + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n" + "compaction.service.cs2.planner.opts.groups=\\\n" - + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n" - + "{'group':'large'}]").replaceAll("'", "\""); + + "[{'group':'cs2_small','maxSize':'16M'},{'group':'cs2_medium','maxSize':'128M'},\\\n" + + "{'group':'cs2_large'}]").replaceAll("'", "\""); String filePath = writeToFileAndReturnPath(inputString); @@ -82,15 +82,15 @@ public class CheckCompactionConfigTest extends WithTestNames { String inputString = ("compaction.service.cs1.planner=" + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n" + "compaction.service.cs1.planner.opts.groups=\\\n" - + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n" - + "{'group':'large'}] \ncompaction.service.cs2.planner=" + + "[{'group':'cs1_small','maxSize':'16M'},{'group':'cs1_medium','maxSize':'128M'},\\\n" + + "{'group':'cs1_large'}] \ncompaction.service.cs2.planner=" + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n" + "compaction.service.cs2.planner.opts.groups=\\\n" - + "[{'group':'small','maxSize':'16M'}, {'group':'medium','maxSize':'128M'},\\\n" - + "{'group':'large'}] \ncompaction.service.cs3.planner=" + + "[{'group':'cs2_small','maxSize':'16M'}, {'group':'cs2_medium','maxSize':'128M'},\\\n" + + "{'group':'cs2_large'}] \ncompaction.service.cs3.planner=" + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n" + "compaction.service.cs3.planner.opts.groups=\\\n" - + "[{'group':'small','maxSize':'16M'},{'group':'large'}]").replaceAll("'", "\""); + + "[{'group':'cs3_small','maxSize':'16M'},{'group':'cs3_large'}]").replaceAll("'", "\""); String filePath = writeToFileAndReturnPath(inputString); CheckCompactionConfig.main(new String[] {filePath}); @@ -178,4 +178,23 @@ public class CheckCompactionConfigTest extends WithTestNames { log.info("Wrote to path: {}\nWith string:\n{}", file.getAbsolutePath(), inputString); return file.getAbsolutePath(); } + + @Test + public void testGroupReuse() throws Exception { + String inputString = ("compaction.service.cs1.planner=" + + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n" + + "compaction.service.cs1.planner.opts.groups=\\\n" + + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n" + + "{'group':'large'}] \ncompaction.service.cs2.planner=" + + "org.apache.accumulo.core.spi.compaction.RatioBasedCompactionPlanner \n" + + "compaction.service.cs2.planner.opts.groups=\\\n" + + "[{'group':'small','maxSize':'16M'},{'group':'medium','maxSize':'128M'},\\\n" + + "{'group':'large'}]").replaceAll("'", "\""); + + String filePath = writeToFileAndReturnPath(inputString); + + assertThrows(IllegalStateException.class, + () -> CheckCompactionConfig.main(new String[] {filePath})); + } + } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index d621b65a46..3b77816a1e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -81,6 +81,7 @@ import org.apache.accumulo.manager.state.TableStats; import org.apache.accumulo.manager.upgrade.UpgradeCoordinator; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionJobGenerator; +import org.apache.accumulo.server.conf.CheckCompactionConfig; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.log.WalStateManager; @@ -440,6 +441,16 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { new CompactionJobGenerator(new ServiceEnvironmentImpl(manager.getContext()), tableMgmtParams.getCompactionHints(), tableMgmtParams.getSteadyTime()); + try { + CheckCompactionConfig.validate(manager.getConfiguration()); + } catch (SecurityException | IllegalArgumentException | IllegalStateException + | ReflectiveOperationException e) { + LOG.error( + "Error validating compaction configuration, all compactions are paused until the configuration is fixed.", + e); + compactionGenerator = null; + } + Set<TServerInstance> filteredServersToShutdown = new HashSet<>(tableMgmtParams.getServersToShutdown()); @@ -588,7 +599,7 @@ abstract class TabletGroupWatcher extends AccumuloDaemonThread { manager.getSplitter().initiateSplit(new SeedSplitTask(manager, tm.getExtent())); } - if (actions.contains(ManagementAction.NEEDS_COMPACTING)) { + if (actions.contains(ManagementAction.NEEDS_COMPACTING) && compactionGenerator != null) { var jobs = compactionGenerator.generateJobs(tm, TabletManagementIterator.determineCompactionKinds(actions)); LOG.debug("{} may need compacting adding {} jobs", tm.getExtent(), jobs.size());