This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 8a886ce2c949c1cefff7fa38908017526dc4316b Merge: 8926fe7a75 a4b4f540c8 Author: Keith Turner <ktur...@apache.org> AuthorDate: Thu Nov 9 15:03:31 2023 -0500 Merge branch 'main' into elasticity .../core/metadata/schema/RootTabletMetadata.java | 65 ++++- .../schema/UpgraderDeprecatedConstants.java | 8 +- .../metadata/schema/RootTabletMetadataTest.java | 119 ++++++++ .../accumulo/server/AccumuloDataVersion.java | 4 +- .../server/compaction/CompactionJobGenerator.java | 51 +++- .../compaction/ProvisionalCompactionPlanner.java | 67 +++++ .../server/constraints/MetadataConstraints.java | 5 +- server/manager/pom.xml | 4 + .../accumulo/manager/upgrade/Upgrader11to12.java | 250 ++++++++++------ .../{Upgrader11to12.java => Upgrader12to13.java} | 4 +- .../manager/upgrade/Upgrader11to12Test.java | 317 +++++++++++++++++++++ .../java/org/apache/accumulo/test/MetaSplitIT.java | 62 ++++ .../compaction/BadCompactionServiceConfigIT.java | 236 +++++++++++++++ .../test/compaction/ExternalCompaction_1_IT.java | 56 ++++ .../test/functional/FunctionalTestUtils.java | 7 +- .../test/functional/HalfClosedTabletIT.java | 5 + .../apache/accumulo/test/lock/ServiceLockIT.java | 14 +- .../accumulo/test/util/FileMetadataUtil.java | 15 + 18 files changed, 1162 insertions(+), 127 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java index e1e638f7d4,8a39e34798..c5144bb9c4 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java @@@ -166,9 -205,14 +217,13 @@@ public class RootTabletMetadata .map(qualVal -> new SimpleImmutableEntry<>( new Key(row, famToQualVal.getKey(), qualVal.getKey(), 1), new Value(qualVal.getValue())))); - return TabletMetadata.convertRow(entries.iterator(), - EnumSet.allOf(TabletMetadata.ColumnType.class), false); + return entries; } + public static boolean needsUpgrade(final String json) { + return Data.needsUpgrade(json); + } + /** * @return a JSON representation of the root tablet's data. */ diff --cc server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java index 8d60990a74,4aae44a974..a96721988e --- a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java @@@ -77,10 -77,8 +77,8 @@@ public class AccumuloDataVersion return CURRENT_VERSION; } - // TODO - this disables upgrades until https://github.com/apache/accumulo/issues/3768 is done - // public static final Set<Integer> CAN_RUN = Set.of( - // REMOVE_DEPRECATIONS_FOR_VERSION_3, METADATA_FILE_JSON_ENCODING, CURRENT_VERSION); - public static final Set<Integer> CAN_RUN = - Set.of(ROOT_TABLET_META_CHANGES, REMOVE_DEPRECATIONS_FOR_VERSION_3, CURRENT_VERSION); ++ // ELASTICITY_TODO get upgrade working + public static final Set<Integer> CAN_RUN = Set.of(CURRENT_VERSION); /** * Get the stored, current working version. diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java index bf9c42516a,0000000000..409873c073 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java @@@ -1,281 -1,0 +1,308 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.compaction; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; ++import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.PluginEnvironment; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlan; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.spi.compaction.CompactionServices; ++import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.cache.Caches; +import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; ++import com.github.benmanes.caffeine.cache.Caffeine; + +public class CompactionJobGenerator { - ++ private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class); + private final CompactionServicesConfig servicesConfig; + private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>(); + private final Cache<TableId,CompactionDispatcher> dispatchers; + private final Set<CompactionServiceId> serviceIds; + private final PluginEnvironment env; + private final Map<Long,Map<String,String>> allExecutionHints; ++ private final Cache<Pair<TableId,CompactionServiceId>,Long> unknownCompactionServiceErrorCache; + + public CompactionJobGenerator(PluginEnvironment env, + Map<Long,Map<String,String>> executionHints) { + servicesConfig = new CompactionServicesConfig(env.getConfiguration()); + serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of) + .collect(Collectors.toUnmodifiableSet()); + + dispatchers = 
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_DISPATCHERS, false) + .maximumSize(10).build(); + this.env = env; + if (executionHints.isEmpty()) { + this.allExecutionHints = executionHints; + } else { + this.allExecutionHints = new HashMap<>(); + // Make the maps that will be passed to plugins unmodifiable. Do this once, so it does not + // need to be done for each tablet. + executionHints.forEach((k, v) -> allExecutionHints.put(k, + v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v))); + } ++ unknownCompactionServiceErrorCache = ++ Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + } + + public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) { + + // ELASTICITY_TODO do not want user-configured plugins to cause exceptions that prevent tablets + // from being assigned. So probably want to catch exceptions and log, but not too spammily, OR + // somehow report something back to the manager so it can log. + + Collection<CompactionJob> systemJobs = Set.of(); + + if (kinds.contains(CompactionKind.SYSTEM)) { + CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of()); + systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of()); + } + + Collection<CompactionJob> userJobs = Set.of(); + + if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != null) { + var hints = allExecutionHints.get(tablet.getSelectedFiles().getFateTxId()); + if (hints != null) { + CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, hints); + userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, hints); + } + } + + if (userJobs.isEmpty()) { + return systemJobs; + } else if (systemJobs.isEmpty()) { + return userJobs; + } else { + var all = new ArrayList<CompactionJob>(systemJobs.size() + userJobs.size()); + all.addAll(systemJobs); + all.addAll(userJobs); + return all; + } + } + + private CompactionServiceId dispatch(CompactionKind kind, TabletMetadata tablet, + Map<String,String> executionHints) { + + CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(), + tableId -> CompactionPluginUtils.createDispatcher((ServiceEnvironment) env, tableId)); + + CompactionDispatcher.DispatchParameters dispatchParams = + new CompactionDispatcher.DispatchParameters() { + @Override + public CompactionServices getCompactionServices() { + return () -> serviceIds; + } + + @Override + public ServiceEnvironment getServiceEnv() { + return (ServiceEnvironment) env; + } + + @Override + public CompactionKind getCompactionKind() { + return kind; + } + + @Override + public Map<String,String> getExecutionHints() { + return executionHints; + } + }; + + return dispatcher.dispatch(dispatchParams).getService(); + } + + private Collection<CompactionJob> planCompactions(CompactionServiceId serviceId, + CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) { + ++ if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) { ++ var cacheKey = new Pair<>(tablet.getTableId(), serviceId); ++ var last = unknownCompactionServiceErrorCache.getIfPresent(cacheKey); ++ if (last == null) { ++ // have not logged an error recently for this, so let's log one ++ log.error( ++ "Tablet {} returned non-existent compaction service {} for compaction type {}. Check" ++ + " the table compaction dispatcher configuration. No compactions will happen" ++ + " until the configuration is fixed. 
This log message is temporarily suppressed for the" ++ + " entire table.", ++ tablet.getExtent(), serviceId, kind); ++ unknownCompactionServiceErrorCache.put(cacheKey, System.currentTimeMillis()); ++ } ++ ++ return Set.of(); ++ } ++ + CompactionPlanner planner = + planners.computeIfAbsent(serviceId, sid -> createPlanner(tablet.getTableId(), serviceId)); + + // selecting indicator + // selected files + + String ratioStr = + env.getConfiguration(tablet.getTableId()).get(Property.TABLE_MAJC_RATIO.getKey()); + if (ratioStr == null) { + ratioStr = Property.TABLE_MAJC_RATIO.getDefaultValue(); + } + + double ratio = Double.parseDouble(ratioStr); + + Set<CompactableFile> allFiles = tablet.getFilesMap().entrySet().stream() + .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()); + Set<CompactableFile> candidates; + + if (kind == CompactionKind.SYSTEM) { + if (tablet.getExternalCompactions().isEmpty() && tablet.getSelectedFiles() == null) { + candidates = allFiles; + } else { + var tmpFiles = new HashMap<>(tablet.getFilesMap()); + // remove any files that are in active compactions + tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream()) + .forEach(tmpFiles::remove); + // remove any files that are selected + if (tablet.getSelectedFiles() != null) { + tmpFiles.keySet().removeAll(tablet.getSelectedFiles().getFiles()); + } + candidates = tmpFiles.entrySet().stream() + .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue())) + .collect(Collectors.toUnmodifiableSet()); + } + } else if (kind == CompactionKind.USER) { + var selectedFiles = new HashSet<>(tablet.getSelectedFiles().getFiles()); + tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream()) + .forEach(selectedFiles::remove); + candidates = selectedFiles.stream() + .map(file -> new CompactableFileImpl(file, tablet.getFilesMap().get(file))) + .collect(Collectors.toUnmodifiableSet()); + } else { + throw new UnsupportedOperationException(); + } + + if (candidates.isEmpty()) { + // there are no candidate files for compaction, so no reason to call the planner + return Set.of(); + } + + CompactionPlanner.PlanningParameters params = new CompactionPlanner.PlanningParameters() { + @Override + public TableId getTableId() { + return tablet.getTableId(); + } + + @Override + public ServiceEnvironment getServiceEnvironment() { + return (ServiceEnvironment) env; + } + + @Override + public CompactionKind getKind() { + return kind; + } + + @Override + public double getRatio() { + return ratio; + } + + @Override + public Collection<CompactableFile> getAll() { + return allFiles; + } + + @Override + public Collection<CompactableFile> getCandidates() { + return candidates; + } + + @Override + public Collection<CompactionJob> getRunningCompactions() { + var allFiles2 = tablet.getFilesMap(); + return tablet.getExternalCompactions().values().stream().map(ecMeta -> { + Collection<CompactableFile> files = ecMeta.getJobFiles().stream() + .map(f -> new CompactableFileImpl(f, allFiles2.get(f))).collect(Collectors.toList()); + CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), + ecMeta.getCompactionExecutorId(), files, ecMeta.getKind(), Optional.empty()); + return job; + }).collect(Collectors.toUnmodifiableList()); + } + + @Override + public Map<String,String> getExecutionHints() { + return executionHints; + } + + @Override + public CompactionPlan.Builder createPlanBuilder() { + return new 
CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates); + } + }; + + return planner.makePlan(params).getJobs(); + } + + private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) { + - String plannerClassName = servicesConfig.getPlanners().get(serviceId.canonical()); - - CompactionPlanner planner = null; ++ CompactionPlanner planner; ++ String plannerClassName = null; ++ Map<String,String> options = null; + try { ++ plannerClassName = servicesConfig.getPlanners().get(serviceId.canonical()); ++ options = servicesConfig.getOptions().get(serviceId.canonical()); + planner = env.instantiate(tableId, plannerClassName, CompactionPlanner.class); - } catch (ReflectiveOperationException e) { - throw new RuntimeException(e); ++ CompactionPlannerInitParams initParameters = new CompactionPlannerInitParams(serviceId, ++ servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env); ++ planner.init(initParameters); ++ } catch (Exception e) { ++ log.error( ++ "Failed to create compaction planner for {} using class:{} options:{}. Compaction service will not start any new compactions until its configuration is fixed.", ++ serviceId, plannerClassName, options, e); ++ planner = new ProvisionalCompactionPlanner(serviceId); + } - - CompactionPlannerInitParams initParameters = new CompactionPlannerInitParams(serviceId, - servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env); - - planner.init(initParameters); - + return planner; + } +} diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/ProvisionalCompactionPlanner.java index 0000000000,1d27e9f6e1..e3e870eb92 mode 000000,100644..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ProvisionalCompactionPlanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/ProvisionalCompactionPlanner.java @@@ -1,0 -1,67 +1,67 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ -package org.apache.accumulo.tserver.compactions; ++package org.apache.accumulo.server.compaction; + + import java.util.concurrent.TimeUnit; + import java.util.concurrent.atomic.AtomicLong; + + import org.apache.accumulo.core.spi.compaction.CompactionPlan; + import org.apache.accumulo.core.spi.compaction.CompactionPlanner; + import org.apache.accumulo.core.spi.compaction.CompactionServiceId; + import org.slf4j.LoggerFactory; + + /** + * A compaction planner that makes no plans and is intended to be used temporarily when a compaction + * service has no compaction planner because it is misconfigured. 
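+ * Its makePlan implementation returns an empty plan and logs an error at most once every five + * minutes, so a misconfigured service does not flood the logs.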
+ */ + public class ProvisionalCompactionPlanner implements CompactionPlanner { + + private final CompactionServiceId serviceId; + private AtomicLong lastWarnNanoTime = new AtomicLong(System.nanoTime()); + + public ProvisionalCompactionPlanner(CompactionServiceId serviceId) { + this.serviceId = serviceId; + } + + @Override + public void init(InitParameters params) { + + } + + @Override + public CompactionPlan makePlan(PlanningParameters params) { + var nanoTime = System.nanoTime(); + var updatedTime = lastWarnNanoTime.updateAndGet(last -> { + if (nanoTime - last > TimeUnit.MINUTES.toNanos(5)) { + return nanoTime; + } + + return last; + }); + + if (updatedTime == nanoTime) { + LoggerFactory.getLogger(ProvisionalCompactionPlanner.class) + .error("The compaction service " + + "'{}' is currently disabled, likely because it has bad configuration. No " + + "compactions will occur on this service until it is fixed.", serviceId); + } + + return params.createPlanBuilder().build(); + } + } diff --cc server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index db71c37730,936d7f16ef..10e1aa9fc1 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@@ -53,8 -51,7 +53,9 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletOperationId; + import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.cleaner.CleanerUtil; import org.apache.accumulo.server.ServerContext; @@@ -102,7 -98,8 +103,9 @@@ public class MetadataConstraints implem FutureLocationColumnFamily.NAME, ClonedColumnFamily.NAME, ExternalCompactionColumnFamily.NAME, - CompactedColumnFamily.NAME); ++ CompactedColumnFamily.NAME, + UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME + ); // @formatter:on private static boolean isValidColumn(ColumnUpdate cu) { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java index f1d11712d0,0000000000..1906d05ae5 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java @@@ -1,147 -1,0 +1,147 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.upgrade; + +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX; + +import java.util.Map; + +import org.apache.accumulo.core.client.admin.TabletHostingGoal; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.schema.Section; +import org.apache.accumulo.server.ServerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + - public class Upgrader11to12 implements Upgrader { ++public class Upgrader12to13 implements Upgrader { + - private static final Logger LOG = LoggerFactory.getLogger(Upgrader11to12.class); ++ private static final Logger LOG = LoggerFactory.getLogger(Upgrader12to13.class); + + @Override + public void upgradeZookeeper(ServerContext context) { + LOG.info("setting root table stored hosting goal"); + addHostingGoalToRootTable(context); + } + + @Override + public void upgradeRoot(ServerContext context) { + LOG.info("setting metadata table hosting goal"); + addHostingGoalToMetadataTable(context); + } + + @Override + public void upgradeMetadata(ServerContext context) { + LOG.info("setting hosting goal on user tables"); + addHostingGoalToUserTables(context); + deleteExternalCompactionFinalStates(context); + deleteExternalCompactions(context); + } + + private void deleteExternalCompactionFinalStates(ServerContext context) { + // This metadata was only written for user tablets as part of the compaction commit process. + // Compactions are committed in a completely different way now, so delete these entries. It's + // possible some completed compactions may need to be redone, but processing these entries would + // not be easy to test, so it's better for correctness to delete them and redo the work. 
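+ // These entries live under the reserved ~ecomp row prefix in the metadata table, so scan just + // that section and delete every entry found.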
+ try (var scanner = context.createScanner(MetadataTable.NAME); + var writer = context.createBatchWriter(MetadataTable.NAME)) { + var section = new Section(RESERVED_PREFIX + "ecomp", true, RESERVED_PREFIX + "ecomq", false); + scanner.setRange(section.getRange()); + + for (Map.Entry<Key,Value> entry : scanner) { + var key = entry.getKey(); + var row = key.getRow(); + Preconditions.checkState(row.toString().startsWith(section.getRowPrefix())); + Mutation m = new Mutation(row); + Preconditions.checkState(key.getColumnVisibilityData().length() == 0, + "Expected empty visibility, saw %s ", key.getColumnVisibilityData()); + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + writer.addMutation(m); + } + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + private void addHostingGoalToSystemTable(ServerContext context, TableId tableId) { + try ( + TabletsMetadata tm = + context.getAmple().readTablets().forTable(tableId).fetch(ColumnType.PREV_ROW).build(); + TabletsMutator mut = context.getAmple().mutateTablets()) { + tm.forEach( + t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ALWAYS).mutate()); + } + } + + private void addHostingGoalToRootTable(ServerContext context) { + addHostingGoalToSystemTable(context, RootTable.ID); + } + + private void addHostingGoalToMetadataTable(ServerContext context) { + addHostingGoalToSystemTable(context, MetadataTable.ID); + } + + private void addHostingGoalToUserTables(ServerContext context) { + try ( + TabletsMetadata tm = context.getAmple().readTablets().forLevel(DataLevel.USER) + .fetch(ColumnType.PREV_ROW).build(); + TabletsMutator mut = context.getAmple().mutateTablets()) { + tm.forEach( + t -> mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ONDEMAND).mutate()); + } + } + + private void deleteExternalCompactions(ServerContext context) { + // External compactions were only written for user tablets in 3.x and earlier, so only need to + // process the metadata table. The metadata related to an external compaction has changed so + // delete any that exists. Not using Ample in case there are problems deserializing the old + // external compaction metadata. 
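+ // Scan the tablets section for only the external compaction column family and delete each + // column found, using a plain scanner and batch writer.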
+ try (var scanner = context.createScanner(MetadataTable.NAME); + var writer = context.createBatchWriter(MetadataTable.NAME)) { + scanner.setRange(TabletsSection.getRange()); + scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME); + + for (Map.Entry<Key,Value> entry : scanner) { + var key = entry.getKey(); + Mutation m = new Mutation(key.getRow()); + Preconditions.checkState(key.getColumnFamily().equals(ExternalCompactionColumnFamily.NAME), + "Expected family %s, saw %s ", ExternalCompactionColumnFamily.NAME, + key.getColumnFamily()); + Preconditions.checkState(key.getColumnVisibilityData().length() == 0, + "Expected empty visibility, saw %s ", key.getColumnVisibilityData()); + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + writer.addMutation(m); + } + } catch (Exception e) { + throw new IllegalStateException(e); + } + } +} diff --cc test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java index 0000000000,5f5ec1dd4b..7fc9fde216 mode 000000,100644..100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java @@@ -1,0 -1,233 +1,236 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.accumulo.test.compaction; + + import static org.junit.jupiter.api.Assertions.assertEquals; + + import java.util.Collections; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + import java.util.TreeSet; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.Future; + import java.util.stream.Collectors; + import java.util.stream.IntStream; + + import org.apache.accumulo.core.client.Accumulo; + import org.apache.accumulo.core.client.AccumuloClient; + import org.apache.accumulo.core.client.IteratorSetting; + import org.apache.accumulo.core.client.admin.CompactionConfig; + import org.apache.accumulo.core.client.admin.NewTableConfiguration; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.Key; + import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.iterators.Filter; + import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner; + import org.apache.accumulo.harness.AccumuloClusterHarness; + import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Text; + import org.junit.jupiter.api.AfterEach; + import org.junit.jupiter.api.BeforeEach; ++import org.junit.jupiter.api.Disabled; + import org.junit.jupiter.api.Test; + + import com.google.common.collect.MoreCollectors; + ++//ELASTICITY_TODO ++@Disabled + public class BadCompactionServiceConfigIT extends AccumuloClusterHarness { + + private static final String CSP = Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey(); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> siteCfg = new HashMap<>(); + siteCfg.put(CSP + "cs1.planner", DefaultCompactionPlanner.class.getName()); + // place invalid json in the planners config + siteCfg.put(CSP + "cs1.planner.opts.executors", "{{'name]"); + cfg.setSiteConfig(siteCfg); + } + + public static class EverythingFilter extends Filter { + @Override + public boolean accept(Key k, Value v) { + return false; + } + } + + private ExecutorService executorService; + + @BeforeEach + public void setup() { + executorService = Executors.newCachedThreadPool(); + } + + @AfterEach + public void teardown() { + executorService.shutdownNow(); + } + + @Test + public void testUsingMisconfiguredService() throws Exception { + String table = getUniqueNames(1)[0]; + + // Create a table that is configured to use a compaction service with bad configuration. + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration().setProperties( + Map.of(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "cs1")); + client.tableOperations().create(table, ntc); + + try (var writer = client.createBatchWriter(table)) { + writer.addMutation(new Mutation("0").at().family("f").qualifier("q").put("v")); + } + + client.tableOperations().flush(table, null, null, true); + + try (var scanner = client.createScanner(table)) { + assertEquals("0", scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(MoreCollectors.onlyElement())); + } + + Future<?> fixerFuture = executorService.submit(() -> { + try { + Thread.sleep(2000); + + // Verify the compaction has not run yet, it should not be able to with the bad config. 
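+ // After confirming the row is still present, valid executors JSON is written below so the + // waiting compaction can proceed.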
+ try (var scanner = client.createScanner(table)) { + assertEquals("0", scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(MoreCollectors.onlyElement())); + } + + var value = + "[{'name':'small', 'type': 'internal', 'numThreads':1}]".replaceAll("'", "\""); + client.instanceOperations().setProperty(CSP + "cs1.planner.opts.executors", value); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + List<IteratorSetting> iterators = + Collections.singletonList(new IteratorSetting(100, EverythingFilter.class)); + client.tableOperations().compact(table, + new CompactionConfig().setIterators(iterators).setWait(true)); + + // Verify compaction ran. + try (var scanner = client.createScanner(table)) { + assertEquals(0, scanner.stream().count()); + } + + fixerFuture.get(); + + // misconfigure the service, test how going from good config to bad config works. The test + // started with an initial state of bad config. + client.instanceOperations().setProperty(CSP + "cs1.planner.opts.executors", "]o.o["); + try (var writer = client.createBatchWriter(table)) { + writer.addMutation(new Mutation("0").at().family("f").qualifier("q").put("v")); + } + client.tableOperations().flush(table, null, null, true); + try (var scanner = client.createScanner(table)) { + assertEquals("0", scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(MoreCollectors.onlyElement())); + } + fixerFuture = executorService.submit(() -> { + try { + Thread.sleep(2000); + var value = + "[{'name':'small', 'type': 'internal', 'numThreads':1}]".replaceAll("'", "\""); + client.instanceOperations().setProperty(CSP + "cs1.planner.opts.executors", value); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + client.tableOperations().compact(table, + new CompactionConfig().setIterators(iterators).setWait(true)); + + // Verify compaction ran. + try (var scanner = client.createScanner(table)) { + assertEquals(0, scanner.stream().count()); + } + + fixerFuture.get(); + + } + } + + @Test + public void testUsingNonExistentService() throws Exception { + String table = getUniqueNames(1)[0]; + + // Create a table that is configured to use a compaction service that does not exist + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + NewTableConfiguration ntc = new NewTableConfiguration().setProperties( + Map.of(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "cs5")); + client.tableOperations().create(table, ntc); + + // Add splits so that the tserver logs can manually be inspected to ensure they are not + // spammed. Not sure how to check this automatically. + var splits = IntStream.range(1, 10).mapToObj(i -> new Text(i + "")) + .collect(Collectors.toCollection(TreeSet::new)); + client.tableOperations().addSplits(table, splits); + + try (var writer = client.createBatchWriter(table)) { + writer.addMutation(new Mutation("0").at().family("f").qualifier("q").put("v")); + } + + client.tableOperations().flush(table, null, null, true); + + try (var scanner = client.createScanner(table)) { + assertEquals("0", scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(MoreCollectors.onlyElement())); + } + + // Create a thread to fix the compaction config after a bit. + Future<?> fixerFuture = executorService.submit(() -> { + try { + Thread.sleep(2000); + + // Verify the compaction has not run yet, it should not be able to with the bad config. 
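+ // After confirming, the table is pointed below at the existing default dispatcher service so + // the pending compaction can run.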
+ try (var scanner = client.createScanner(table)) { + assertEquals("0", scanner.stream().map(e -> e.getKey().getRowData().toString()) + .collect(MoreCollectors.onlyElement())); + } + + // fix the compaction dispatcher config + client.tableOperations().setProperty(table, + Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "default"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + List<IteratorSetting> iterators = + Collections.singletonList(new IteratorSetting(100, EverythingFilter.class)); + client.tableOperations().compact(table, + new CompactionConfig().setIterators(iterators).setWait(true)); + + // Verify compaction ran. + try (var scanner = client.createScanner(table)) { + assertEquals(0, scanner.stream().count()); + } + + fixerFuture.get(); + + } + } + } diff --cc test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index af13e5e576,35630437ab..bc763fc5f8 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@@ -31,10 -33,13 +31,12 @@@ import static org.apache.accumulo.test. import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData; + import static org.apache.accumulo.test.util.FileMetadataUtil.countFencedFiles; + import static org.apache.accumulo.test.util.FileMetadataUtil.splitFilesIntoRanges; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.util.ArrayList; @@@ -43,14 -48,18 +45,15 @@@ import java.util.EnumSet import java.util.List; import java.util.Map; import java.util.Map.Entry; + import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import java.util.stream.Stream; -import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv; -import org.apache.accumulo.coordinator.CompactionCoordinator; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@@ -419,6 -454,62 +423,58 @@@ public class ExternalCompaction_1_IT ex } } + @Test + public void testExternalCompactionWithFencedFiles() throws Exception { + String[] names = this.getUniqueNames(2); + try (AccumuloClient client = + Accumulo.newClient().from(getCluster().getClientProperties()).build()) { + + String table1 = names[0]; + createTable(client, table1, "cs1"); + + String table2 = names[1]; + createTable(client, table2, "cs2"); + + writeData(client, table1); + writeData(client, table2); + + // Verify that all data can be seen + verify(client, table1, 1, MAX_DATA); + verify(client, table2, 1, MAX_DATA); + + // Split file in table1 into two files each fenced off by 100 rows for a total of 200 + splitFilesIntoRanges(getCluster().getServerContext(), table1, + 
Set.of(new Range(new Text(row(100)), new Text(row(199))), + new Range(new Text(row(300)), new Text(row(399))))); + assertEquals(2, countFencedFiles(getCluster().getServerContext(), table1)); + + // Fence file in table2 to 600 rows + splitFilesIntoRanges(getCluster().getServerContext(), table2, + Set.of(new Range(new Text(row(200)), new Text(row(799))))); + assertEquals(1, countFencedFiles(getCluster().getServerContext(), table2)); + + // Verify that a subset of the data is now seen after fencing + verify(client, table1, 1, 200); + verify(client, table2, 1, 600); + - getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class); - getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE1); - getCluster().getClusterControl().startCompactors(Compactor.class, 1, QUEUE2); - + // Compact and verify previously fenced data didn't come back - compact(client, table1, 2, QUEUE1, true); ++ compact(client, table1, 2, GROUP1, true); + verify(client, table1, 2, 200); + + SortedSet<Text> splits = new TreeSet<>(); + splits.add(new Text(row(MAX_DATA / 2))); + client.tableOperations().addSplits(table2, splits); + + // Compact and verify previously fenced data didn't come back - compact(client, table2, 3, QUEUE2, true); ++ compact(client, table2, 3, GROUP2, true); + verify(client, table2, 3, 600); + + // should be no more fenced files after compaction + assertEquals(0, countFencedFiles(getCluster().getServerContext(), table1)); + assertEquals(0, countFencedFiles(getCluster().getServerContext(), table2)); + } + } + public static class FSelector implements CompactionSelector { @Override