This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 996f94ec0b37ec597042ee688d8075c6f5d0d4b3
Merge: df8a3670a4 a49009c608
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Thu Aug 29 17:21:44 2024 +0000

    Merge branch '3.1'

 .../core/client/admin/InstanceOperations.java      | 19 +++--
 .../accumulo/core/clientImpl/ClientContext.java    |  9 +++
 .../core/clientImpl/ClientTabletCache.java         |  6 +-
 .../core/clientImpl/InstanceOperationsImpl.java    | 46 ++++++++----
 .../core/clientImpl/TabletServerBatchWriter.java   |  6 +-
 .../core/clientImpl/ZookeeperLockChecker.java      |  5 ++
 .../shell/commands/ListCompactionsCommand.java     |  3 +-
 .../accumulo/test/functional/CompactionIT.java     | 84 ++++++++++++++++++++++
 8 files changed, 151 insertions(+), 27 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 97ecfee777,cc93f79d00..9a0bf932fc
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@@ -1070,27 -1102,12 +1071,35 @@@ public class ClientContext implements A
      return thriftTransportPool;
    }
  
 +  public MeterRegistry getMeterRegistry() {
 +    ensureOpen();
 +    return micrometer;
 +  }
 +
 +  public void setMeterRegistry(MeterRegistry micrometer) {
 +    ensureOpen();
 +    this.micrometer = micrometer;
 +    getCaches();
 +  }
 +
 +  public synchronized Caches getCaches() {
 +    ensureOpen();
 +    if (caches == null) {
 +      caches = Caches.getInstance();
 +      if (micrometer != null
 +          && 
getConfiguration().getBoolean(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABLED))
 {
 +        caches.registerMetrics(micrometer);
 +      }
 +    }
 +    return caches;
 +  }
 +
+   public synchronized ZookeeperLockChecker getTServerLockChecker() {
+     ensureOpen();
+     if (this.zkLockChecker == null) {
+       this.zkLockChecker = new ZookeeperLockChecker(this);
+     }
+     return this.zkLockChecker;
+   }
+ 
  }
diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
index 4c4e7056a3,0000000000..61b96c5cdc
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java
@@@ -1,429 -1,0 +1,429 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import static com.google.common.base.Preconditions.checkArgument;
 +
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.function.BiConsumer;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.InvalidTabletHostingRequestException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
 +import org.apache.accumulo.core.client.admin.TabletAvailability;
 +import org.apache.accumulo.core.data.InstanceId;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Range;
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.metadata.AccumuloTable;
 +import org.apache.accumulo.core.metadata.MetadataCachedTabletObtainer;
 +import org.apache.accumulo.core.singletons.SingletonManager;
 +import org.apache.accumulo.core.singletons.SingletonService;
 +import org.apache.accumulo.core.util.Interner;
 +import org.apache.accumulo.core.util.Timer;
 +import org.apache.accumulo.core.util.UtilWaitThread;
 +import org.apache.hadoop.io.Text;
 +
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * Client side cache of information about Tablets. Currently, a tablet prev end row is cached and
 + * locations are cached if they exist.
 + */
 +public abstract class ClientTabletCache {
 +
 +  /**
 +   * Flipped false on call to {@link #clearInstances}. Checked by client classes that locally cache
 +   * Locators.
 +   */
 +  private volatile boolean isValid = true;
 +
 +  boolean isValid() {
 +    return isValid;
 +  }
 +
 +  /**
 +   * Used to indicate if a user of this interface needs a tablet with a location. This simple enum
 +   * was created instead of using a boolean for code clarity.
 +   */
 +  public enum LocationNeed {
 +    REQUIRED, NOT_REQUIRED
 +  }
 +
 +  /**
 +   * This method allows linear scans to host tablet ahead of time that they may read in the future.
 +   * The goal of this method is to allow tablets to request hosting of tablet for a scan before the
 +   * scan actually needs it. Below is an example of how this method could work with a scan when
 +   * {@code minimumHostAhead=4} is passed and avoid the scan having to wait on tablet hosting.
 +   *
 +   * <ol>
 +   * <li>4*2 tablets are initially hosted (the scan has to wait on this)</li>
 +   * <li>The 1st,2nd,3rd, and 4th tablets are read by the scan</li>
 +   * <li>The request to read the 5th tablets causes a request to host 4 more tablets (this would be
 +   * the 9th,10th,11th, and 12th tablets)</li>
 +   * <li>The 5th,6th,7th, and 8th tablet are read by the scan</li>
 +   * <li>While the scan does the read above, the 9th,10th,11th, and 12th tablets are actually
 +   * hosted. This happens concurrently with the scan above.</li>
 +   * <li>When the scan goes to read the 9th tablet, hopefully its already hosted. Also attempting to
 +   * read the 9th tablet will cause a request to host the 13th,14th,15th, and 16th tablets.</li>
 +   * </ol>
 +   *
 +   * In the situation above, the goal is that while we are reading 4 hosted tablets the 4 following
 +   * tablets are in the process of being hosted.
 +   *
 +   * @param minimumHostAhead Attempts to keep between minimumHostAhead and 2*minimumHostAhead
 +   *        tablets following the found tablet hosted.
 +   * @param hostAheadRange Only host following tablets that are within this range.
 +   */
 +  public abstract CachedTablet findTablet(ClientContext context, Text row, 
boolean skipRow,
 +      LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
 +      InvalidTabletHostingRequestException;
 +
 +  /**
 +   * Finds the tablet that contains the given row.
 +   *
 +   * @param locationNeed When {@link LocationNeed#REQUIRED} is passed will only return a tablet if
 +   *        it has location. When {@link LocationNeed#NOT_REQUIRED} is passed will return the tablet
 +   *        that overlaps the row with or without a location.
 +   *
 +   * @return overlapping tablet. If no overlapping tablet exists, returns null. If location is
 +   *         required and the tablet currently has no location ,returns null.
 +   */
 +  public CachedTablet findTablet(ClientContext context, Text row, boolean 
skipRow,
 +      LocationNeed locationNeed) throws AccumuloException, 
AccumuloSecurityException,
 +      TableNotFoundException, InvalidTabletHostingRequestException {
 +    return findTablet(context, row, skipRow, locationNeed, 0, null);
 +  }
 +
 +  public CachedTablet findTabletWithRetry(ClientContext context, Text row, 
boolean skipRow,
 +      LocationNeed locationNeed) throws AccumuloException, 
AccumuloSecurityException,
 +      TableNotFoundException, InvalidTabletHostingRequestException {
 +    var tl = findTablet(context, row, skipRow, locationNeed);
 +    while (tl == null && locationNeed == LocationNeed.REQUIRED) {
 +      UtilWaitThread.sleep(100);
 +      tl = findTablet(context, row, skipRow, locationNeed);
 +    }
 +    return tl;
 +  }
 +
 +  public abstract <T extends Mutation> void binMutations(ClientContext 
context, List<T> mutations,
 +      Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
 +      InvalidTabletHostingRequestException;
 +
 +  /**
 +   * <p>
 +   * This method finds what tablets overlap a given set of ranges, passing each range and its
 +   * associated tablet to the range consumer. If a range overlaps multiple tablets then it can be
 +   * passed to the range consumer multiple times.
 +   * </p>
 +   *
 +   * @param locationNeed When {@link LocationNeed#REQUIRED} is passed only tablets that have a
 +   *        location are provided to the rangeConsumer, any range that overlaps a tablet without a
 +   *        location will be returned as a failure. When {@link LocationNeed#NOT_REQUIRED} is
 +   *        passed, ranges that overlap tablets with and without a location are provided to the
 +   *        range consumer.
 +   * @param ranges For each range will try to find overlapping contiguous tablets that optionally
 +   *        have a location.
 +   * @param rangeConsumer If all of the tablets that a range overlaps are found, then the range and
 +   *        tablets will be passed to this consumer one at time. A range will either be passed to
 +   *        this consumer one more mor times OR returned as a failuer, but never both.
 +   *
 +   * @return The failed ranges that did not have a location (if a location is required) or where
 +   *         contiguous tablets could not be found.
 +   */
 +  public abstract List<Range> findTablets(ClientContext context, List<Range> 
ranges,
 +      BiConsumer<CachedTablet,Range> rangeConsumer, LocationNeed locationNeed)
 +      throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException,
 +      InvalidTabletHostingRequestException;
 +
 +  /**
 +   * The behavior of this method is similar to
 +   * {@link #findTablets(ClientContext, List, BiConsumer, LocationNeed)}, except it bins ranges to
 +   * the passed in binnedRanges map instead of passing them to a consumer. This method only bins to
 +   * hosted tablets with a location.
 +   */
 +  public List<Range> binRanges(ClientContext context, List<Range> ranges,
 +      Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws 
AccumuloException,
 +      AccumuloSecurityException, TableNotFoundException, 
InvalidTabletHostingRequestException {
 +    return findTablets(context, ranges, ((cachedTablet, range) -> 
ClientTabletCacheImpl
 +        .addRange(binnedRanges, cachedTablet, range)), LocationNeed.REQUIRED);
 +  }
 +
 +  public abstract void invalidateCache(KeyExtent failedExtent);
 +
 +  public abstract void invalidateCache(Collection<KeyExtent> keySet);
 +
 +  /**
 +   * Invalidate entire cache
 +   */
 +  public abstract void invalidateCache();
 +
 +  /**
 +   * Invalidate all metadata entries that point to server
 +   */
 +  public abstract void invalidateCache(ClientContext context, String server);
 +
 +  private static class InstanceKey {
 +    final InstanceId instanceId;
 +    final TableId tableId;
 +
 +    InstanceKey(InstanceId instanceId, TableId table) {
 +      this.instanceId = instanceId;
 +      this.tableId = table;
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      return instanceId.hashCode() + tableId.hashCode();
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (o instanceof InstanceKey) {
 +        return equals((InstanceKey) o);
 +      }
 +      return false;
 +    }
 +
 +    public boolean equals(InstanceKey lk) {
 +      return instanceId.equals(lk.instanceId) && tableId.equals(lk.tableId);
 +    }
 +
 +  }
 +
 +  private static final HashMap<InstanceKey,ClientTabletCache> instances = new 
HashMap<>();
 +  private static boolean enabled = true;
 +
 +  public static synchronized void clearInstances() {
 +    for (ClientTabletCache locator : instances.values()) {
 +      locator.isValid = false;
 +    }
 +    instances.clear();
 +  }
 +
 +  static synchronized boolean isEnabled() {
 +    return enabled;
 +  }
 +
 +  static synchronized void disable() {
 +    clearInstances();
 +    enabled = false;
 +  }
 +
 +  static synchronized void enable() {
 +    enabled = true;
 +  }
 +
 +  public long getTabletHostingRequestCount() {
 +    return 0L;
 +  }
 +
 +  public static synchronized ClientTabletCache getInstance(ClientContext 
context, TableId tableId) {
 +    Preconditions.checkState(enabled, "The Accumulo singleton that that 
tracks tablet locations is "
 +        + "disabled. This is likely caused by all AccumuloClients being 
closed or garbage collected");
 +    InstanceKey key = new InstanceKey(context.getInstanceID(), tableId);
 +    ClientTabletCache tl = instances.get(key);
 +    if (tl == null) {
 +      MetadataCachedTabletObtainer mlo = new MetadataCachedTabletObtainer();
 +
 +      if (AccumuloTable.ROOT.tableId().equals(tableId)) {
-         tl = new RootClientTabletCache(new ZookeeperLockChecker(context));
++        tl = new RootClientTabletCache(context.getTServerLockChecker());
 +      } else if (AccumuloTable.METADATA.tableId().equals(tableId)) {
 +        tl = new ClientTabletCacheImpl(AccumuloTable.METADATA.tableId(),
 +            getInstance(context, AccumuloTable.ROOT.tableId()), mlo,
-             new ZookeeperLockChecker(context));
++            context.getTServerLockChecker());
 +      } else {
 +        tl = new ClientTabletCacheImpl(tableId,
 +            getInstance(context, AccumuloTable.METADATA.tableId()), mlo,
-             new ZookeeperLockChecker(context));
++            context.getTServerLockChecker());
 +      }
 +      instances.put(key, tl);
 +    }
 +
 +    return tl;
 +  }
 +
 +  static {
 +    SingletonManager.register(new SingletonService() {
 +
 +      @Override
 +      public boolean isEnabled() {
 +        return ClientTabletCache.isEnabled();
 +      }
 +
 +      @Override
 +      public void enable() {
 +        ClientTabletCache.enable();
 +      }
 +
 +      @Override
 +      public void disable() {
 +        ClientTabletCache.disable();
 +      }
 +    });
 +  }
 +
 +  public static class CachedTablets {
 +
 +    private final List<CachedTablet> cachedTablets;
 +
 +    public CachedTablets(List<CachedTablet> cachedTablets) {
 +      this.cachedTablets = cachedTablets;
 +    }
 +
 +    public List<CachedTablet> getCachedTablets() {
 +      return cachedTablets;
 +    }
 +  }
 +
 +  public static class CachedTablet {
 +    private static final Interner<String> interner = new Interner<>();
 +
 +    private final KeyExtent tablet_extent;
 +    private final String tserverLocation;
 +    private final String tserverSession;
 +    private final TabletAvailability availability;
 +    private final boolean hostingRequested;
 +
 +    private final Timer creationTimer = Timer.startNew();
 +
 +    public CachedTablet(KeyExtent tablet_extent, String tablet_location, 
String session,
 +        TabletAvailability availability, boolean hostingRequested) {
 +      checkArgument(tablet_extent != null, "tablet_extent is null");
 +      checkArgument(tablet_location != null, "tablet_location is null");
 +      checkArgument(session != null, "session is null");
 +      this.tablet_extent = tablet_extent;
 +      this.tserverLocation = interner.intern(tablet_location);
 +      this.tserverSession = interner.intern(session);
 +      this.availability = Objects.requireNonNull(availability);
 +      this.hostingRequested = hostingRequested;
 +    }
 +
 +    public CachedTablet(KeyExtent tablet_extent, Optional<String> 
tablet_location,
 +        Optional<String> session, TabletAvailability availability, boolean 
hostingRequested) {
 +      checkArgument(tablet_extent != null, "tablet_extent is null");
 +      this.tablet_extent = tablet_extent;
 +      this.tserverLocation = 
tablet_location.map(interner::intern).orElse(null);
 +      this.tserverSession = session.map(interner::intern).orElse(null);
 +      this.availability = Objects.requireNonNull(availability);
 +      this.hostingRequested = hostingRequested;
 +    }
 +
 +    public CachedTablet(KeyExtent tablet_extent, TabletAvailability 
availability,
 +        boolean hostingRequested) {
 +      checkArgument(tablet_extent != null, "tablet_extent is null");
 +      this.tablet_extent = tablet_extent;
 +      this.tserverLocation = null;
 +      this.tserverSession = null;
 +      this.availability = Objects.requireNonNull(availability);
 +      this.hostingRequested = hostingRequested;
 +    }
 +
 +    @Override
 +    public boolean equals(Object o) {
 +      if (o instanceof CachedTablet) {
 +        CachedTablet otl = (CachedTablet) o;
 +        return getExtent().equals(otl.getExtent())
 +            && getTserverLocation().equals(otl.getTserverLocation())
 +            && getTserverSession().equals(otl.getTserverSession())
 +            && getAvailability() == otl.getAvailability()
 +            && hostingRequested == otl.hostingRequested;
 +      }
 +      return false;
 +    }
 +
 +    @Override
 +    public int hashCode() {
 +      return Objects.hash(getExtent(), tserverLocation, tserverSession, 
availability,
 +          hostingRequested);
 +    }
 +
 +    @Override
 +    public String toString() {
 +      return "(" + getExtent() + "," + getTserverLocation() + "," + 
getTserverSession() + ","
 +          + getAvailability() + ")";
 +    }
 +
 +    public KeyExtent getExtent() {
 +      return tablet_extent;
 +    }
 +
 +    public Optional<String> getTserverLocation() {
 +      return Optional.ofNullable(tserverLocation);
 +    }
 +
 +    public Optional<String> getTserverSession() {
 +      return Optional.ofNullable(tserverSession);
 +    }
 +
 +    /**
 +     * The ClientTabletCache will remove and replace a CachedTablet when the location is no longer
 +     * valid. However, it will not do the same when the availability is no longer valid. The
 +     * availability returned by this method may be out of date. If this information is needed to be
 +     * fresh, then you may want to consider clearing the cache first.
 +     */
 +    public TabletAvailability getAvailability() {
 +      return this.availability;
 +    }
 +
 +    /**
 +     * @return a timer that was started when this object was created
 +     */
 +    public Timer getCreationTimer() {
 +      return creationTimer;
 +    }
 +
 +    public boolean wasHostingRequested() {
 +      return hostingRequested;
 +    }
 +  }
 +
 +  public static class TabletServerMutations<T extends Mutation> {
 +    private final Map<KeyExtent,List<T>> mutations;
 +    private final String tserverSession;
 +
 +    public TabletServerMutations(String tserverSession) {
 +      this.tserverSession = tserverSession;
 +      this.mutations = new HashMap<>();
 +    }
 +
 +    public void addMutation(KeyExtent ke, T m) {
 +      List<T> mutList = mutations.computeIfAbsent(ke, k -> new ArrayList<>());
 +      mutList.add(m);
 +    }
 +
 +    public Map<KeyExtent,List<T>> getMutations() {
 +      return mutations;
 +    }
 +
 +    final String getSession() {
 +      return tserverSession;
 +    }
 +  }
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index d9f77297c7,bdf04d0805..3f9d151a8c
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@@ -24,7 -23,9 +24,8 @@@ import static java.util.stream.Collecto
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertFalse;
  import static org.junit.jupiter.api.Assertions.assertNotNull;
 -import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
+ import static org.junit.jupiter.api.Assertions.fail;
  
  import java.io.IOException;
  import java.nio.file.FileVisitResult;
@@@ -41,17 -42,14 +42,19 @@@ import java.util.HashSet
  import java.util.List;
  import java.util.Map;
  import java.util.Map.Entry;
 -import java.util.NoSuchElementException;
 -import java.util.Objects;
 +import java.util.Optional;
  import java.util.Set;
 +import java.util.SortedSet;
 +import java.util.TreeSet;
+ import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
 +import java.util.function.Predicate;
 +import java.util.stream.IntStream;
  
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
@@@ -890,231 -959,83 +897,308 @@@ public class CompactionIT extends Compa
      }
    }
  
 +  @Test
 +  public void testCancelUserCompactionTimeoutExceeded() throws Exception {
 +    testCancelUserCompactionTimeout(true);
 +  }
 +
 +  @Test
 +  public void testCancelUserCompactionTimeoutNotExceeded() throws Exception {
 +    testCancelUserCompactionTimeout(false);
 +  }
 +
 +  private void testCancelUserCompactionTimeout(boolean timeout) throws 
Exception {
 +
 +    var uniqueNames = getUniqueNames(2);
 +    String table1 = uniqueNames[0];
 +    String table2 = uniqueNames[1];
 +
 +    try (final AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      // create a compaction service that uses a Planner that will schedule 
system jobs
 +      // at a higher priority than user jobs
 +      client.instanceOperations().setProperty(
 +          Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner",
 +          TestPlanner.class.getName());
 +      client.instanceOperations().setProperty(
 +          Property.COMPACTION_SERVICE_PREFIX.getKey() + 
"testcancel.planner.opts.groups",
 +          ("[{'group':'" + COMPACTOR_GROUP_1 + "'}]").replaceAll("'", "\""));
 +
 +      // create two tables that uses the compaction service
 +      Map<String,String> props = new HashMap<>();
 +      props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
 +          SimpleCompactionDispatcher.class.getName());
 +      props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + 
"service", "testcancel");
 +      // Disable system compactions to start for these tables
 +      props.put(Property.TABLE_MAJC_RATIO.getKey(), "20");
 +
 +      // configure tablet compaction iterator that slows compaction down
 +      var ntc = new NewTableConfiguration();
 +      IteratorSetting iterSetting = new IteratorSetting(50, 
SlowIterator.class);
 +      SlowIterator.setSleepTime(iterSetting, 5);
 +      ntc.attachIterator(iterSetting, EnumSet.of(IteratorScope.majc));
 +      ntc.setProperties(props);
 +
 +      // Create two tables and write some data
 +      client.tableOperations().create(table1, ntc);
 +      client.tableOperations().create(table2, ntc);
 +      writeRows((ClientContext) client, table1, MAX_DATA, true);
 +      writeRows((ClientContext) client, table2, MAX_DATA, true);
 +
 +      var ctx = getCluster().getServerContext();
 +      Optional<HostAndPort> coordinatorHost = 
ExternalCompactionUtil.findCompactionCoordinator(ctx);
 +      if (coordinatorHost.isEmpty()) {
 +        throw new TTransportException("Unable to get CompactionCoordinator 
address from ZooKeeper");
 +      }
 +
 +      // Start a compaction for table2, this is done so that the compactor 
will be busy
 +      // and new jobs will queue up and wait
 +      client.tableOperations().compact(table2, new 
CompactionConfig().setWait(false));
 +
 +      var tableId = 
TableId.of(client.tableOperations().tableIdMap().get(table1));
 +      var extent = new KeyExtent(tableId, null, null);
 +
 +      // If timeout is true then set a short timeout so the system job can 
cancel the user job
 +      // Otherwise the long timeout should prevent the system from clearing 
the selected files
 +      var expiration = timeout ? "100ms" : "100s";
 +      client.tableOperations().setProperty(table1,
 +          Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), 
expiration);
 +
 +      // Submit a user job for table1 that will be put on the queue and 
waiting
 +      // for the current job to finish
 +      client.tableOperations().compact(table1, new 
CompactionConfig().setWait(false));
 +      // Wait for the fate operation to write selectedFiles
 +      Wait.waitFor(() -> {
 +        var tabletMeta = ((ClientContext) 
client).getAmple().readTablet(extent);
 +        var selectedFiles = tabletMeta.getSelectedFiles();
 +        if (selectedFiles != null) {
 +          return !selectedFiles.getFiles().isEmpty();
 +        }
 +        return false;
 +      }, Wait.MAX_WAIT_MILLIS, 10);
 +
 +      // Change the ratio so a system compaction will attempt to be scheduled 
for table 1
 +      client.tableOperations().setProperty(table1, 
Property.TABLE_MAJC_RATIO.getKey(), "1");
 +
 +      if (timeout) {
 +        // Because of the custom planner, the system compaction should now 
take priority
 +        // System compactions were previously not eligible to run if 
selectedFiles existed
 +        // for a user compaction already (and they overlapped). But now 
system compaction jobs
 +        // are eligible to run if the user compaction has not started or 
completed any jobs
 +        // and the expiration period has been exceeded.
 +        // When this happens the system compaction will delete the 
selectedFiles column
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) 
client).getAmple().readTablet(extent);
 +          return tabletMeta.getSelectedFiles() == null;
 +        }, Wait.MAX_WAIT_MILLIS, 100);
 +
 +        // Wait for the system compaction to be running
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) 
client).getAmple().readTablet(extent);
 +          var externalCompactions = tabletMeta.getExternalCompactions();
 +          assertTrue(externalCompactions.values().stream()
 +              .allMatch(ec -> ec.getKind() == CompactionKind.SYSTEM));
 +          return externalCompactions.size() == 1;
 +        }, Wait.MAX_WAIT_MILLIS, 10);
 +
 +        // Wait for the user compaction to now run after the system finishes
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) 
client).getAmple().readTablet(extent);
 +          var externalCompactions = tabletMeta.getExternalCompactions();
 +          var running = externalCompactions.values().stream()
 +              .filter(ec -> ec.getKind() == CompactionKind.USER).count();
 +          return running == 1;
 +        }, Wait.MAX_WAIT_MILLIS, 100);
 +      } else {
 +        // Wait for the user compaction to run, there should no system 
compactions scheduled
 +        // even though system has the higher priority in the test because the 
timeout was
 +        // not exceeded
 +        Wait.waitFor(() -> {
 +          var tabletMeta = ((ClientContext) 
client).getAmple().readTablet(extent);
 +          var externalCompactions = tabletMeta.getExternalCompactions();
 +          assertTrue(externalCompactions.values().stream()
 +              .allMatch(ec -> ec.getKind() == CompactionKind.USER));
 +          return externalCompactions.size() == 1;
 +        }, Wait.MAX_WAIT_MILLIS, 10);
 +      }
 +
 +      // Wait and verify all compactions finish
 +      Wait.waitFor(() -> {
 +        var tabletMeta = ((ClientContext) 
client).getAmple().readTablet(extent);
 +        var externalCompactions = tabletMeta.getExternalCompactions().size();
 +        log.debug("Waiting for compactions to finish, count {}", 
externalCompactions);
 +        return externalCompactions == 0 && tabletMeta.getCompacted().isEmpty()
 +            && tabletMeta.getSelectedFiles() == null;
 +      }, Wait.MAX_WAIT_MILLIS, 100);
 +    }
 +
 +    
ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), 
table1);
 +  }
 +
 +  @Test
 +  public void testOfflineAndCompactions() throws Exception {
 +    var uniqueNames = getUniqueNames(1);
 +    String table = uniqueNames[0];
 +
 +    // This test exercises concurrent compactions and table offline.
 +
 +    try (final AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
 +
 +      SortedSet<Text> splits = new TreeSet<>();
 +      for (int i = 1; i < 32; i++) {
 +        splits.add(new Text(String.format("r:%04d", i)));
 +      }
 +
 +      client.tableOperations().create(table, new 
NewTableConfiguration().withSplits(splits));
 +      writeRows(client, table, 33, true);
 +      // create two files per tablet
 +      writeRows(client, table, 33, true);
 +
 +      var ctx = getCluster().getServerContext();
 +      var tableId = ctx.getTableId(table);
 +
 +      // verify assumptions of test, expect all tablets to have files
 +      var files0 = getFiles(ctx, tableId);
 +      assertEquals(32, files0.size());
 +      assertFalse(files0.values().stream().anyMatch(Set::isEmpty));
 +
 +      // lower the tables compaction ratio to cause system compactions
 +      client.tableOperations().setProperty(table, 
Property.TABLE_MAJC_RATIO.getKey(), "1");
 +
 +      // start a bunch of compactions in the background
 +      var executor = Executors.newCachedThreadPool();
 +      List<Future<?>> futures = new ArrayList<>();
 +      // start user compactions on a subset of the tables tablets, system 
compactions should attempt
 +      // to run on all tablets. With concurrency should get a mix.
 +      for (int i = 1; i < 20; i++) {
 +        var startRow = new Text(String.format("r:%04d", i - 1));
 +        var endRow = new Text(String.format("r:%04d", i));
 +        futures.add(executor.submit(() -> {
 +          CompactionConfig config = new CompactionConfig();
 +          config.setWait(true);
 +          config.setStartRow(startRow);
 +          config.setEndRow(endRow);
 +          client.tableOperations().compact(table, config);
 +          return null;
 +        }));
 +      }
 +
 +      log.debug("Waiting for offline");
 +      // take tablet offline while there are concurrent compactions
 +      client.tableOperations().offline(table, true);
 +
 +      // grab a snapshot of all the tablets files after waiting for offline, 
do not expect any
 +      // tablets files to change at this point
 +      var files1 = getFiles(ctx, tableId);
 +
 +      // wait for the background compactions
 +      log.debug("Waiting for futures");
 +      for (var future : futures) {
 +        try {
 +          future.get();
 +        } catch (ExecutionException ee) {
 +          // its ok if some of the compactions fail because the table was 
concurrently taken offline
 +          assertTrue(ee.getMessage().contains("is offline"));
 +        }
 +      }
 +
 +      // grab a second snapshot of the tablets files after all the background 
operations completed
 +      var files2 = getFiles(ctx, tableId);
 +
 +      // do not expect the files to have changed after the offline operation 
returned.
 +      assertEquals(files1, files2);
 +
 +      executor.shutdown();
 +    }
 +  }
 +
 +  private Map<KeyExtent,Set<StoredTabletFile>> getFiles(ServerContext ctx, 
TableId tableId) {
 +    Map<KeyExtent,Set<StoredTabletFile>> files = new HashMap<>();
 +    try (var tablets = 
ctx.getAmple().readTablets().forTable(tableId).build()) {
 +      for (var tablet : tablets) {
 +        files.put(tablet.getExtent(), tablet.getFiles());
 +      }
 +    }
 +    return files;
 +  }
 +
+   @Test
+   public void testGetActiveCompactions() throws Exception {
+     final String table1 = this.getUniqueNames(1)[0];
+     try (AccumuloClient client = 
Accumulo.newClient().from(getClientProps()).build()) {
+       client.tableOperations().create(table1);
+       try (BatchWriter bw = client.createBatchWriter(table1)) {
+         for (int i = 1; i <= MAX_DATA; i++) {
+           Mutation m = new Mutation(Integer.toString(i));
+           m.put("cf", "cq", new Value());
+           bw.addMutation(m);
+           bw.flush();
+           // flush often to create multiple files to compact
+           client.tableOperations().flush(table1, null, null, true);
+         }
+       }
+ 
+       final AtomicReference<Exception> error = new AtomicReference<>();
+       final CountDownLatch started = new CountDownLatch(1);
+       Thread t = new Thread(() -> {
+         try {
+           IteratorSetting setting = new IteratorSetting(50, "sleepy", 
SlowIterator.class);
+           setting.addOption("sleepTime", "3000");
+           setting.addOption("seekSleepTime", "3000");
+           client.tableOperations().attachIterator(table1, setting, 
EnumSet.of(IteratorScope.majc));
+           started.countDown();
+           client.tableOperations().compact(table1, new 
CompactionConfig().setWait(true));
+         } catch (AccumuloSecurityException | TableNotFoundException | 
AccumuloException e) {
+           error.set(e);
+         }
+       });
+       t.start();
+ 
+       started.await();
+ 
+       List<ActiveCompaction> compactions = new ArrayList<>();
+       do {
+         client.instanceOperations().getActiveCompactions().forEach((ac) -> {
+           try {
+             if (ac.getTable().equals(table1)) {
+               compactions.add(ac);
+             }
+           } catch (TableNotFoundException e1) {
+             fail("Table was deleted during test, should not happen");
+           }
+         });
+         Thread.sleep(1000);
+       } while (compactions.isEmpty());
+ 
+       ActiveCompaction running1 = compactions.get(0);
+       CompactionHost host = running1.getHost();
 -      assertTrue(host.getType() == CompactionHost.Type.TSERVER);
++      assertTrue(host.getType() == CompactionHost.Type.COMPACTOR);
+ 
+       compactions.clear();
+       do {
+         HostAndPort hp = HostAndPort.fromParts(host.getAddress(), 
host.getPort());
+         
client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) -> 
{
+           try {
+             if (ac.getTable().equals(table1)) {
+               compactions.add(ac);
+             }
+           } catch (TableNotFoundException e1) {
+             fail("Table was deleted during test, should not happen");
+           }
+         });
+         Thread.sleep(1000);
+       } while (compactions.isEmpty());
+ 
+       ActiveCompaction running2 = compactions.get(0);
+       assertEquals(running1.getInputFiles(), running2.getInputFiles());
+       assertEquals(running1.getOutputFile(), running2.getOutputFile());
+       assertEquals(running1.getTablet(), running2.getTablet());
+ 
+       client.tableOperations().cancelCompaction(table1);
+       t.join();
+     }
+   }
+ 
    /**
     * Counts the number of tablets and files in a table.
     */


Reply via email to