This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 996f94ec0b37ec597042ee688d8075c6f5d0d4b3 Merge: df8a3670a4 a49009c608 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Thu Aug 29 17:21:44 2024 +0000 Merge branch '3.1' .../core/client/admin/InstanceOperations.java | 19 +++-- .../accumulo/core/clientImpl/ClientContext.java | 9 +++ .../core/clientImpl/ClientTabletCache.java | 6 +- .../core/clientImpl/InstanceOperationsImpl.java | 46 ++++++++---- .../core/clientImpl/TabletServerBatchWriter.java | 6 +- .../core/clientImpl/ZookeeperLockChecker.java | 5 ++ .../shell/commands/ListCompactionsCommand.java | 3 +- .../accumulo/test/functional/CompactionIT.java | 84 ++++++++++++++++++++++ 8 files changed, 151 insertions(+), 27 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 97ecfee777,cc93f79d00..9a0bf932fc --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@@ -1070,27 -1102,12 +1071,35 @@@ public class ClientContext implements A return thriftTransportPool; } + public MeterRegistry getMeterRegistry() { + ensureOpen(); + return micrometer; + } + + public void setMeterRegistry(MeterRegistry micrometer) { + ensureOpen(); + this.micrometer = micrometer; + getCaches(); + } + + public synchronized Caches getCaches() { + ensureOpen(); + if (caches == null) { + caches = Caches.getInstance(); + if (micrometer != null + && getConfiguration().getBoolean(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABLED)) { + caches.registerMetrics(micrometer); + } + } + return caches; + } + + public synchronized ZookeeperLockChecker getTServerLockChecker() { + ensureOpen(); + if (this.zkLockChecker == null) { + this.zkLockChecker = new ZookeeperLockChecker(this); + } + return this.zkLockChecker; + } + } diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java index 4c4e7056a3,0000000000..61b96c5cdc mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientTabletCache.java @@@ -1,429 -1,0 +1,429 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.clientImpl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BiConsumer; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.InvalidTabletHostingRequestException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.AccumuloTable; +import org.apache.accumulo.core.metadata.MetadataCachedTabletObtainer; +import org.apache.accumulo.core.singletons.SingletonManager; +import org.apache.accumulo.core.singletons.SingletonService; +import org.apache.accumulo.core.util.Interner; +import org.apache.accumulo.core.util.Timer; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +/** + * Client side cache of information about Tablets. Currently, a tablet's prev end row is cached and + * locations are cached if they exist. + */ +public abstract class ClientTabletCache { + + /** + * Flipped to false on call to {@link #clearInstances}. Checked by client classes that locally cache + * Locators. + */ + private volatile boolean isValid = true; + + boolean isValid() { + return isValid; + } + + /** + * Used to indicate if a user of this interface needs a tablet with a location. This simple enum + * was created instead of using a boolean for code clarity. + */ + public enum LocationNeed { + REQUIRED, NOT_REQUIRED + } + + /** + * This method allows linear scans to host tablets ahead of time that they may read in the future. + * The goal of this method is to allow a scan to request hosting of tablets before the + * scan actually needs them. Below is an example of how this method could work with a scan when + * {@code minimumHostAhead=4} is passed and avoid the scan having to wait on tablet hosting. + * + * <ol> + * <li>4*2 tablets are initially hosted (the scan has to wait on this)</li> + * <li>The 1st,2nd,3rd, and 4th tablets are read by the scan</li> + * <li>The request to read the 5th tablet causes a request to host 4 more tablets (this would be + * the 9th,10th,11th, and 12th tablets)</li> + * <li>The 5th,6th,7th, and 8th tablets are read by the scan</li> + * <li>While the scan does the read above, the 9th,10th,11th, and 12th tablets are actually + * hosted. This happens concurrently with the scan above.</li> + * <li>When the scan goes to read the 9th tablet, hopefully it's already hosted. Also attempting to + * read the 9th tablet will cause a request to host the 13th,14th,15th, and 16th tablets.</li> + * </ol> + * + * In the situation above, the goal is that while we are reading 4 hosted tablets the 4 following + * tablets are in the process of being hosted. + * + * @param minimumHostAhead Attempts to keep between minimumHostAhead and 2*minimumHostAhead + * tablets following the found tablet hosted. + * @param hostAheadRange Only host following tablets that are within this range. + */ + public abstract CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed, int minimumHostAhead, Range hostAheadRange) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException;
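To make the host-ahead contract described in the Javadoc above concrete, a caller could drive findTablet in a loop along the lines of the following sketch. This is illustrative only and not part of the commit: ClientTabletCache is internal (clientImpl) API, the row values and the look-ahead of 4 are arbitrary, and the fragment assumes a ClientContext named context, a TableId named tableId, and KeyExtent's endRow() accessor for stepping to the next tablet.

    // Illustrative fragment only; checked exceptions are collapsed into "throws Exception".
    void scanWithHostAhead(ClientContext context, TableId tableId) throws Exception {
      Range hostAheadRange = new Range("row_000", "row_999"); // assumed scan range
      ClientTabletCache cache = ClientTabletCache.getInstance(context, tableId);
      Text lookupRow = new Text("row_000");
      boolean skipRow = false;
      while (true) {
        // REQUIRED: only return a tablet that currently has a location, and ask the cache to
        // keep roughly 4 tablets within hostAheadRange hosted ahead of this one.
        CachedTablet tablet =
            cache.findTablet(context, lookupRow, skipRow, LocationNeed.REQUIRED, 4, hostAheadRange);
        if (tablet == null) {
          break; // no overlapping tablet, or a location was required but none is available yet
        }
        // ... read the tablet's data using tablet.getTserverLocation() ...
        if (tablet.getExtent().endRow() == null) {
          break; // a null end row means this was the last tablet in the table
        }
        lookupRow = tablet.getExtent().endRow(); // look up the next tablet
        skipRow = true; // the end row itself belongs to the tablet just read
      }
    }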
+ + /** + * Finds the tablet that contains the given row. + * + * @param locationNeed When {@link LocationNeed#REQUIRED} is passed will only return a tablet if + * it has a location. When {@link LocationNeed#NOT_REQUIRED} is passed will return the tablet + * that overlaps the row with or without a location. + * + * @return overlapping tablet. If no overlapping tablet exists, returns null. If location is + * required and the tablet currently has no location, returns null. + */ + public CachedTablet findTablet(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, InvalidTabletHostingRequestException { + return findTablet(context, row, skipRow, locationNeed, 0, null); + } + + public CachedTablet findTabletWithRetry(ClientContext context, Text row, boolean skipRow, + LocationNeed locationNeed) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException, InvalidTabletHostingRequestException { + var tl = findTablet(context, row, skipRow, locationNeed); + while (tl == null && locationNeed == LocationNeed.REQUIRED) { + UtilWaitThread.sleep(100); + tl = findTablet(context, row, skipRow, locationNeed); + } + return tl; + } + + public abstract <T extends Mutation> void binMutations(ClientContext context, List<T> mutations, + Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException; + + /** + * <p> + * This method finds what tablets overlap a given set of ranges, passing each range and its + * associated tablet to the range consumer. If a range overlaps multiple tablets then it can be + * passed to the range consumer multiple times. + * </p> + * + * @param locationNeed When {@link LocationNeed#REQUIRED} is passed only tablets that have a + * location are provided to the rangeConsumer; any range that overlaps a tablet without a + * location will be returned as a failure. When {@link LocationNeed#NOT_REQUIRED} is + * passed, ranges that overlap tablets with and without a location are provided to the + * range consumer. + * @param ranges For each range will try to find overlapping contiguous tablets that optionally + * have a location. + * @param rangeConsumer If all of the tablets that a range overlaps are found, then the range and + * tablets will be passed to this consumer one at a time. A range will either be passed to + * this consumer one or more times OR returned as a failure, but never both. + * + * @return The failed ranges that did not have a location (if a location is required) or where + * contiguous tablets could not be found. + */ + public abstract List<Range> findTablets(ClientContext context, List<Range> ranges, + BiConsumer<CachedTablet,Range> rangeConsumer, LocationNeed locationNeed) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + InvalidTabletHostingRequestException; + + /** + * The behavior of this method is similar to + * {@link #findTablets(ClientContext, List, BiConsumer, LocationNeed)}, except it bins ranges to + * the passed in binnedRanges map instead of passing them to a consumer.
This method only bins to + * hosted tablets with a location. + */ + public List<Range> binRanges(ClientContext context, List<Range> ranges, + Map<String,Map<KeyExtent,List<Range>>> binnedRanges) throws AccumuloException, + AccumuloSecurityException, TableNotFoundException, InvalidTabletHostingRequestException { + return findTablets(context, ranges, ((cachedTablet, range) -> ClientTabletCacheImpl + .addRange(binnedRanges, cachedTablet, range)), LocationNeed.REQUIRED); + } + + public abstract void invalidateCache(KeyExtent failedExtent); + + public abstract void invalidateCache(Collection<KeyExtent> keySet); + + /** + * Invalidate entire cache + */ + public abstract void invalidateCache(); + + /** + * Invalidate all metadata entries that point to server + */ + public abstract void invalidateCache(ClientContext context, String server); + + private static class InstanceKey { + final InstanceId instanceId; + final TableId tableId; + + InstanceKey(InstanceId instanceId, TableId table) { + this.instanceId = instanceId; + this.tableId = table; + } + + @Override + public int hashCode() { + return instanceId.hashCode() + tableId.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof InstanceKey) { + return equals((InstanceKey) o); + } + return false; + } + + public boolean equals(InstanceKey lk) { + return instanceId.equals(lk.instanceId) && tableId.equals(lk.tableId); + } + + } + + private static final HashMap<InstanceKey,ClientTabletCache> instances = new HashMap<>(); + private static boolean enabled = true; + + public static synchronized void clearInstances() { + for (ClientTabletCache locator : instances.values()) { + locator.isValid = false; + } + instances.clear(); + } + + static synchronized boolean isEnabled() { + return enabled; + } + + static synchronized void disable() { + clearInstances(); + enabled = false; + } + + static synchronized void enable() { + enabled = true; + } + + public long getTabletHostingRequestCount() { + return 0L; + } + + public static synchronized ClientTabletCache getInstance(ClientContext context, TableId tableId) { + Preconditions.checkState(enabled, "The Accumulo singleton that tracks tablet locations is " + + "disabled.
This is likely caused by all AccumuloClients being closed or garbage collected"); + InstanceKey key = new InstanceKey(context.getInstanceID(), tableId); + ClientTabletCache tl = instances.get(key); + if (tl == null) { + MetadataCachedTabletObtainer mlo = new MetadataCachedTabletObtainer(); + + if (AccumuloTable.ROOT.tableId().equals(tableId)) { - tl = new RootClientTabletCache(new ZookeeperLockChecker(context)); ++ tl = new RootClientTabletCache(context.getTServerLockChecker()); + } else if (AccumuloTable.METADATA.tableId().equals(tableId)) { + tl = new ClientTabletCacheImpl(AccumuloTable.METADATA.tableId(), + getInstance(context, AccumuloTable.ROOT.tableId()), mlo, - new ZookeeperLockChecker(context)); ++ context.getTServerLockChecker()); + } else { + tl = new ClientTabletCacheImpl(tableId, + getInstance(context, AccumuloTable.METADATA.tableId()), mlo, - new ZookeeperLockChecker(context)); ++ context.getTServerLockChecker()); + } + instances.put(key, tl); + } + + return tl; + } + + static { + SingletonManager.register(new SingletonService() { + + @Override + public boolean isEnabled() { + return ClientTabletCache.isEnabled(); + } + + @Override + public void enable() { + ClientTabletCache.enable(); + } + + @Override + public void disable() { + ClientTabletCache.disable(); + } + }); + } + + public static class CachedTablets { + + private final List<CachedTablet> cachedTablets; + + public CachedTablets(List<CachedTablet> cachedTablets) { + this.cachedTablets = cachedTablets; + } + + public List<CachedTablet> getCachedTablets() { + return cachedTablets; + } + } + + public static class CachedTablet { + private static final Interner<String> interner = new Interner<>(); + + private final KeyExtent tablet_extent; + private final String tserverLocation; + private final String tserverSession; + private final TabletAvailability availability; + private final boolean hostingRequested; + + private final Timer creationTimer = Timer.startNew(); + + public CachedTablet(KeyExtent tablet_extent, String tablet_location, String session, + TabletAvailability availability, boolean hostingRequested) { + checkArgument(tablet_extent != null, "tablet_extent is null"); + checkArgument(tablet_location != null, "tablet_location is null"); + checkArgument(session != null, "session is null"); + this.tablet_extent = tablet_extent; + this.tserverLocation = interner.intern(tablet_location); + this.tserverSession = interner.intern(session); + this.availability = Objects.requireNonNull(availability); + this.hostingRequested = hostingRequested; + } + + public CachedTablet(KeyExtent tablet_extent, Optional<String> tablet_location, + Optional<String> session, TabletAvailability availability, boolean hostingRequested) { + checkArgument(tablet_extent != null, "tablet_extent is null"); + this.tablet_extent = tablet_extent; + this.tserverLocation = tablet_location.map(interner::intern).orElse(null); + this.tserverSession = session.map(interner::intern).orElse(null); + this.availability = Objects.requireNonNull(availability); + this.hostingRequested = hostingRequested; + } + + public CachedTablet(KeyExtent tablet_extent, TabletAvailability availability, + boolean hostingRequested) { + checkArgument(tablet_extent != null, "tablet_extent is null"); + this.tablet_extent = tablet_extent; + this.tserverLocation = null; + this.tserverSession = null; + this.availability = Objects.requireNonNull(availability); + this.hostingRequested = hostingRequested; + } + + @Override + public boolean equals(Object o) { + if (o instanceof CachedTablet) { 
+ CachedTablet otl = (CachedTablet) o; + return getExtent().equals(otl.getExtent()) + && getTserverLocation().equals(otl.getTserverLocation()) + && getTserverSession().equals(otl.getTserverSession()) + && getAvailability() == otl.getAvailability() + && hostingRequested == otl.hostingRequested; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(getExtent(), tserverLocation, tserverSession, availability, + hostingRequested); + } + + @Override + public String toString() { + return "(" + getExtent() + "," + getTserverLocation() + "," + getTserverSession() + "," + + getAvailability() + ")"; + } + + public KeyExtent getExtent() { + return tablet_extent; + } + + public Optional<String> getTserverLocation() { + return Optional.ofNullable(tserverLocation); + } + + public Optional<String> getTserverSession() { + return Optional.ofNullable(tserverSession); + } + + /** + * The ClientTabletCache will remove and replace a CachedTablet when the location is no longer + * valid. However, it will not do the same when the availability is no longer valid. The + * availability returned by this method may be out of date. If this information is needed to be + * fresh, then you may want to consider clearing the cache first. + */ + public TabletAvailability getAvailability() { + return this.availability; + } + + /** + * @return a timer that was started when this object was created + */ + public Timer getCreationTimer() { + return creationTimer; + } + + public boolean wasHostingRequested() { + return hostingRequested; + } + } + + public static class TabletServerMutations<T extends Mutation> { + private final Map<KeyExtent,List<T>> mutations; + private final String tserverSession; + + public TabletServerMutations(String tserverSession) { + this.tserverSession = tserverSession; + this.mutations = new HashMap<>(); + } + + public void addMutation(KeyExtent ke, T m) { + List<T> mutList = mutations.computeIfAbsent(ke, k -> new ArrayList<>()); + mutList.add(m); + } + + public Map<KeyExtent,List<T>> getMutations() { + return mutations; + } + + final String getSession() { + return tserverSession; + } + } +} diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index d9f77297c7,bdf04d0805..3f9d151a8c --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@@ -24,7 -23,9 +24,8 @@@ import static java.util.stream.Collecto import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; + import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; import java.nio.file.FileVisitResult; @@@ -41,17 -42,14 +42,19 @@@ import java.util.HashSet import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.atomic.AtomicReference; +import 
java.util.function.Predicate; +import java.util.stream.IntStream; + import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; @@@ -890,231 -959,83 +897,308 @@@ public class CompactionIT extends Compa } } + @Test + public void testCancelUserCompactionTimeoutExceeded() throws Exception { + testCancelUserCompactionTimeout(true); + } + + @Test + public void testCancelUserCompactionTimeoutNotExceeded() throws Exception { + testCancelUserCompactionTimeout(false); + } + + private void testCancelUserCompactionTimeout(boolean timeout) throws Exception { + + var uniqueNames = getUniqueNames(2); + String table1 = uniqueNames[0]; + String table2 = uniqueNames[1]; + + try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + // create a compaction service that uses a Planner that will schedule system jobs + // at a higher priority than user jobs + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner", + TestPlanner.class.getName()); + client.instanceOperations().setProperty( + Property.COMPACTION_SERVICE_PREFIX.getKey() + "testcancel.planner.opts.groups", + ("[{'group':'" + COMPACTOR_GROUP_1 + "'}]").replaceAll("'", "\"")); + + // create two tables that use the compaction service + Map<String,String> props = new HashMap<>(); + props.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(), + SimpleCompactionDispatcher.class.getName()); + props.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() + "service", "testcancel"); + // Disable system compactions to start for these tables + props.put(Property.TABLE_MAJC_RATIO.getKey(), "20"); + + // configure tablet compaction iterator that slows compaction down + var ntc = new NewTableConfiguration(); + IteratorSetting iterSetting = new IteratorSetting(50, SlowIterator.class); + SlowIterator.setSleepTime(iterSetting, 5); + ntc.attachIterator(iterSetting, EnumSet.of(IteratorScope.majc)); + ntc.setProperties(props); + + // Create two tables and write some data + client.tableOperations().create(table1, ntc); + client.tableOperations().create(table2, ntc); + writeRows((ClientContext) client, table1, MAX_DATA, true); + writeRows((ClientContext) client, table2, MAX_DATA, true); + + var ctx = getCluster().getServerContext(); + Optional<HostAndPort> coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(ctx); + if (coordinatorHost.isEmpty()) { + throw new TTransportException("Unable to get CompactionCoordinator address from ZooKeeper"); + } + + // Start a compaction for table2, this is done so that the compactor will be busy + // and new jobs will queue up and wait + client.tableOperations().compact(table2, new CompactionConfig().setWait(false)); + + var tableId = TableId.of(client.tableOperations().tableIdMap().get(table1)); + var extent = new KeyExtent(tableId, null, null); + + // If timeout is true then set a short timeout so the system job can cancel the user job + // Otherwise the long timeout should prevent the system from clearing the selected files + var expiration = timeout ?
"100ms" : "100s"; + client.tableOperations().setProperty(table1, + Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey(), expiration); + + // Submit a user job for table1 that will be put on the queue and waiting + // for the current job to finish + client.tableOperations().compact(table1, new CompactionConfig().setWait(false)); + // Wait for the fate operation to write selectedFiles + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var selectedFiles = tabletMeta.getSelectedFiles(); + if (selectedFiles != null) { + return !selectedFiles.getFiles().isEmpty(); + } + return false; + }, Wait.MAX_WAIT_MILLIS, 10); + + // Change the ratio so a system compaction will attempt to be scheduled for table 1 + client.tableOperations().setProperty(table1, Property.TABLE_MAJC_RATIO.getKey(), "1"); + + if (timeout) { + // Because of the custom planner, the system compaction should now take priority + // System compactions were previously not eligible to run if selectedFiles existed + // for a user compaction already (and they overlapped). But now system compaction jobs + // are eligible to run if the user compaction has not started or completed any jobs + // and the expiration period has been exceeded. + // When this happens the system compaction will delete the selectedFiles column + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + return tabletMeta.getSelectedFiles() == null; + }, Wait.MAX_WAIT_MILLIS, 100); + + // Wait for the system compaction to be running + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + assertTrue(externalCompactions.values().stream() + .allMatch(ec -> ec.getKind() == CompactionKind.SYSTEM)); + return externalCompactions.size() == 1; + }, Wait.MAX_WAIT_MILLIS, 10); + + // Wait for the user compaction to now run after the system finishes + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + var running = externalCompactions.values().stream() + .filter(ec -> ec.getKind() == CompactionKind.USER).count(); + return running == 1; + }, Wait.MAX_WAIT_MILLIS, 100); + } else { + // Wait for the user compaction to run, there should no system compactions scheduled + // even though system has the higher priority in the test because the timeout was + // not exceeded + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions(); + assertTrue(externalCompactions.values().stream() + .allMatch(ec -> ec.getKind() == CompactionKind.USER)); + return externalCompactions.size() == 1; + }, Wait.MAX_WAIT_MILLIS, 10); + } + + // Wait and verify all compactions finish + Wait.waitFor(() -> { + var tabletMeta = ((ClientContext) client).getAmple().readTablet(extent); + var externalCompactions = tabletMeta.getExternalCompactions().size(); + log.debug("Waiting for compactions to finish, count {}", externalCompactions); + return externalCompactions == 0 && tabletMeta.getCompacted().isEmpty() + && tabletMeta.getSelectedFiles() == null; + }, Wait.MAX_WAIT_MILLIS, 100); + } + + ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(), table1); + } + + @Test + public void testOfflineAndCompactions() throws Exception { + var uniqueNames = getUniqueNames(1); + String table = 
uniqueNames[0]; + + // This test exercises concurrent compactions and table offline. + + try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + SortedSet<Text> splits = new TreeSet<>(); + for (int i = 1; i < 32; i++) { + splits.add(new Text(String.format("r:%04d", i))); + } + + client.tableOperations().create(table, new NewTableConfiguration().withSplits(splits)); + writeRows(client, table, 33, true); + // create two files per tablet + writeRows(client, table, 33, true); + + var ctx = getCluster().getServerContext(); + var tableId = ctx.getTableId(table); + + // verify assumptions of test, expect all tablets to have files + var files0 = getFiles(ctx, tableId); + assertEquals(32, files0.size()); + assertFalse(files0.values().stream().anyMatch(Set::isEmpty)); + + // lower the table's compaction ratio to cause system compactions + client.tableOperations().setProperty(table, Property.TABLE_MAJC_RATIO.getKey(), "1"); + + // start a bunch of compactions in the background + var executor = Executors.newCachedThreadPool(); + List<Future<?>> futures = new ArrayList<>(); + // start user compactions on a subset of the table's tablets, system compactions should attempt + // to run on all tablets. With concurrency should get a mix. + for (int i = 1; i < 20; i++) { + var startRow = new Text(String.format("r:%04d", i - 1)); + var endRow = new Text(String.format("r:%04d", i)); + futures.add(executor.submit(() -> { + CompactionConfig config = new CompactionConfig(); + config.setWait(true); + config.setStartRow(startRow); + config.setEndRow(endRow); + client.tableOperations().compact(table, config); + return null; + })); + } + + log.debug("Waiting for offline"); + // take the table offline while there are concurrent compactions + client.tableOperations().offline(table, true); + + // grab a snapshot of all the tablet files after waiting for offline, do not expect any + // tablet files to change at this point + var files1 = getFiles(ctx, tableId); + + // wait for the background compactions + log.debug("Waiting for futures"); + for (var future : futures) { + try { + future.get(); + } catch (ExecutionException ee) { + // it's ok if some of the compactions fail because the table was concurrently taken offline + assertTrue(ee.getMessage().contains("is offline")); + } + } + + // grab a second snapshot of the tablet files after all the background operations completed + var files2 = getFiles(ctx, tableId); + + // do not expect the files to have changed after the offline operation returned.
+ assertEquals(files1, files2); + + executor.shutdown(); + } + } + + private Map<KeyExtent,Set<StoredTabletFile>> getFiles(ServerContext ctx, TableId tableId) { + Map<KeyExtent,Set<StoredTabletFile>> files = new HashMap<>(); + try (var tablets = ctx.getAmple().readTablets().forTable(tableId).build()) { + for (var tablet : tablets) { + files.put(tablet.getExtent(), tablet.getFiles()); + } + } + return files; + } + + @Test + public void testGetActiveCompactions() throws Exception { + final String table1 = this.getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table1); + try (BatchWriter bw = client.createBatchWriter(table1)) { + for (int i = 1; i <= MAX_DATA; i++) { + Mutation m = new Mutation(Integer.toString(i)); + m.put("cf", "cq", new Value()); + bw.addMutation(m); + bw.flush(); + // flush often to create multiple files to compact + client.tableOperations().flush(table1, null, null, true); + } + } + + final AtomicReference<Exception> error = new AtomicReference<>(); + final CountDownLatch started = new CountDownLatch(1); + Thread t = new Thread(() -> { + try { + IteratorSetting setting = new IteratorSetting(50, "sleepy", SlowIterator.class); + setting.addOption("sleepTime", "3000"); + setting.addOption("seekSleepTime", "3000"); + client.tableOperations().attachIterator(table1, setting, EnumSet.of(IteratorScope.majc)); + started.countDown(); + client.tableOperations().compact(table1, new CompactionConfig().setWait(true)); + } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException e) { + error.set(e); + } + }); + t.start(); + + started.await(); + + List<ActiveCompaction> compactions = new ArrayList<>(); + do { + client.instanceOperations().getActiveCompactions().forEach((ac) -> { + try { + if (ac.getTable().equals(table1)) { + compactions.add(ac); + } + } catch (TableNotFoundException e1) { + fail("Table was deleted during test, should not happen"); + } + }); + Thread.sleep(1000); + } while (compactions.isEmpty()); + + ActiveCompaction running1 = compactions.get(0); + CompactionHost host = running1.getHost(); - assertTrue(host.getType() == CompactionHost.Type.TSERVER); ++ assertTrue(host.getType() == CompactionHost.Type.COMPACTOR); + + compactions.clear(); + do { + HostAndPort hp = HostAndPort.fromParts(host.getAddress(), host.getPort()); + client.instanceOperations().getActiveCompactions(hp.toString()).forEach((ac) -> { + try { + if (ac.getTable().equals(table1)) { + compactions.add(ac); + } + } catch (TableNotFoundException e1) { + fail("Table was deleted during test, should not happen"); + } + }); + Thread.sleep(1000); + } while (compactions.isEmpty()); + + ActiveCompaction running2 = compactions.get(0); + assertEquals(running1.getInputFiles(), running2.getInputFiles()); + assertEquals(running1.getOutputFile(), running2.getOutputFile()); + assertEquals(running1.getTablet(), running2.getTablet()); + + client.tableOperations().cancelCompaction(table1); + t.join(); + } + } + /** * Counts the number of tablets and files in a table. */
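For reference, the pattern the new testGetActiveCompactions test exercises, an instance-wide listing followed by a per-server lookup through InstanceOperations.getActiveCompactions, looks roughly like the following client-side sketch. This is illustrative only and not part of the commit; the "client.properties" path is an assumption, and only calls shown in the diff above (or long-standing public API) are used.

    import java.util.List;
    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.admin.ActiveCompaction;
    import org.apache.accumulo.core.client.admin.ActiveCompaction.CompactionHost;

    public class ActiveCompactionLister {
      public static void main(String[] args) throws Exception {
        // "client.properties" is an assumed path to standard Accumulo client properties.
        try (AccumuloClient client = Accumulo.newClient().from("client.properties").build()) {
          // Instance-wide view: one call returns active compactions across the instance.
          List<ActiveCompaction> all = client.instanceOperations().getActiveCompactions();
          for (ActiveCompaction ac : all) {
            CompactionHost host = ac.getHost();
            System.out.printf("%s %s:%d table=%s inputs=%d%n", host.getType(), host.getAddress(),
                host.getPort(), ac.getTable(), ac.getInputFiles().size());
            // Per-server view: the same compaction can be looked up on the host reported above.
            String server = host.getAddress() + ":" + host.getPort();
            List<ActiveCompaction> onHost =
                client.instanceOperations().getActiveCompactions(server);
            System.out.printf("  %d active compaction(s) reported by %s%n", onHost.size(), server);
          }
        }
      }
    }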