This is an automated email from the ASF dual-hosted git repository. ddanielr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 9dd819437a Moves compaction properties and adds 'compaction.opts.queue' property. (#3915) 9dd819437a is described below commit 9dd819437a5e553004ce329acec369b6394be3ac Author: Daniel Roberts <ddani...@gmail.com> AuthorDate: Fri Nov 24 22:22:46 2023 -0500 Moves compaction properties and adds 'compaction.opts.queue' property. (#3915) * Migrated compaction properties away from the "TSERV" prefix to `compaction.service` prefix and added "queues" property support for defining external compaction queues. * Adds additional property validation to ignore properties that are defined using the old compaction service prefix. * Adds a top-level `compaction` prefix for all compaction based properties that are separate from specific server functions. * Moves `warn.time` under this `compaction` prefix. Passes in the prefix used so fully qualified properties are accurate. * Adds additional json parsing to compare field names and drop any additional field information that is not needed by the DefaultCompactionPlanner. --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../org/apache/accumulo/core/conf/Property.java | 36 ++++- .../spi/compaction/DefaultCompactionPlanner.java | 179 ++++++++++++++++----- .../compaction/CompactionPlannerInitParams.java | 9 +- .../util/compaction/CompactionServicesConfig.java | 112 ++++++++++--- .../compaction/DefaultCompactionPlannerTest.java | 166 ++++++++++++++++++- .../compaction/CompactionServicesConfigTest.java | 105 ++++++++++++ .../server/compaction/CompactionWatcher.java | 2 +- .../server/conf/CheckCompactionConfig.java | 3 +- .../server/conf/CheckCompactionConfigTest.java | 47 +++++- .../tserver/compactions/CompactionManager.java | 7 +- .../tserver/compactions/CompactionService.java | 14 +- .../compaction/BadCompactionServiceConfigIT.java | 2 +- .../test/compaction/CompactionConfigChangeIT.java | 6 +- .../test/compaction/CompactionExecutorIT.java | 21 +-- .../apache/accumulo/test/shell/ConfigSetIT.java | 2 +- .../org/apache/accumulo/test/util/SlowOps.java | 1 + 16 files changed, 612 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 6c1296fa17..3ab9924d8f 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -44,6 +44,23 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; public enum Property { + COMPACTION_PREFIX("compaction.", null, PropertyType.PREFIX, + "Both major and minor compaction properties can be included under this prefix.", "3.1.0"), + COMPACTION_SERVICE_PREFIX(COMPACTION_PREFIX + "service.", null, PropertyType.PREFIX, + "This prefix should be used to define all properties for the compaction services." + + "See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.\n" + + "A new external compaction service would be defined like the following:\n" + + "`compaction.service.newService.planner=" + + "\"org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner\".`\n" + + "`compaction.service.newService.opts.queues=\"" + + "[{\"name\": \"small\", \"maxSize\":\"32M\"}," + + "{ \"name\":\"medium\", \"maxSize\":\"512M\"},{\"name\":\"large\"}]`\n" + + "`compaction.service.newService.opts.maxOpen=50`.\n" + + "Additional options can be defined using the `compaction.service.<service>.opts.<option>` property.", + "3.1.0"), + COMPACTION_WARN_TIME(COMPACTION_PREFIX + "warn.time", "10m", PropertyType.TIMEDURATION, + "When a compaction has not made progress for this time period, a warning will be logged.", + "3.1.0"), // SSL properties local to each node (see also instance.ssl.enabled which must be consistent // across all nodes in an instance) RPC_PREFIX("rpc.", null, PropertyType.PREFIX, @@ -205,12 +222,12 @@ public enum Property { + "encryption, replace this classname with an implementation of the" + "org.apache.accumulo.core.spi.crypto.CryptoFactory interface.", "2.1.0"), - // general properties GENERAL_PREFIX("general.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of accumulo overall, but" + " do not have to be consistent throughout a cloud.", "1.3.5"), + GENERAL_CONTEXT_CLASSLOADER_FACTORY("general.context.class.loader.factory", "", PropertyType.CLASSNAME, "Name of classloader factory to be used to create classloaders for named contexts," @@ -568,8 +585,11 @@ public enum Property { "The maximum number of concurrent tablet migrations for a tablet server.", "1.3.5"), TSERV_MAJC_DELAY("tserver.compaction.major.delay", "30s", PropertyType.TIMEDURATION, "Time a tablet server will sleep between checking which tablets need compaction.", "1.3.5"), + @Deprecated(since = "3.1", forRemoval = true) + @ReplacedBy(property = COMPACTION_SERVICE_PREFIX) TSERV_COMPACTION_SERVICE_PREFIX("tserver.compaction.major.service.", null, PropertyType.PREFIX, "Prefix for compaction services.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_ROOT_PLANNER("tserver.compaction.major.service.root.planner", DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME, "Compaction planner for root tablet service.", "2.1.0"), @@ -579,9 +599,11 @@ public enum Property { "Maximum number of bytes to read or write per second over all major" + " compactions in this compaction service, or 0B for unlimited.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_ROOT_MAX_OPEN( "tserver.compaction.major.service.root.planner.opts.maxOpen", "30", PropertyType.COUNT, "The maximum number of files a compaction will open.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_ROOT_EXECUTORS( "tserver.compaction.major.service.root.planner.opts.executors", "[{'name':'small','type':'internal','maxSize':'32M','numThreads':1},{'name':'huge','type':'internal','numThreads':1}]" @@ -589,6 +611,7 @@ public enum Property { PropertyType.STRING, "See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_META_PLANNER("tserver.compaction.major.service.meta.planner", DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME, "Compaction planner for metadata table.", "2.1.0"), @@ -598,9 +621,11 @@ public enum Property { "Maximum number of bytes to read or write per second over all major" + " compactions in this compaction service, or 0B for unlimited.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_META_MAX_OPEN( "tserver.compaction.major.service.meta.planner.opts.maxOpen", "30", PropertyType.COUNT, "The maximum number of files a compaction will open.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_META_EXECUTORS( "tserver.compaction.major.service.meta.planner.opts.executors", "[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'huge','type':'internal','numThreads':2}]" @@ -608,6 +633,7 @@ public enum Property { PropertyType.JSON, "See {% jlink -f org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner %}.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_DEFAULT_PLANNER("tserver.compaction.major.service.default.planner", DefaultCompactionPlanner.class.getName(), PropertyType.CLASSNAME, "Planner for default compaction service.", "2.1.0"), @@ -617,9 +643,11 @@ public enum Property { "Maximum number of bytes to read or write per second over all major" + " compactions in this compaction service, or 0B for unlimited.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_DEFAULT_MAX_OPEN( "tserver.compaction.major.service.default.planner.opts.maxOpen", "10", PropertyType.COUNT, "The maximum number of files a compaction will open.", "2.1.0"), + @Deprecated(since = "3.1", forRemoval = true) TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS( "tserver.compaction.major.service.default.planner.opts.executors", "[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]" @@ -629,6 +657,8 @@ public enum Property { "2.1.0"), TSERV_MINC_MAXCONCURRENT("tserver.compaction.minor.concurrent.max", "4", PropertyType.COUNT, "The maximum number of concurrent minor compactions for a tablet server.", "1.3.5"), + @Deprecated(since = "3.1") + @ReplacedBy(property = COMPACTION_WARN_TIME) TSERV_COMPACTION_WARN_TIME("tserver.compaction.warn.time", "10m", PropertyType.TIMEDURATION, "When a compaction has not made progress for this time period, a warning will be logged.", "1.6.0"), @@ -1315,6 +1345,8 @@ public enum Property { ReplacedBy rb = getAnnotation(ReplacedBy.class); if (rb != null) { replacedBy = rb.property(); + } else { + isReplaced = false; } annotationsComputed = true; } @@ -1455,9 +1487,11 @@ public enum Property { // white list prefixes return key.startsWith(Property.TABLE_PREFIX.getKey()) || key.startsWith(Property.TSERV_PREFIX.getKey()) + || key.startsWith(Property.COMPACTION_SERVICE_PREFIX.getKey()) || key.startsWith(Property.MANAGER_PREFIX.getKey()) || key.startsWith(Property.GC_PREFIX.getKey()) || key.startsWith(Property.GENERAL_ARBITRARY_PROP_PREFIX.getKey()) + || key.equals(Property.COMPACTION_WARN_TIME.getKey()) || key.equals(Property.GENERAL_FILE_NAME_ALLOCATION_BATCH_SIZE_MIN.getKey()) || key.equals(Property.GENERAL_FILE_NAME_ALLOCATION_BATCH_SIZE_MAX.getKey()); } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index bf30ceb2ad..cf6c099bf3 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@ -20,9 +20,11 @@ package org.apache.accumulo.core.spi.compaction; import static org.apache.accumulo.core.util.LazySingletons.GSON; +import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -30,12 +32,19 @@ import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.util.compaction.CompactionJobPrioritizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -80,16 +89,19 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; * </tr> * </table> * <br> - * The maxSize field determines the maximum size of compaction that will run on an executor. The - * maxSize field can have a suffix of K,M,G for kilobytes, megabytes, or gigabytes and represents - * the sum of the input files for a given compaction. One executor can have no max size and it will - * run everything that is too large for the other executors. If all executors have a max size, then - * system compactions will only run for compactions smaller than the largest max size. User, chop, - * and selector compactions will always run, even if there is no executor for their size. These - * compactions will run on the executor with the largest max size. The following example value for - * this property will create 3 threads to run compactions of files whose file size sum is less than - * 100M, 3 threads to run compactions of files whose file size sum is less than 500M, and run all - * other compactions on Compactors configured to run compactions for Queue1: + * Note: The "executors" option has been deprecated in 3.1 and will be removed in a future release. + * The property prefix "tserver.compaction.major.service" has also been deprecated in 3.1 and will + * be removed in a future release. The maxSize field determines the maximum size of compaction that + * will run on an executor. The maxSize field can have a suffix of K,M,G for kilobytes, megabytes, + * or gigabytes and represents the sum of the input files for a given compaction. One executor can + * have no max size and it will run everything that is too large for the other executors. If all + * executors have a max size, then system compactions will only run for compactions smaller than the + * largest max size. User, chop, and selector compactions will always run, even if there is no + * executor for their size. These compactions will run on the executor with the largest max size. + * The following example value for this property will create 3 threads to run compactions of files + * whose file size sum is less than 100M, 3 threads to run compactions of files whose file size sum + * is less than 500M, and run all other compactions on Compactors configured to run compactions for + * Queue1: * * <pre> * {@code @@ -102,16 +114,38 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; * * Note that the use of 'external' requires that the CompactionCoordinator and at least one * Compactor for Queue1 is running. - * <li>{@code tserver.compaction.major.service.<service>.opts.maxOpen} This determines the maximum - * number of files that will be included in a single compaction. + * <li>{@code compaction.service.<service>.opts.maxOpen} This determines the maximum number of files + * that will be included in a single compaction. + * <li>{@code compaction.service.<service>.opts.queues} This is a json array of queue objects which + * have the following fields: + * <table> + * <caption>Default Compaction Planner Queue options</caption> + * <tr> + * <th>Field Name</th> + * <th>Description</th> + * </tr> + * <tr> + * <td>name</td> + * <td>name or alias of the queue (required)</td> + * </tr> + * <tr> + * <td>maxSize</td> + * <td>threshold sum of the input files (required for all but one of the configs)</td> + * </tr> + * </table> + * <br> + * This 'queues' object is used for defining external compaction queues without needing to use the + * thread-based 'executors' property. * </ul> * - * @since 2.1.0 + * @since 3.1.0 * @see org.apache.accumulo.core.spi.compaction */ public class DefaultCompactionPlanner implements CompactionPlanner { + private final static Logger log = LoggerFactory.getLogger(DefaultCompactionPlanner.class); + private static class ExecutorConfig { String type; String name; @@ -120,6 +154,11 @@ public class DefaultCompactionPlanner implements CompactionPlanner { String queue; } + private static class QueueConfig { + String name; + String maxSize; + } + private static class Executor { final CompactionExecutorId ceid; final Long maxSize; @@ -147,44 +186,78 @@ public class DefaultCompactionPlanner implements CompactionPlanner { justification = "Field is written by Gson") @Override public void init(InitParameters params) { - ExecutorConfig[] execConfigs = - GSON.get().fromJson(params.getOptions().get("executors"), ExecutorConfig[].class); - List<Executor> tmpExec = new ArrayList<>(); + String values; + + if (params.getOptions().containsKey("executors") + && !params.getOptions().get("executors").isBlank()) { + values = params.getOptions().get("executors"); - for (ExecutorConfig executorConfig : execConfigs) { - Long maxSize = executorConfig.maxSize == null ? null - : ConfigurationTypeHelper.getFixedMemoryAsBytes(executorConfig.maxSize); + // Generate a list of fields from the desired object. + final List<String> execFields = Arrays.stream(ExecutorConfig.class.getDeclaredFields()) + .map(Field::getName).collect(Collectors.toList()); - CompactionExecutorId ceid; + for (JsonElement element : GSON.get().fromJson(values, JsonArray.class)) { + validateConfig(element, execFields, ExecutorConfig.class.getName()); + ExecutorConfig executorConfig = GSON.get().fromJson(element, ExecutorConfig.class); - // If not supplied, GSON will leave type null. Default to internal - if (executorConfig.type == null) { - executorConfig.type = "internal"; + Long maxSize = executorConfig.maxSize == null ? null + : ConfigurationTypeHelper.getFixedMemoryAsBytes(executorConfig.maxSize); + CompactionExecutorId ceid; + + // If not supplied, GSON will leave type null. Default to internal + if (executorConfig.type == null) { + executorConfig.type = "internal"; + } + + switch (executorConfig.type) { + case "internal": + Preconditions.checkArgument(null == executorConfig.queue, + "'queue' should not be specified for internal compactions"); + int numThreads = Objects.requireNonNull(executorConfig.numThreads, + "'numThreads' must be specified for internal type"); + ceid = params.getExecutorManager().createExecutor(executorConfig.name, numThreads); + break; + case "external": + Preconditions.checkArgument(null == executorConfig.numThreads, + "'numThreads' should not be specified for external compactions"); + String queue = Objects.requireNonNull(executorConfig.queue, + "'queue' must be specified for external type"); + ceid = params.getExecutorManager().getExternalExecutor(queue); + break; + default: + throw new IllegalArgumentException("type must be 'internal' or 'external'"); + } + tmpExec.add(new Executor(ceid, maxSize)); } + } - switch (executorConfig.type) { - case "internal": - Preconditions.checkArgument(null == executorConfig.queue, - "'queue' should not be specified for internal compactions"); - int numThreads = Objects.requireNonNull(executorConfig.numThreads, - "'numThreads' must be specified for internal type"); - ceid = params.getExecutorManager().createExecutor(executorConfig.name, numThreads); - break; - case "external": - Preconditions.checkArgument(null == executorConfig.numThreads, - "'numThreads' should not be specified for external compactions"); - String queue = Objects.requireNonNull(executorConfig.queue, - "'queue' must be specified for external type"); - ceid = params.getExecutorManager().getExternalExecutor(queue); - break; - default: - throw new IllegalArgumentException("type must be 'internal' or 'external'"); + if (params.getOptions().containsKey("queues") && !params.getOptions().get("queues").isBlank()) { + values = params.getOptions().get("queues"); + + // Generate a list of fields from the desired object. + final List<String> queueFields = Arrays.stream(QueueConfig.class.getDeclaredFields()) + .map(Field::getName).collect(Collectors.toList()); + + for (JsonElement element : GSON.get().fromJson(values, JsonArray.class)) { + validateConfig(element, queueFields, QueueConfig.class.getName()); + QueueConfig queueConfig = GSON.get().fromJson(element, QueueConfig.class); + + Long maxSize = queueConfig.maxSize == null ? null + : ConfigurationTypeHelper.getFixedMemoryAsBytes(queueConfig.maxSize); + + CompactionExecutorId ceid; + String queue = Objects.requireNonNull(queueConfig.name, "'name' must be specified"); + ceid = params.getExecutorManager().getExternalExecutor(queue); + tmpExec.add(new Executor(ceid, maxSize)); } - tmpExec.add(new Executor(ceid, maxSize)); } - Collections.sort(tmpExec, Comparator.comparing(Executor::getMaxSize, + if (tmpExec.size() < 1) { + throw new IllegalStateException("No defined executors or queues for this planner"); + } + + tmpExec.sort(Comparator.comparing(Executor::getMaxSize, Comparator.nullsLast(Comparator.naturalOrder()))); executors = List.copyOf(tmpExec); @@ -207,7 +280,27 @@ public class DefaultCompactionPlanner implements CompactionPlanner { } private void determineMaxFilesToCompact(InitParameters params) { - this.maxFilesToCompact = Integer.parseInt(params.getOptions().getOrDefault("maxOpen", "10")); + + String maxOpen = params.getOptions().get("maxOpen"); + if (maxOpen == null) { + maxOpen = "10"; + log.trace("default maxOpen not set, defaulting to 10"); + } + this.maxFilesToCompact = Integer.parseInt(maxOpen); + } + + private void validateConfig(JsonElement json, List<String> fields, String className) { + + JsonObject jsonObject = GSON.get().fromJson(json, JsonObject.class); + + List<String> objectProperties = new ArrayList<>(jsonObject.keySet()); + HashSet<String> classFieldNames = new HashSet<>(fields); + + if (!classFieldNames.containsAll(objectProperties)) { + objectProperties.removeAll(classFieldNames); + throw new JsonParseException( + "Invalid fields: " + objectProperties + " provided for class: " + className); + } } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java index 0f79ce4df0..3a3f72c699 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlannerInitParams.java @@ -23,7 +23,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionPlanner; @@ -38,14 +37,16 @@ public class CompactionPlannerInitParams implements CompactionPlanner.InitParame private final Set<CompactionExecutorId> requestedExternalExecutors; private final ServiceEnvironment senv; private final CompactionServiceId serviceId; + private final String prefix; - public CompactionPlannerInitParams(CompactionServiceId serviceId, Map<String,String> plannerOpts, - ServiceEnvironment senv) { + public CompactionPlannerInitParams(CompactionServiceId serviceId, String prefix, + Map<String,String> plannerOpts, ServiceEnvironment senv) { this.serviceId = serviceId; this.plannerOpts = plannerOpts; this.requestedExecutors = new HashMap<>(); this.requestedExternalExecutors = new HashSet<>(); this.senv = senv; + this.prefix = prefix; } @Override @@ -60,7 +61,7 @@ public class CompactionPlannerInitParams implements CompactionPlanner.InitParame @Override public String getFullyQualifiedOption(String key) { - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceId + ".opts." + key; + return prefix + serviceId + ".opts." + key; } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java index b3f911c467..5cf9d0abc8 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfig.java @@ -21,11 +21,14 @@ package org.apache.accumulo.core.util.compaction; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; @@ -33,13 +36,18 @@ import com.google.common.collect.Sets; * This class serves to configure compaction services from an {@link AccumuloConfiguration} object. * * Specifically, compaction service properties (those prefixed by "tserver.compaction.major - * .service") are used. + * .service" or "compaction.service") are used. */ public class CompactionServicesConfig { + private static final Logger log = LoggerFactory.getLogger(CompactionServicesConfig.class); private final Map<String,String> planners = new HashMap<>(); + private final Map<String,String> plannerPrefixes = new HashMap<>(); private final Map<String,Long> rateLimits = new HashMap<>(); private final Map<String,Map<String,String>> options = new HashMap<>(); + @SuppressWarnings("removal") + private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX; + private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX; long defaultRateLimit; public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default"); @@ -50,31 +58,91 @@ public class CompactionServicesConfig { .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue()); } - private Map<String,String> getConfiguration(AccumuloConfiguration aconf) { - return aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX); + private Map<String,Map<String,String>> getConfiguration(AccumuloConfiguration aconf) { + Map<String,Map<String,String>> properties = new HashMap<>(); + + var newProps = aconf.getAllPropertiesWithPrefixStripped(newPrefix); + properties.put(newPrefix.getKey(), newProps); + + // get all of the services under the new prefix + var newServices = + newProps.keySet().stream().map(prop -> prop.split("\\.")[0]).collect(Collectors.toSet()); + + Map<String,String> oldServices = new HashMap<>(); + + for (Map.Entry<String,String> entry : aconf.getAllPropertiesWithPrefixStripped(oldPrefix) + .entrySet()) { + // Discard duplicate service definitions + var service = entry.getKey().split("\\.")[0]; + if (newServices.contains(service)) { + log.warn("Duplicate compaction service '{}' definition exists. Ignoring property : '{}'", + service, entry.getKey()); + } else { + oldServices.put(entry.getKey(), entry.getValue()); + } + } + properties.put(oldPrefix.getKey(), oldServices); + // Return unmodifiable map + return Map.copyOf(properties); } public CompactionServicesConfig(AccumuloConfiguration aconf) { - Map<String,String> configs = getConfiguration(aconf); - - configs.forEach((prop, val) -> { - - var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length()); - String[] tokens = suffix.split("\\."); - if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) { - options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val); - } else if (tokens.length == 2 && tokens[1].equals("planner")) { - planners.put(tokens[0], val); - } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) { - var eprop = Property.getPropertyByKey(prop); - if (eprop == null || aconf.isPropertySet(eprop)) { - rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val)); + Map<String,Map<String,String>> configs = getConfiguration(aconf); + + // Find compaction planner defs first. + configs.forEach((prefix, props) -> { + props.forEach((prop, val) -> { + String[] tokens = prop.split("\\."); + if (tokens.length == 2 && tokens[1].equals("planner")) { + if (prefix.equals(oldPrefix.getKey())) { + // Log a warning if the old prefix planner is defined by a user. + Property userDefined = null; + try { + userDefined = Property.valueOf(prefix + prop); + } catch (IllegalArgumentException e) { + log.trace("Property: {} is not set by default configuration", prefix + prop); + } + boolean isPropSet = true; + if (userDefined != null) { + isPropSet = aconf.isPropertySet(userDefined); + } + if (isPropSet) { + log.warn( + "Found compaction planner '{}' using a deprecated prefix. Please update property to use the '{}' prefix", + tokens[0], newPrefix); + } + } + plannerPrefixes.put(tokens[0], prefix); + planners.put(tokens[0], val); } - } else { - throw new IllegalArgumentException("Malformed compaction service property " + prop); - } + }); }); + // Now find all compaction planner options. + configs.forEach((prefix, props) -> { + props.forEach((prop, val) -> { + String[] tokens = prop.split("\\."); + if (!plannerPrefixes.containsKey(tokens[0])) { + throw new IllegalArgumentException( + "Incomplete compaction service definition, missing planner class: " + prop); + } + if (tokens.length == 4 && tokens[1].equals("planner") && tokens[2].equals("opts")) { + options.computeIfAbsent(tokens[0], k -> new HashMap<>()).put(tokens[3], val); + } else if (tokens.length == 3 && tokens[1].equals("rate") && tokens[2].equals("limit")) { + var eprop = Property.getPropertyByKey(prop); + if (eprop == null || aconf.isPropertySet(eprop)) { + rateLimits.put(tokens[0], ConfigurationTypeHelper.getFixedMemoryAsBytes(val)); + } + } else if (!(tokens.length == 2 && tokens[1].equals("planner"))) { + throw new IllegalArgumentException( + "Malformed compaction service property " + prefix + prop); + } else { + log.warn( + "Ignoring compaction property {} as does not match the prefix used by the referenced planner definition", + prop); + } + }); + }); defaultRateLimit = getDefaultThroughput(); var diff = Sets.difference(options.keySet(), planners.keySet()); @@ -110,6 +178,10 @@ public class CompactionServicesConfig { return planners; } + public String getPlannerPrefix(String service) { + return plannerPrefixes.get(service); + } + public Map<String,Long> getRateLimits() { return rateLimits; } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java index 06deb2e219..5ae90a4f2d 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java @@ -19,7 +19,9 @@ package org.apache.accumulo.core.spi.compaction; import static com.google.common.collect.MoreCollectors.onlyElement; +import static org.apache.accumulo.core.spi.compaction.CompactionPlanner.InitParameters; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -44,6 +46,8 @@ import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; import org.easymock.EasyMock; import org.junit.jupiter.api.Test; +import com.google.gson.JsonParseException; + public class DefaultCompactionPlannerTest { private static <T> T getOnlyElement(Collection<T> c) { @@ -239,6 +243,74 @@ public class DefaultCompactionPlannerTest { assertEquals(CompactionExecutorIdImpl.externalId("large"), job.getExecutor()); } + @Test + public void testQueueCreation() throws Exception { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + String queues = "[{\"name\": \"small\", \"maxSize\":\"32M\"},{\"name\":\"midsize\"}]"; + planner.init(getInitParamQueues(senv, queues)); + + var all = createCFs("F1", "1M", "F2", "1M", "F3", "1M", "F4", "1M"); + var params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM); + var plan = planner.makePlan(params); + + var job = getOnlyElement(plan.getJobs()); + assertEquals(all, job.getFiles()); + assertEquals(CompactionExecutorIdImpl.externalId("small"), job.getExecutor()); + + all = createCFs("F1", "100M", "F2", "100M", "F3", "100M", "F4", "100M"); + params = createPlanningParams(all, all, Set.of(), 2, CompactionKind.SYSTEM); + plan = planner.makePlan(params); + + job = getOnlyElement(plan.getJobs()); + assertEquals(all, job.getFiles()); + assertEquals(CompactionExecutorIdImpl.externalId("midsize"), job.getExecutor()); + } + + /** + * Tests that additional fields in the JSON objects cause errors to be thrown. + */ + @Test + public void testErrorAdditionalConfigFields() { + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + DefaultCompactionPlanner QueuePlanner = new DefaultCompactionPlanner(); + + String queues = + "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, {\"name\":\"largeQueue\", \"type\":\"internal\", \"foo\":\"bar\", \"queue\":\"broken\"}]"; + + final InitParameters queueParams = getInitParamQueues(senv, queues); + assertNotNull(queueParams); + var e = assertThrows(JsonParseException.class, () -> QueuePlanner.init(queueParams), + "Failed to throw error"); + assertTrue(e.getMessage().contains("[type, foo, queue]"), + "Error message didn't contain '[type, foo, queue]'"); + + String executors = getExecutors("'type': 'internal','maxSize':'32M','numThreads':1", + "'type': 'internal','maxSize':'128M','numThreads':2, 'foo':'bar'", + "'type': 'internal','numThreads':1, 'unexpectedField':'foo'"); + + final InitParameters execParams = getInitParams(senv, executors); + assertNotNull(execParams); + + DefaultCompactionPlanner ExecPlanner = new DefaultCompactionPlanner(); + var err = assertThrows(JsonParseException.class, () -> ExecPlanner.init(execParams), + "Failed to throw error"); + assertTrue(err.getMessage().contains("Invalid fields: [foo]"), + "Error message didn't contain '[foo]'"); + } + /** * Tests internal type executor with no numThreads set throws error */ @@ -302,6 +374,59 @@ public class DefaultCompactionPlannerTest { assertTrue(e.getMessage().contains("queue"), "Error message didn't contain queue"); } + /** + * Tests queue with missing name throws error + */ + @Test + public void testErrorQueueNoName() { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + String queues = "[{\"name\":\"smallQueue\", \"maxSize\":\"32M\"}, {\"maxSize\":\"120M\"}]"; + + final InitParameters params = getInitParamQueues(senv, queues); + assertNotNull(params); + + var e = assertThrows(NullPointerException.class, () -> planner.init(params), + "Failed to throw error"); + assertEquals(e.getMessage(), "'name' must be specified", "Error message didn't contain 'name'"); + } + + /** + * Tests not having executors or queues throws errors + */ + @Test + public void testErrorNoExecutors() { + DefaultCompactionPlanner planner = new DefaultCompactionPlanner(); + Configuration conf = EasyMock.createMock(Configuration.class); + EasyMock.expect(conf.isSet(EasyMock.anyString())).andReturn(false).anyTimes(); + + ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); + EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); + EasyMock.replay(conf, senv); + + var execParams = getInitParams(senv, ""); + assertNotNull(execParams); + + var e = assertThrows(IllegalStateException.class, () -> planner.init(execParams), + "Failed to throw error"); + assertEquals("No defined executors or queues for this planner", e.getMessage(), + "Error message was not equal"); + + var params = getInitParamQueues(senv, ""); + assertNotNull(params); + + var err = assertThrows(IllegalStateException.class, () -> planner.init(params), + "Failed to throw error"); + assertEquals("No defined executors or queues for this planner", e.getMessage(), + "Error message was not equal"); + } + /** * Tests executors can only have one without a max size. */ @@ -360,7 +485,44 @@ public class DefaultCompactionPlannerTest { @Override public String getFullyQualifiedOption(String key) { assertEquals("maxOpen", key); - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; + return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; + } + + @Override + public ExecutorManager getExecutorManager() { + return new ExecutorManager() { + @Override + public CompactionExecutorId createExecutor(String name, int threads) { + return CompactionExecutorIdImpl.externalId(name); + } + + @Override + public CompactionExecutorId getExternalExecutor(String name) { + return CompactionExecutorIdImpl.externalId(name); + } + }; + } + }; + } + + private CompactionPlanner.InitParameters getInitParamQueues(ServiceEnvironment senv, + String queues) { + return new CompactionPlanner.InitParameters() { + + @Override + public ServiceEnvironment getServiceEnvironment() { + return senv; + } + + @Override + public Map<String,String> getOptions() { + return Map.of("queues", queues, "maxOpen", "15"); + } + + @Override + public String getFullyQualifiedOption(String key) { + assertEquals("maxOpen", key); + return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; } @Override @@ -521,7 +683,7 @@ public class DefaultCompactionPlannerTest { @Override public String getFullyQualifiedOption(String key) { assertEquals("maxOpen", key); - return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; + return Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts." + key; } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java new file mode 100644 index 0000000000..a5921e33eb --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/util/compaction/CompactionServicesConfigTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.util.compaction; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; +import org.junit.jupiter.api.Test; + +public class CompactionServicesConfigTest { + + @SuppressWarnings("removal") + private final Property oldPrefix = Property.TSERV_COMPACTION_SERVICE_PREFIX; + private final Property newPrefix = Property.COMPACTION_SERVICE_PREFIX; + + @Test + public void testCompactionProps() { + ConfigurationCopy conf = new ConfigurationCopy(); + + conf.set(newPrefix.getKey() + "default.planner", DefaultCompactionPlanner.class.getName()); + conf.set(newPrefix.getKey() + "default.planner.opts.maxOpen", "10"); + conf.set(newPrefix.getKey() + "default.planner.opts.executors", + "[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]"); + + conf.set(oldPrefix.getKey() + "default.planner.opts.ignoredProp", "1"); + conf.set(newPrefix.getKey() + "default.planner.opts.validProp", "1"); + conf.set(oldPrefix.getKey() + "default.planner.opts.validProp", "a"); + + var compactionConfig = new CompactionServicesConfig(conf); + assertEquals(Map.of("maxOpen", "10", "executors", + "[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]", + "validProp", "1"), compactionConfig.getOptions().get("default")); + } + + @Test + public void testDuplicateCompactionPlannerDefs() { + ConfigurationCopy conf = new ConfigurationCopy(); + + String planner = DefaultCompactionPlanner.class.getName(); + String oldPlanner = "OldPlanner"; + + conf.set(newPrefix.getKey() + "default.planner", planner); + conf.set(oldPrefix.getKey() + "default.planner", oldPlanner); + + conf.set(oldPrefix.getKey() + "old.planner", oldPlanner); + + var compactionConfig = new CompactionServicesConfig(conf); + assertEquals(Map.of("default", planner, "old", oldPlanner), compactionConfig.getPlanners()); + } + + @Test + public void testCompactionPlannerOldDef() { + ConfigurationCopy conf = new ConfigurationCopy(); + + conf.set(oldPrefix.getKey() + "cs1.planner", DefaultCompactionPlanner.class.getName()); + conf.set(oldPrefix.getKey() + "cs1.planner.opts.maxOpen", "10"); + conf.set(oldPrefix.getKey() + "cs1.planner.opts.executors", + "[{'name':'small','type':'internal','maxSize':'32M','numThreads':2},{'name':'medium','type':'internal','maxSize':'128M','numThreads':2},{'name':'large','type':'internal','numThreads':2}]"); + conf.set(oldPrefix.getKey() + "cs1.planner.opts.foo", "1"); + + var compactionConfig = new CompactionServicesConfig(conf); + assertTrue(compactionConfig.getOptions().get("cs1").containsKey("foo")); + assertEquals("1", compactionConfig.getOptions().get("cs1").get("foo")); + } + + @Test + public void testCompactionRateLimits() { + ConfigurationCopy conf = new ConfigurationCopy(); + CompactionServicesConfig compactionConfig; + + conf.set(oldPrefix.getKey() + "cs1.planner", DefaultCompactionPlanner.class.getName()); + conf.set(oldPrefix.getKey() + "cs1.rate.limit", "2M"); + compactionConfig = new CompactionServicesConfig(conf); + assertEquals(2097152, compactionConfig.getRateLimits().get("cs1")); + + // Test service collision + conf.set(newPrefix.getKey() + "cs1.rate.limit", "4M"); + var e = assertThrows(IllegalArgumentException.class, () -> new CompactionServicesConfig(conf), + "failed to throw error"); + assertEquals("Incomplete compaction service definition, missing planner class: cs1.rate.limit", + e.getMessage(), "Error message was not equal"); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java index 47e75fd42d..4790440ff6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionWatcher.java @@ -98,7 +98,7 @@ public class CompactionWatcher implements Runnable { // remove any compaction that completed or made progress observedCompactions.keySet().retainAll(newKeys); - long warnTime = config.getTimeInMillis(Property.TSERV_COMPACTION_WARN_TIME); + long warnTime = config.getTimeInMillis(Property.COMPACTION_WARN_TIME); // check for stuck compactions for (ObservedCompactionInfo oci : observedCompactions.values()) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java index e297168e51..029cb2fb13 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java +++ b/server/base/src/main/java/org/apache/accumulo/server/conf/CheckCompactionConfig.java @@ -112,7 +112,8 @@ public class CheckCompactionConfig implements KeywordExecutable { CompactionPlanner planner = plannerClass.getDeclaredConstructor().newInstance(); var initParams = new CompactionPlannerInitParams(CompactionServiceId.of(serviceId), - servicesConfig.getOptions().get(serviceId), senv); + servicesConfig.getPlannerPrefix(serviceId), servicesConfig.getOptions().get(serviceId), + senv); planner.init(initParams); diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java index c80ab28f8d..4aa38abf90 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/conf/CheckCompactionConfigTest.java @@ -79,6 +79,29 @@ public class CheckCompactionConfigTest extends WithTestNames { CheckCompactionConfig.main(new String[] {filePath}); } + @Test + public void testValidInput3() throws Exception { + String inputString = ("tserver.compaction.major.service.cs1.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "tserver.compaction.major.service.cs1.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n" + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n" + + "{'name':'large','type':'internal','numThreads':2}] \n" + + "tserver.compaction.major.service.cs2.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "tserver.compaction.major.service.cs2.planner.opts.executors=\\\n" + + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':7},\\\n" + + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':5},\\\n" + + "{'name':'large','type':'external','queue':'DCQ1'}] \n" + + "compaction.service.cs3.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "compaction.service.cs3.planner.opts.queues=\\\n" + + "[{'name':'small','maxSize':'16M'},{'name':'large'}]").replaceAll("'", "\""); + + String filePath = writeToFileAndReturnPath(inputString); + CheckCompactionConfig.main(new String[] {filePath}); + } + @Test public void testThrowsExternalNumThreadsError() throws IOException { String inputString = ("tserver.compaction.major.service.cs1.planner=" @@ -93,7 +116,7 @@ public class CheckCompactionConfigTest extends WithTestNames { var e = assertThrows(IllegalArgumentException.class, () -> CheckCompactionConfig.main(new String[] {filePath})); - assertEquals(e.getMessage(), expectedErrorMsg); + assertEquals(expectedErrorMsg, e.getMessage()); } @Test @@ -119,7 +142,7 @@ public class CheckCompactionConfigTest extends WithTestNames { + "[{'name':'small','type':'internal','maxSize':'16M','numThreads':8},\\\n" + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':4},\\\n" + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\""); - String expectedErrorMsg = "Incomplete compaction service definitions, missing planner class"; + String expectedErrorMsg = "Incomplete compaction service definition, missing planner class"; String filePath = writeToFileAndReturnPath(inputString); @@ -138,13 +161,31 @@ public class CheckCompactionConfigTest extends WithTestNames { + "{'name':'small','type':'internal','numThreads':2}]").replaceAll("'", "\""); String expectedErrorMsg = "Duplicate Compaction Executor ID found"; - String filePath = writeToFileAndReturnPath(inputString); + final String filePath = writeToFileAndReturnPath(inputString); var e = assertThrows(IllegalStateException.class, () -> CheckCompactionConfig.main(new String[] {filePath})); assertTrue(e.getMessage().startsWith(expectedErrorMsg)); } + @Test + public void testRepeatedQueueName() throws Exception { + String inputString = ("compaction.service.cs1.planner=" + + "org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner \n" + + "compaction.service.cs1.planner.opts.executors=\\\n" + + "[{'name':'small','type':'external','maxSize':'16M','queue':'failedQueue'}] \n" + + "compaction.service.cs1.planner.opts.queues=[{'name':'failedQueue'}]") + .replaceAll("'", "\""); + + String expectedErrorMsg = "Duplicate external executor for queue failedQueue"; + + final String filePath = writeToFileAndReturnPath(inputString); + + var err = assertThrows(IllegalArgumentException.class, + () -> CheckCompactionConfig.main(new String[] {filePath})); + assertEquals(err.getMessage(), expectedErrorMsg); + } + @Test public void testInvalidTypeValue() throws Exception { String inputString = ("tserver.compaction.major.service.cs1.planner=" diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index bbabc9f262..6332842236 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -205,7 +205,7 @@ public class CompactionManager { try { tmpServices.put(CompactionServiceId.of(serviceName), new CompactionService(serviceName, plannerClassName, - currentCfg.getRateLimit(serviceName), + currentCfg.getPlannerPrefix(serviceName), currentCfg.getRateLimit(serviceName), currentCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics, this::getExternalExecutor)); } catch (RuntimeException e) { @@ -249,11 +249,12 @@ public class CompactionManager { if (service == null) { tmpServices.put(csid, new CompactionService(serviceName, plannerClassName, - tmpCfg.getRateLimit(serviceName), + tmpCfg.getPlannerPrefix(serviceName), tmpCfg.getRateLimit(serviceName), tmpCfg.getOptions().getOrDefault(serviceName, Map.of()), context, ceMetrics, this::getExternalExecutor)); } else { - service.configurationChanged(plannerClassName, tmpCfg.getRateLimit(serviceName), + service.configurationChanged(plannerClassName, tmpCfg.getPlannerPrefix(serviceName), + tmpCfg.getRateLimit(serviceName), tmpCfg.getOptions().getOrDefault(serviceName, Map.of())); tmpServices.put(csid, service); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java index 926152e1be..fffbbe1afe 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java @@ -86,8 +86,8 @@ public class CompactionService { private static final Logger log = LoggerFactory.getLogger(CompactionService.class); - public CompactionService(String serviceName, String plannerClass, Long maxRate, - Map<String,String> plannerOptions, ServerContext context, + public CompactionService(String serviceName, String plannerClass, String plannerPrefix, + Long maxRate, Map<String,String> plannerOptions, ServerContext context, CompactionExecutorsMetrics ceMetrics, Function<CompactionExecutorId,ExternalCompactionExecutor> externExecutorSupplier) { @@ -100,8 +100,8 @@ public class CompactionService { this.ceMetrics = ceMetrics; this.externExecutorSupplier = externExecutorSupplier; - var initParams = - new CompactionPlannerInitParams(myId, plannerOpts, new ServiceEnvironmentImpl(context)); + var initParams = new CompactionPlannerInitParams(myId, plannerPrefix, plannerOpts, + new ServiceEnvironmentImpl(context)); planner = createPlanner(myId, plannerClass, plannerOptions, initParams); Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new HashMap<>(); @@ -359,7 +359,7 @@ public class CompactionService { .anyMatch(job -> job.getStatus() == Status.QUEUED); } - public void configurationChanged(String plannerClassName, Long maxRate, + public void configurationChanged(String plannerClassName, String plannerPrefix, Long maxRate, Map<String,String> plannerOptions) { Preconditions.checkArgument(maxRate >= 0); @@ -372,8 +372,8 @@ public class CompactionService { return; } - var initParams = - new CompactionPlannerInitParams(myId, plannerOptions, new ServiceEnvironmentImpl(context)); + var initParams = new CompactionPlannerInitParams(myId, plannerPrefix, plannerOptions, + new ServiceEnvironmentImpl(context)); var tmpPlanner = createPlanner(myId, plannerClassName, plannerOptions, initParams); Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new HashMap<>(); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java index 5f5ec1dd4b..72f53b94c0 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java @@ -54,7 +54,7 @@ import com.google.common.collect.MoreCollectors; public class BadCompactionServiceConfigIT extends AccumuloClusterHarness { - private static final String CSP = Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey(); + private static final String CSP = Property.COMPACTION_SERVICE_PREFIX.getKey(); @Override public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java index 60ede3dff7..6b8dfe2fb9 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionConfigChangeIT.java @@ -57,10 +57,10 @@ public class CompactionConfigChangeIT extends AccumuloClusterHarness { final String table = getUniqueNames(1)[0]; client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner", + Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner", DefaultCompactionPlanner.class.getName()); client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors", + Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors", ("[{'name':'small','type':'internal','maxSize':'2M','numThreads':2}," + "{'name':'medium','type':'internal','maxSize':'128M','numThreads':2}," + "{'name':'large','type':'internal','numThreads':2}]").replaceAll("'", "\"")); @@ -91,7 +91,7 @@ public class CompactionConfigChangeIT extends AccumuloClusterHarness { // compactions. Because the compactions are running slow, expect this config change to overlap // with running compactions. client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors", + Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.executors", ("[{'name':'little','type':'internal','maxSize':'128M','numThreads':8}," + "{'name':'big','type':'internal','numThreads':2}]").replaceAll("'", "\"")); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java index 1832117ce5..45e0b20d08 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionExecutorIT.java @@ -140,7 +140,7 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { SharedMiniClusterBase.startMiniClusterWithConfig((miniCfg, coreSite) -> { Map<String,String> siteCfg = new HashMap<>(); - var csp = Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey(); + var csp = Property.COMPACTION_SERVICE_PREFIX.getKey(); siteCfg.put(csp + "cs1.planner", TestPlanner.class.getName()); siteCfg.put(csp + "cs1.planner.opts.executors", "3"); siteCfg.put(csp + "cs1.planner.opts.filesPerCompaction", "5"); @@ -205,10 +205,11 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { assertEquals(2, getFiles(client, "rctt").size()); - client.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() - + "recfg.planner.opts.filesPerCompaction", "5"); client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.executors", "1"); + Property.COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.filesPerCompaction", + "5"); + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "recfg.planner.opts.executors", "1"); addFiles(client, "rctt", 10); @@ -223,15 +224,15 @@ public class CompactionExecutorIT extends SharedMiniClusterBase { @Test public void testAddCompactionService() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - client.instanceOperations().setProperty(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() - + "newcs.planner.opts.filesPerCompaction", "7"); client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.process", - "SYSTEM"); + Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.filesPerCompaction", + "7"); + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.process", "SYSTEM"); client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", "3"); + Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner.opts.executors", "3"); client.instanceOperations().setProperty( - Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner", + Property.COMPACTION_SERVICE_PREFIX.getKey() + "newcs.planner", TestPlanner.class.getName()); createTable(client, "acst", "newcs"); diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java index 9da4c6b216..2c1ae92d7e 100644 --- a/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java +++ b/test/src/main/java/org/apache/accumulo/test/shell/ConfigSetIT.java @@ -49,6 +49,7 @@ public class ConfigSetIT extends SharedMiniClusterBase { private static final Logger log = LoggerFactory.getLogger(ConfigSetIT.class); @Test + @SuppressWarnings("removal") public void setInvalidJson() throws Exception { log.debug("Starting setInvalidJson test ------------------"); @@ -61,7 +62,6 @@ public class ConfigSetIT extends SharedMiniClusterBase { try (AccumuloClient client = getCluster().createAccumuloClient("root", new PasswordToken(getRootPassword()))) { - client.instanceOperations().setProperty(TSERV_COMPACTION_SERVICE_ROOT_EXECUTORS.getKey(), validJson); assertThrows(AccumuloException.class, () -> client.instanceOperations() diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java index 979c3b85dd..d0012c5d37 100644 --- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java +++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java @@ -79,6 +79,7 @@ public class SlowOps { createData(); } + @SuppressWarnings("removal") public static void setExpectedCompactions(AccumuloClient client, final int numParallelExpected) { final int target = numParallelExpected + 1; try {