This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 8a886ce2c949c1cefff7fa38908017526dc4316b
Merge: 8926fe7a75 a4b4f540c8
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Nov 9 15:03:31 2023 -0500

    Merge branch 'main' into elasticity

 .../core/metadata/schema/RootTabletMetadata.java   |  65 ++++-
 .../schema/UpgraderDeprecatedConstants.java        |   8 +-
 .../metadata/schema/RootTabletMetadataTest.java    | 119 ++++++++
 .../accumulo/server/AccumuloDataVersion.java       |   4 +-
 .../server/compaction/CompactionJobGenerator.java  |  51 +++-
 .../compaction/ProvisionalCompactionPlanner.java   |  67 +++++
 .../server/constraints/MetadataConstraints.java    |   5 +-
 server/manager/pom.xml                             |   4 +
 .../accumulo/manager/upgrade/Upgrader11to12.java   | 250 ++++++++++------
 .../{Upgrader11to12.java => Upgrader12to13.java}   |   4 +-
 .../manager/upgrade/Upgrader11to12Test.java        | 317 +++++++++++++++++++++
 .../java/org/apache/accumulo/test/MetaSplitIT.java |  62 ++++
 .../compaction/BadCompactionServiceConfigIT.java   | 236 +++++++++++++++
 .../test/compaction/ExternalCompaction_1_IT.java   |  56 ++++
 .../test/functional/FunctionalTestUtils.java       |   7 +-
 .../test/functional/HalfClosedTabletIT.java        |   5 +
 .../apache/accumulo/test/lock/ServiceLockIT.java   |  14 +-
 .../accumulo/test/util/FileMetadataUtil.java       |  15 +
 18 files changed, 1162 insertions(+), 127 deletions(-)

diff --cc 
core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java
index e1e638f7d4,8a39e34798..c5144bb9c4
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/RootTabletMetadata.java
@@@ -166,9 -205,14 +217,13 @@@ public class RootTabletMetadata 
              .map(qualVal -> new SimpleImmutableEntry<>(
                  new Key(row, famToQualVal.getKey(), qualVal.getKey(), 1),
                  new Value(qualVal.getValue()))));
 -    return TabletMetadata.convertRow(entries.iterator(),
 -        EnumSet.allOf(TabletMetadata.ColumnType.class), false);
 +    return entries;
    }
  
+   public static boolean needsUpgrade(final String json) {
+     return Data.needsUpgrade(json);
+   }
+ 
    /**
     * @return a JSON representation of the root tablet's data.
     */
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
index 8d60990a74,4aae44a974..a96721988e
--- 
a/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/AccumuloDataVersion.java
@@@ -77,10 -77,8 +77,8 @@@ public class AccumuloDataVersion 
      return CURRENT_VERSION;
    }
  
-   // TODO - this disables upgrades until 
https://github.com/apache/accumulo/issues/3768 is done
-   // public static final Set<Integer> CAN_RUN = Set.of(
-   // REMOVE_DEPRECATIONS_FOR_VERSION_3, METADATA_FILE_JSON_ENCODING, 
CURRENT_VERSION);
 -  public static final Set<Integer> CAN_RUN =
 -      Set.of(ROOT_TABLET_META_CHANGES, REMOVE_DEPRECATIONS_FOR_VERSION_3, 
CURRENT_VERSION);
++  // ELASTICITY_TODO get upgrade working
 +  public static final Set<Integer> CAN_RUN = Set.of(CURRENT_VERSION);
  
    /**
     * Get the stored, current working version.
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
index bf9c42516a,0000000000..409873c073
mode 100644,000000..100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
@@@ -1,281 -1,0 +1,308 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.server.compaction;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
++import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.client.PluginEnvironment;
 +import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.metadata.CompactableFileImpl;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 +import org.apache.accumulo.core.spi.common.ServiceEnvironment;
 +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher;
 +import org.apache.accumulo.core.spi.compaction.CompactionJob;
 +import org.apache.accumulo.core.spi.compaction.CompactionKind;
 +import org.apache.accumulo.core.spi.compaction.CompactionPlan;
 +import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
 +import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 +import org.apache.accumulo.core.spi.compaction.CompactionServices;
++import org.apache.accumulo.core.util.Pair;
 +import org.apache.accumulo.core.util.cache.Caches;
 +import org.apache.accumulo.core.util.cache.Caches.CacheName;
 +import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 +import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
 +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
 +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
 +
 +import com.github.benmanes.caffeine.cache.Cache;
++import com.github.benmanes.caffeine.cache.Caffeine;
 +
 +public class CompactionJobGenerator {
- 
++  private static final Logger log = 
LoggerFactory.getLogger(CompactionJobGenerator.class);
 +  private final CompactionServicesConfig servicesConfig;
 +  private final Map<CompactionServiceId,CompactionPlanner> planners = new 
HashMap<>();
 +  private final Cache<TableId,CompactionDispatcher> dispatchers;
 +  private final Set<CompactionServiceId> serviceIds;
 +  private final PluginEnvironment env;
 +  private final Map<Long,Map<String,String>> allExecutionHints;
++  private final Cache<Pair<TableId,CompactionServiceId>,Long> 
unknownCompactionServiceErrorCache;
 +
 +  public CompactionJobGenerator(PluginEnvironment env,
 +      Map<Long,Map<String,String>> executionHints) {
 +    servicesConfig = new CompactionServicesConfig(env.getConfiguration());
 +    serviceIds = 
servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of)
 +        .collect(Collectors.toUnmodifiableSet());
 +
 +    dispatchers = 
Caches.getInstance().createNewBuilder(CacheName.COMPACTION_DISPATCHERS, false)
 +        .maximumSize(10).build();
 +    this.env = env;
 +    if (executionHints.isEmpty()) {
 +      this.allExecutionHints = executionHints;
 +    } else {
 +      this.allExecutionHints = new HashMap<>();
 +      // Make the maps that will be passed to plugins unmodifiable. Do this 
once, so it does not
 +      // need to be done for each tablet.
 +      executionHints.forEach((k, v) -> allExecutionHints.put(k,
 +          v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v)));
 +    }
++    unknownCompactionServiceErrorCache =
++        Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();
 +  }
 +
 +  public Collection<CompactionJob> generateJobs(TabletMetadata tablet, 
Set<CompactionKind> kinds) {
 +
 +    // ELASTICITY_TODO do not want user configured plugins to cause 
exceptions that prevent tablets
 +    // from being
 +    // assigned. So probably want to catch exceptions and log, but not too 
spammily OR somehow
 +    // report something
 +    // back to the manager so it can log.
 +
 +    Collection<CompactionJob> systemJobs = Set.of();
 +
 +    if (kinds.contains(CompactionKind.SYSTEM)) {
 +      CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, 
Map.of());
 +      systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, 
Map.of());
 +    }
 +
 +    Collection<CompactionJob> userJobs = Set.of();
 +
 +    if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != 
null) {
 +      var hints = 
allExecutionHints.get(tablet.getSelectedFiles().getFateTxId());
 +      if (hints != null) {
 +        CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, 
hints);
 +        userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, 
hints);
 +      }
 +    }
 +
 +    if (userJobs.isEmpty()) {
 +      return systemJobs;
 +    } else if (systemJobs.isEmpty()) {
 +      return userJobs;
 +    } else {
 +      var all = new ArrayList<CompactionJob>(systemJobs.size() + 
userJobs.size());
 +      all.addAll(systemJobs);
 +      all.addAll(userJobs);
 +      return all;
 +    }
 +  }
 +
 +  private CompactionServiceId dispatch(CompactionKind kind, TabletMetadata 
tablet,
 +      Map<String,String> executionHints) {
 +
 +    CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(),
 +        tableId -> 
CompactionPluginUtils.createDispatcher((ServiceEnvironment) env, tableId));
 +
 +    CompactionDispatcher.DispatchParameters dispatchParams =
 +        new CompactionDispatcher.DispatchParameters() {
 +          @Override
 +          public CompactionServices getCompactionServices() {
 +            return () -> serviceIds;
 +          }
 +
 +          @Override
 +          public ServiceEnvironment getServiceEnv() {
 +            return (ServiceEnvironment) env;
 +          }
 +
 +          @Override
 +          public CompactionKind getCompactionKind() {
 +            return kind;
 +          }
 +
 +          @Override
 +          public Map<String,String> getExecutionHints() {
 +            return executionHints;
 +          }
 +        };
 +
 +    return dispatcher.dispatch(dispatchParams).getService();
 +  }
 +
 +  private Collection<CompactionJob> planCompactions(CompactionServiceId 
serviceId,
 +      CompactionKind kind, TabletMetadata tablet, Map<String,String> 
executionHints) {
 +
++    if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) {
++      var cacheKey = new Pair<>(tablet.getTableId(), serviceId);
++      var last = unknownCompactionServiceErrorCache.getIfPresent(cacheKey);
++      if (last == null) {
++        // have not logged an error recently for this, so lets log one
++        log.error(
++            "Tablet {} returned non-existent compaction service {} for 
compaction type {}.  Check"
++                + " the table compaction dispatcher configuration. No 
compactions will happen"
++                + " until the configuration is fixed. This log message is 
temporarily suppressed for the"
++                + " entire table.",
++            tablet.getExtent(), serviceId, kind);
++        unknownCompactionServiceErrorCache.put(cacheKey, 
System.currentTimeMillis());
++      }
++
++      return Set.of();
++    }
++
 +    CompactionPlanner planner =
 +        planners.computeIfAbsent(serviceId, sid -> 
createPlanner(tablet.getTableId(), serviceId));
 +
 +    // selecting indicator
 +    // selected files
 +
 +    String ratioStr =
 +        
env.getConfiguration(tablet.getTableId()).get(Property.TABLE_MAJC_RATIO.getKey());
 +    if (ratioStr == null) {
 +      ratioStr = Property.TABLE_MAJC_RATIO.getDefaultValue();
 +    }
 +
 +    double ratio = Double.parseDouble(ratioStr);
 +
 +    Set<CompactableFile> allFiles = tablet.getFilesMap().entrySet().stream()
 +        .map(entry -> new CompactableFileImpl(entry.getKey(), 
entry.getValue()))
 +        .collect(Collectors.toUnmodifiableSet());
 +    Set<CompactableFile> candidates;
 +
 +    if (kind == CompactionKind.SYSTEM) {
 +      if (tablet.getExternalCompactions().isEmpty() && 
tablet.getSelectedFiles() == null) {
 +        candidates = allFiles;
 +      } else {
 +        var tmpFiles = new HashMap<>(tablet.getFilesMap());
 +        // remove any files that are in active compactions
 +        tablet.getExternalCompactions().values().stream().flatMap(ecm -> 
ecm.getJobFiles().stream())
 +            .forEach(tmpFiles::remove);
 +        // remove any files that are selected
 +        if (tablet.getSelectedFiles() != null) {
 +          tmpFiles.keySet().removeAll(tablet.getSelectedFiles().getFiles());
 +        }
 +        candidates = tmpFiles.entrySet().stream()
 +            .map(entry -> new CompactableFileImpl(entry.getKey(), 
entry.getValue()))
 +            .collect(Collectors.toUnmodifiableSet());
 +      }
 +    } else if (kind == CompactionKind.USER) {
 +      var selectedFiles = new HashSet<>(tablet.getSelectedFiles().getFiles());
 +      tablet.getExternalCompactions().values().stream().flatMap(ecm -> 
ecm.getJobFiles().stream())
 +          .forEach(selectedFiles::remove);
 +      candidates = selectedFiles.stream()
 +          .map(file -> new CompactableFileImpl(file, 
tablet.getFilesMap().get(file)))
 +          .collect(Collectors.toUnmodifiableSet());
 +    } else {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    if (candidates.isEmpty()) {
 +      // there are no candidate files for compaction, so no reason to call 
the planner
 +      return Set.of();
 +    }
 +
 +    CompactionPlanner.PlanningParameters params = new 
CompactionPlanner.PlanningParameters() {
 +      @Override
 +      public TableId getTableId() {
 +        return tablet.getTableId();
 +      }
 +
 +      @Override
 +      public ServiceEnvironment getServiceEnvironment() {
 +        return (ServiceEnvironment) env;
 +      }
 +
 +      @Override
 +      public CompactionKind getKind() {
 +        return kind;
 +      }
 +
 +      @Override
 +      public double getRatio() {
 +        return ratio;
 +      }
 +
 +      @Override
 +      public Collection<CompactableFile> getAll() {
 +        return allFiles;
 +      }
 +
 +      @Override
 +      public Collection<CompactableFile> getCandidates() {
 +        return candidates;
 +      }
 +
 +      @Override
 +      public Collection<CompactionJob> getRunningCompactions() {
 +        var allFiles2 = tablet.getFilesMap();
 +        return tablet.getExternalCompactions().values().stream().map(ecMeta 
-> {
 +          Collection<CompactableFile> files = ecMeta.getJobFiles().stream()
 +              .map(f -> new CompactableFileImpl(f, 
allFiles2.get(f))).collect(Collectors.toList());
 +          CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(),
 +              ecMeta.getCompactionExecutorId(), files, ecMeta.getKind(), 
Optional.empty());
 +          return job;
 +        }).collect(Collectors.toUnmodifiableList());
 +      }
 +
 +      @Override
 +      public Map<String,String> getExecutionHints() {
 +        return executionHints;
 +      }
 +
 +      @Override
 +      public CompactionPlan.Builder createPlanBuilder() {
 +        return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates);
 +      }
 +    };
 +
 +    return planner.makePlan(params).getJobs();
 +  }
 +
 +  private CompactionPlanner createPlanner(TableId tableId, 
CompactionServiceId serviceId) {
 +
-     String plannerClassName = 
servicesConfig.getPlanners().get(serviceId.canonical());
- 
-     CompactionPlanner planner = null;
++    CompactionPlanner planner;
++    String plannerClassName = null;
++    Map<String,String> options = null;
 +    try {
++      plannerClassName = 
servicesConfig.getPlanners().get(serviceId.canonical());
++      options = servicesConfig.getOptions().get(serviceId.canonical());
 +      planner = env.instantiate(tableId, plannerClassName, 
CompactionPlanner.class);
-     } catch (ReflectiveOperationException e) {
-       throw new RuntimeException(e);
++      CompactionPlannerInitParams initParameters = new 
CompactionPlannerInitParams(serviceId,
++          servicesConfig.getOptions().get(serviceId.canonical()), 
(ServiceEnvironment) env);
++      planner.init(initParameters);
++    } catch (Exception e) {
++      log.error(
++          "Failed to create compaction planner for {} using class:{} 
options:{}.  Compaction service will not start any new compactions until its 
configuration is fixed.",
++          serviceId, plannerClassName, options, e);
++      planner = new ProvisionalCompactionPlanner(serviceId);
 +    }
- 
-     CompactionPlannerInitParams initParameters = new 
CompactionPlannerInitParams(serviceId,
-         servicesConfig.getOptions().get(serviceId.canonical()), 
(ServiceEnvironment) env);
- 
-     planner.init(initParameters);
- 
 +    return planner;
 +  }
 +}
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/compaction/ProvisionalCompactionPlanner.java
index 0000000000,1d27e9f6e1..e3e870eb92
mode 000000,100644..100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/compaction/ProvisionalCompactionPlanner.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/compaction/ProvisionalCompactionPlanner.java
@@@ -1,0 -1,67 +1,67 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
 -package org.apache.accumulo.tserver.compactions;
++package org.apache.accumulo.server.compaction;
+ 
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+ 
+ import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+ import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+ import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * A compaction planner that makes no plans and is intended to be used 
temporarily when a compaction
+  * service has no compaction planner because it is misconfigured.
+  */
+ public class ProvisionalCompactionPlanner implements CompactionPlanner {
+ 
+   private final CompactionServiceId serviceId;
+   private AtomicLong lastWarnNanoTime = new AtomicLong(System.nanoTime());
+ 
+   public ProvisionalCompactionPlanner(CompactionServiceId serviceId) {
+     this.serviceId = serviceId;
+   }
+ 
+   @Override
+   public void init(InitParameters params) {
+ 
+   }
+ 
+   @Override
+   public CompactionPlan makePlan(PlanningParameters params) {
+     var nanoTime = System.nanoTime();
+     var updatedTime = lastWarnNanoTime.updateAndGet(last -> {
+       if (nanoTime - last > TimeUnit.MINUTES.toNanos(5)) {
+         return nanoTime;
+       }
+ 
+       return last;
+     });
+ 
+     if (updatedTime == nanoTime) {
+       LoggerFactory.getLogger(ProvisionalCompactionPlanner.class)
+           .error("The compaction service "
+               + "'{}' is currently disabled, likely because it has bad 
configuration. No "
+               + "compactions will occur on this service until it is fixed.", 
serviceId);
+     }
+ 
+     return params.createPlanBuilder().build();
+   }
+ }
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
index db71c37730,936d7f16ef..10e1aa9fc1
--- 
a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java
@@@ -53,8 -51,7 +53,9 @@@ import org.apache.accumulo.core.metadat
  import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
  import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn;
  import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.SelectedFiles;
 +import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+ import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants;
  import org.apache.accumulo.core.util.ColumnFQ;
  import org.apache.accumulo.core.util.cleaner.CleanerUtil;
  import org.apache.accumulo.server.ServerContext;
@@@ -102,7 -98,8 +103,9 @@@ public class MetadataConstraints implem
            FutureLocationColumnFamily.NAME,
            ClonedColumnFamily.NAME,
            ExternalCompactionColumnFamily.NAME,
-               CompactedColumnFamily.NAME);
++              CompactedColumnFamily.NAME,
+           UpgraderDeprecatedConstants.ChoppedColumnFamily.NAME
+       );
    // @formatter:on
  
    private static boolean isValidColumn(ColumnUpdate cu) {
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java
index f1d11712d0,0000000000..1906d05ae5
mode 100644,000000..100644
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader12to13.java
@@@ -1,147 -1,0 +1,147 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.manager.upgrade;
 +
 +import static 
org.apache.accumulo.core.metadata.schema.MetadataSchema.RESERVED_PREFIX;
 +
 +import java.util.Map;
 +
 +import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.data.Value;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.RootTable;
 +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
 +import org.apache.accumulo.core.metadata.schema.Ample.TabletsMutator;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 +import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 +import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 +import org.apache.accumulo.core.schema.Section;
 +import org.apache.accumulo.server.ServerContext;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
- public class Upgrader11to12 implements Upgrader {
++public class Upgrader12to13 implements Upgrader {
 +
-   private static final Logger LOG = 
LoggerFactory.getLogger(Upgrader11to12.class);
++  private static final Logger LOG = 
LoggerFactory.getLogger(Upgrader12to13.class);
 +
 +  @Override
 +  public void upgradeZookeeper(ServerContext context) {
 +    LOG.info("setting root table stored hosting goal");
 +    addHostingGoalToRootTable(context);
 +  }
 +
 +  @Override
 +  public void upgradeRoot(ServerContext context) {
 +    LOG.info("setting metadata table hosting goal");
 +    addHostingGoalToMetadataTable(context);
 +  }
 +
 +  @Override
 +  public void upgradeMetadata(ServerContext context) {
 +    LOG.info("setting hosting goal on user tables");
 +    addHostingGoalToUserTables(context);
 +    deleteExternalCompactionFinalStates(context);
 +    deleteExternalCompactions(context);
 +  }
 +
 +  private void deleteExternalCompactionFinalStates(ServerContext context) {
 +    // This metadata was only written for user tablets as part of the 
compaction commit process.
 +    // Compactions are committed in a completely different way now, so delete 
these entries. It's
 +    // possible some completed compactions may need to be redone, but 
processing these entries would
 +    // not be easy to test so it's better for correctness to delete them and 
redo the work.
 +    try (var scanner = context.createScanner(MetadataTable.NAME);
 +        var writer = context.createBatchWriter(MetadataTable.NAME)) {
 +      var section = new Section(RESERVED_PREFIX + "ecomp", true, 
RESERVED_PREFIX + "ecomq", false);
 +      scanner.setRange(section.getRange());
 +
 +      for (Map.Entry<Key,Value> entry : scanner) {
 +        var key = entry.getKey();
 +        var row = key.getRow();
 +        
Preconditions.checkState(row.toString().startsWith(section.getRowPrefix()));
 +        Mutation m = new Mutation(row);
 +        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
 +            "Expected empty visibility, saw %s ", 
key.getColumnVisibilityData());
 +        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
 +        writer.addMutation(m);
 +      }
 +    } catch (Exception e) {
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +  private void addHostingGoalToSystemTable(ServerContext context, TableId 
tableId) {
 +    try (
 +        TabletsMetadata tm =
 +            
context.getAmple().readTablets().forTable(tableId).fetch(ColumnType.PREV_ROW).build();
 +        TabletsMutator mut = context.getAmple().mutateTablets()) {
 +      tm.forEach(
 +          t -> 
mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ALWAYS).mutate());
 +    }
 +  }
 +
 +  private void addHostingGoalToRootTable(ServerContext context) {
 +    addHostingGoalToSystemTable(context, RootTable.ID);
 +  }
 +
 +  private void addHostingGoalToMetadataTable(ServerContext context) {
 +    addHostingGoalToSystemTable(context, MetadataTable.ID);
 +  }
 +
 +  private void addHostingGoalToUserTables(ServerContext context) {
 +    try (
 +        TabletsMetadata tm = 
context.getAmple().readTablets().forLevel(DataLevel.USER)
 +            .fetch(ColumnType.PREV_ROW).build();
 +        TabletsMutator mut = context.getAmple().mutateTablets()) {
 +      tm.forEach(
 +          t -> 
mut.mutateTablet(t.getExtent()).putHostingGoal(TabletHostingGoal.ONDEMAND).mutate());
 +    }
 +  }
 +
 +  private void deleteExternalCompactions(ServerContext context) {
 +    // External compactions were only written for user tablets in 3.x and 
earlier, so only need to
 +    // process the metadata table. The metadata related to an external 
compaction has changed so
 +    // delete any that exist. Not using Ample in case there are problems 
deserializing the old
 +    // external compaction metadata.
 +    try (var scanner = context.createScanner(MetadataTable.NAME);
 +        var writer = context.createBatchWriter(MetadataTable.NAME)) {
 +      scanner.setRange(TabletsSection.getRange());
 +      scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
 +
 +      for (Map.Entry<Key,Value> entry : scanner) {
 +        var key = entry.getKey();
 +        Mutation m = new Mutation(key.getRow());
 +        
Preconditions.checkState(key.getColumnFamily().equals(ExternalCompactionColumnFamily.NAME),
 +            "Expected family %s, saw %s ", 
ExternalCompactionColumnFamily.NAME,
 +            key.getColumnVisibilityData());
 +        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
 +            "Expected empty visibility, saw %s ", 
key.getColumnVisibilityData());
 +        m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
 +        writer.addMutation(m);
 +      }
 +    } catch (Exception e) {
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +}
diff --cc 
test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java
index 0000000000,5f5ec1dd4b..7fc9fde216
mode 000000,100644..100644
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/BadCompactionServiceConfigIT.java
@@@ -1,0 -1,233 +1,236 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.test.compaction;
+ 
+ import static org.junit.jupiter.api.Assertions.assertEquals;
+ 
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.TreeSet;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.stream.Collectors;
+ import java.util.stream.IntStream;
+ 
+ import org.apache.accumulo.core.client.Accumulo;
+ import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.admin.CompactionConfig;
+ import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+ import org.apache.accumulo.core.conf.Property;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.iterators.Filter;
+ import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
+ import org.apache.accumulo.harness.AccumuloClusterHarness;
+ import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.io.Text;
+ import org.junit.jupiter.api.AfterEach;
+ import org.junit.jupiter.api.BeforeEach;
++import org.junit.jupiter.api.Disabled;
+ import org.junit.jupiter.api.Test;
+ 
+ import com.google.common.collect.MoreCollectors;
+ 
++//ELASTICITY_TODO
++@Disabled
+ public class BadCompactionServiceConfigIT extends AccumuloClusterHarness {
+   // Integration tests: compactions dispatched to a misconfigured or nonexistent service.
+   private static final String CSP = 
Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey();
+   // Starts the cluster with service cs1 whose planner executors option is invalid JSON.
+   @Override
+   public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+     Map<String,String> siteCfg = new HashMap<>();
+     siteCfg.put(CSP + "cs1.planner", 
DefaultCompactionPlanner.class.getName());
+     // place invalid json in the planners config
+     siteCfg.put(CSP + "cs1.planner.opts.executors", "{{'name]");
+     cfg.setSiteConfig(siteCfg);
+   }
+   // Filter whose accept() is always false; the tests expect an empty table after compacting.
+   public static class EverythingFilter extends Filter {
+     @Override
+     public boolean accept(Key k, Value v) {
+       return false;
+     }
+   }
+   // Runs the background "fixer" tasks; created before and shut down after each test.
+   private ExecutorService executorService;
+ 
+   @BeforeEach
+   public void setup() {
+     executorService = Executors.newCachedThreadPool();
+   }
+ 
+   @AfterEach
+   public void teardown() {
+     executorService.shutdownNow();
+   }
+   // Compaction should not run while cs1 has bad planner config, and should run once it is fixed.
+   @Test
+   public void testUsingMisconfiguredService() throws Exception {
+     String table = getUniqueNames(1)[0];
+ 
+     // Create a table that is configured to use a compaction service with bad 
configuration.
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+       NewTableConfiguration ntc = new NewTableConfiguration().setProperties(
+           Map.of(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + 
"service", "cs1"));
+       client.tableOperations().create(table, ntc);
+ 
+       try (var writer = client.createBatchWriter(table)) {
+         writer.addMutation(new 
Mutation("0").at().family("f").qualifier("q").put("v"));
+       }
+ 
+       client.tableOperations().flush(table, null, null, true);
+ 
+       try (var scanner = client.createScanner(table)) {
+         assertEquals("0", scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+             .collect(MoreCollectors.onlyElement()));
+       }
+       // Background fixer: after 2s, confirm the row is still there, then replace the executors.
+       Future<?> fixerFuture = executorService.submit(() -> {
+         try {
+           Thread.sleep(2000);
+ 
+           // Verify the compaction has not run yet, it should not be able to 
with the bad config.
+           try (var scanner = client.createScanner(table)) {
+             assertEquals("0", scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+                 .collect(MoreCollectors.onlyElement()));
+           }
+ 
+           var value =
+               "[{'name':'small', 'type': 'internal', 
'numThreads':1}]".replaceAll("'", "\"");
+           client.instanceOperations().setProperty(CSP + 
"cs1.planner.opts.executors", value);
+         } catch (Exception e) {
+           throw new RuntimeException(e);
+         }
+       });
+ 
+       List<IteratorSetting> iterators =
+           Collections.singletonList(new IteratorSetting(100, 
EverythingFilter.class));
+       client.tableOperations().compact(table,
+           new CompactionConfig().setIterators(iterators).setWait(true));
+ 
+       // Verify compaction ran.
+       try (var scanner = client.createScanner(table)) {
+         assertEquals(0, scanner.stream().count());
+       }
+ 
+       fixerFuture.get();
+ 
+       // misconfigure the service, test how going from good config to bad 
config works. The test
+       // started with an initial state of bad config.
+       client.instanceOperations().setProperty(CSP + 
"cs1.planner.opts.executors", "]o.o[");
+       try (var writer = client.createBatchWriter(table)) {
+         writer.addMutation(new 
Mutation("0").at().family("f").qualifier("q").put("v"));
+       }
+       client.tableOperations().flush(table, null, null, true);
+       try (var scanner = client.createScanner(table)) {
+         assertEquals("0", scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+             .collect(MoreCollectors.onlyElement()));
+       }
+       fixerFuture = executorService.submit(() -> {
+         try {
+           Thread.sleep(2000);
+           var value =
+               "[{'name':'small', 'type': 'internal', 
'numThreads':1}]".replaceAll("'", "\"");
+           client.instanceOperations().setProperty(CSP + 
"cs1.planner.opts.executors", value);
+         } catch (Exception e) {
+           throw new RuntimeException(e);
+         }
+       });
+ 
+       client.tableOperations().compact(table,
+           new CompactionConfig().setIterators(iterators).setWait(true));
+ 
+       // Verify compaction ran.
+       try (var scanner = client.createScanner(table)) {
+         assertEquals(0, scanner.stream().count());
+       }
+ 
+       fixerFuture.get();
+ 
+     }
+   }
+   // Same flow, but the table is dispatched to service cs5, which this test never configures.
+   @Test
+   public void testUsingNonExistentService() throws Exception {
+     String table = getUniqueNames(1)[0];
+ 
+     // Create a table that is configured to use a compaction service that 
does not exist
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+       NewTableConfiguration ntc = new NewTableConfiguration().setProperties(
+           Map.of(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + 
"service", "cs5"));
+       client.tableOperations().create(table, ntc);
+ 
+       // Add splits so that the tserver logs can manually be inspected to 
ensure they are not
+       // spammed. Not sure how to check this automatically.
+       var splits = IntStream.range(1, 10).mapToObj(i -> new Text(i + ""))
+           .collect(Collectors.toCollection(TreeSet::new));
+       client.tableOperations().addSplits(table, splits);
+ 
+       try (var writer = client.createBatchWriter(table)) {
+         writer.addMutation(new 
Mutation("0").at().family("f").qualifier("q").put("v"));
+       }
+ 
+       client.tableOperations().flush(table, null, null, true);
+ 
+       try (var scanner = client.createScanner(table)) {
+         assertEquals("0", scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+             .collect(MoreCollectors.onlyElement()));
+       }
+ 
+       // Create a thread to fix the compaction config after a bit.
+       Future<?> fixerFuture = executorService.submit(() -> {
+         try {
+           Thread.sleep(2000);
+ 
+           // Verify the compaction has not run yet, it should not be able to 
with the bad config.
+           try (var scanner = client.createScanner(table)) {
+             assertEquals("0", scanner.stream().map(e -> 
e.getKey().getRowData().toString())
+                 .collect(MoreCollectors.onlyElement()));
+           }
+ 
+           // fix the compaction dispatcher config
+           client.tableOperations().setProperty(table,
+               Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", 
"default");
+         } catch (Exception e) {
+           throw new RuntimeException(e);
+         }
+       });
+ 
+       List<IteratorSetting> iterators =
+           Collections.singletonList(new IteratorSetting(100, 
EverythingFilter.class));
+       client.tableOperations().compact(table,
+           new CompactionConfig().setIterators(iterators).setWait(true));
+ 
+       // Verify compaction ran.
+       try (var scanner = client.createScanner(table)) {
+         assertEquals(0, scanner.stream().count());
+       }
+ 
+       fixerFuture.get();
+ 
+     }
+   }
+ }
diff --cc 
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index af13e5e576,35630437ab..bc763fc5f8
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@@ -31,10 -33,13 +31,12 @@@ import static org.apache.accumulo.test.
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.row;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.verify;
  import static 
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.writeData;
+ import static org.apache.accumulo.test.util.FileMetadataUtil.countFencedFiles;
+ import static 
org.apache.accumulo.test.util.FileMetadataUtil.splitFilesIntoRanges;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
 -import static org.junit.jupiter.api.Assertions.fail;
  
  import java.io.IOException;
  import java.util.ArrayList;
@@@ -43,14 -48,18 +45,15 @@@ import java.util.EnumSet
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
+ import java.util.Set;
  import java.util.SortedSet;
  import java.util.TreeSet;
 -import java.util.concurrent.TimeUnit;
  import java.util.stream.Collectors;
 -import java.util.stream.Stream;
  
 -import org.apache.accumulo.compactor.Compactor;
  import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
@@@ -419,6 -454,62 +423,58 @@@ public class ExternalCompaction_1_IT ex
      }
    }
  
+   @Test
+   public void testExternalCompactionWithFencedFiles() throws Exception {
+     String[] names = this.getUniqueNames(2);
+     try (AccumuloClient client =
+         
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
+       // Overall: fence files to row ranges, compact, then check fenced-off data stays hidden.
+       String table1 = names[0];
+       createTable(client, table1, "cs1");
+ 
+       String table2 = names[1];
+       createTable(client, table2, "cs2");
+ 
+       writeData(client, table1);
+       writeData(client, table2);
+ 
+       // Verify that all data can be seen
+       verify(client, table1, 1, MAX_DATA);
+       verify(client, table2, 1, MAX_DATA);
+ 
+       // Split file in table1 into two files each fenced off by 100 rows for 
a total of 200
+       splitFilesIntoRanges(getCluster().getServerContext(), table1,
+           Set.of(new Range(new Text(row(100)), new Text(row(199))),
+               new Range(new Text(row(300)), new Text(row(399)))));
+       assertEquals(2, countFencedFiles(getCluster().getServerContext(), 
table1));
+ 
+       // Fence file in table2 to 600 rows
+       splitFilesIntoRanges(getCluster().getServerContext(), table2,
+           Set.of(new Range(new Text(row(200)), new Text(row(799)))));
+       assertEquals(1, countFencedFiles(getCluster().getServerContext(), 
table2));
+ 
+       // Verify that a subset of the data is now seen after fencing
+       verify(client, table1, 1, 200);
+       verify(client, table2, 1, 600);
+ 
 -      
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
 -      getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE1);
 -      getCluster().getClusterControl().startCompactors(Compactor.class, 1, 
QUEUE2);
 -
+       // Compact and verify previously fenced data didn't come back
 -      compact(client, table1, 2, QUEUE1, true);
++      compact(client, table1, 2, GROUP1, true);
+       verify(client, table1, 2, 200);
+ 
+       SortedSet<Text> splits = new TreeSet<>();
+       splits.add(new Text(row(MAX_DATA / 2)));
+       client.tableOperations().addSplits(table2, splits);
+ 
+       // Compact and verify previously fenced data didn't come back
 -      compact(client, table2, 3, QUEUE2, true);
++      compact(client, table2, 3, GROUP2, true);
+       verify(client, table2, 3, 600);
+ 
+       // should be no more fenced files after compaction
+       assertEquals(0, countFencedFiles(getCluster().getServerContext(), 
table1));
+       assertEquals(0, countFencedFiles(getCluster().getServerContext(), 
table2));
+     }
+   }
+ 
    public static class FSelector implements CompactionSelector {
  
      @Override


Reply via email to