This is an automated email from the ASF dual-hosted git repository.

ddanielr pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
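The recurring change across the hunks below migrates org.apache.accumulo.core.util.Retry's builder from (value, TimeUnit) arguments to java.time.Duration. As a minimal sketch of the new usage, using the same backoff settings most call sites below adopt (the condition supplier is a hypothetical stand-in for whatever is being polled):

    import java.time.Duration;
    import java.util.function.BooleanSupplier;

    import org.apache.accumulo.core.util.Retry;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class RetryDurationSketch {
      private static final Logger log = LoggerFactory.getLogger(RetryDurationSketch.class);

      // Polls a condition with the Duration-based settings seen in the hunks below:
      // start at 100ms, add 100ms with a 1.5x backoff factor, cap waits at 2s, and
      // log at most once every 3 minutes.
      static void pollUntil(BooleanSupplier condition) throws InterruptedException {
        Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
            .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
            .logInterval(Duration.ofMinutes(3)).createRetry();

        while (!condition.getAsBoolean()) {
          // sleeps for the current backoff interval, logging progress periodically
          retry.waitForNextAttempt(log, "waiting for condition");
        }
      }
    }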
commit 3f3da5a5da9cf500777d4c4b0d5a1c6556557dd8
Merge: 5bec92248c 9817474333
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Fri Mar 1 01:21:57 2024 +0000

    Merge branch 'main' into elasticity

 .../core/clientImpl/InstanceOperationsImpl.java    |  10 +-
 .../core/clientImpl/NamespaceOperationsImpl.java   |   9 +-
 .../core/clientImpl/TableOperationsImpl.java       |  14 +-
 .../TabletServerBatchReaderIterator.java           |   8 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |   9 +-
 .../accumulo/core/clientImpl/ThriftScanner.java    |   8 +-
 .../accumulo/core/clientImpl/bulk/BulkImport.java  |   9 +-
 .../accumulo/core/fate/zookeeper/ZooReader.java    |   8 +-
 .../spi/compaction/SimpleCompactionDispatcher.java |   5 +-
 .../java/org/apache/accumulo/core/util/Retry.java  | 112 +++++++-------
 .../org/apache/accumulo/core/util/RetryTest.java   | 163 +++++++++++----------
 .../hadoopImpl/mapred/AccumuloRecordReader.java    |  10 +-
 .../hadoopImpl/mapreduce/AccumuloRecordReader.java |  10 +-
 .../server/compaction/RetryableThriftCall.java     |   8 +-
 .../metadata/ConditionalTabletsMutatorImpl.java    |  11 +-
 .../java/org/apache/accumulo/manager/Manager.java  |   7 +-
 .../coordinator/CompactionCoordinator.java         |   8 +-
 .../coordinator/commit/CommitCompaction.java       |  10 +-
 .../tableOps/bulkVer2/CleanUpBulkImport.java       |  11 +-
 .../manager/tableOps/bulkVer2/TabletRefresher.java |  10 +-
 .../manager/tableOps/compact/CompactionDriver.java |  10 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  19 +--
 .../accumulo/tserver/session/SessionManager.java   |   9 +-
 23 files changed, 232 insertions(+), 246 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java index 6e558b9e3b,8435df2a40..bd708d2b19 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java @@@ -23,19 -23,10 +23,18 @@@ import static com.google.common.util.co import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.AVAILABILITY; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.DIR; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LAST; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SUSPEND; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.TIME; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import static org.apache.accumulo.core.util.Validators.EXISTING_TABLE_NAME; import static org.apache.accumulo.core.util.Validators.NEW_TABLE_NAME; @@@ -1876,74 -1882,26 +1876,74 @@@ public class TableOperationsImpl extend rangeList = new ArrayList<>(ranges); } - Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new
HashMap<>(); - + ClientTabletCache locator = ClientTabletCache.getInstance(context, tableId); locator.invalidateCache(); - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); - while (!locator.binRanges(context, rangeList, binnedRanges).isEmpty()) { - context.requireTableExists(tableId, tableName); - context.requireNotOffline(tableId, tableName); - binnedRanges.clear(); - try { - retry.waitForNextAttempt(log, - String.format("locating tablets in table %s(%s) for %d ranges", tableName, tableId, - rangeList.size())); - } catch (InterruptedException e) { - throw new IllegalStateException(e); + final ArrayList<KeyExtent> locationLess = new ArrayList<>(); + final Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>(); + final AtomicBoolean foundOnDemandTabletInRange = new AtomicBoolean(false); + + BiConsumer<CachedTablet,Range> rangeConsumer = (cachedTablet, range) -> { + // We want tablets that are currently hosted (location present) and + // their tablet availability is HOSTED (not OnDemand) + if (cachedTablet.getAvailability() != TabletAvailability.HOSTED) { + foundOnDemandTabletInRange.set(true); + } else if (cachedTablet.getTserverLocation().isPresent() + && cachedTablet.getAvailability() == TabletAvailability.HOSTED) { + ClientTabletCacheImpl.addRange(binnedRanges, cachedTablet, range); + } else { + locationLess.add(cachedTablet.getExtent()); + } + }; + + try { + + List<Range> failed = + locator.findTablets(context, rangeList, rangeConsumer, LocationNeed.NOT_REQUIRED); + + if (foundOnDemandTabletInRange.get()) { + throw new AccumuloException( + "TableOperations.locate() only works with tablets that have an availability of " + + TabletAvailability.HOSTED + + ". Tablets with other availabilities were seen. table:" + tableName + + " table id:" + tableId); + } + + while (!failed.isEmpty() || !locationLess.isEmpty()) { + + context.requireTableExists(tableId, tableName); + context.requireNotOffline(tableId, tableName); + + if (foundOnDemandTabletInRange.get()) { + throw new AccumuloException( + "TableOperations.locate() only works with tablets that have a tablet availability of " + + TabletAvailability.HOSTED + + ". Tablets with other availabilities were seen. 
table:" + tableName + + " table id:" + tableId); + } + + try { + retry.waitForNextAttempt(log, + String.format("locating tablets in table %s(%s) for %d ranges", tableName, tableId, + rangeList.size())); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + + locationLess.clear(); + binnedRanges.clear(); + foundOnDemandTabletInRange.set(false); + locator.invalidateCache(); + failed = locator.findTablets(context, rangeList, rangeConsumer, LocationNeed.NOT_REQUIRED); } - locator.invalidateCache(); + + } catch (InvalidTabletHostingRequestException e) { + throw new AccumuloException("findTablets requested tablet hosting when it should not have", + e); } return new LocationsImpl(binnedRanges); diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 324e78fe53,5e71087ce8..b95f256786 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@@ -257,14 -249,10 +255,14 @@@ public class TabletServerBatchReaderIte int lastFailureSize = Integer.MAX_VALUE; - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.07) - .logInterval(1, MINUTES).createFactory().createRetry(); + Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) + .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofMinutes(10)).backOffFactor(1.07) + .logInterval(Duration.ofMinutes(1)).createFactory().createRetry(); + ScanServerData ssd; + + long startTime = System.currentTimeMillis(); + while (true) { binnedRanges.clear(); diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index 46d5be5e89,87726fd639..428a006575 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@@ -306,61 -259,10 +304,61 @@@ public class ThriftScanner if (useScanServer) { scanAttempts = new ScanServerAttemptsImpl(); } + + this.tabletsScanned = 0; + } + + long startTimeNanos = 0; + long getNextScanAddressTimeNanos = 0; + + public void incrementTabletsScanned(KeyExtent extent) { + if (!extent.equals(prevExtent)) { + tabletsScanned++; + prevExtent = extent; + } + } + } + + static <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWaitTime, + String description, Duration timeoutLeft, ClientContext context, TableId tableId, + Logger log) { + - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + + long startTime = System.nanoTime(); + Optional<T> optional = condition.get(); + while (optional.isEmpty()) { + log.trace("For tableId {} scan server selector is waiting for '{}'", tableId, description); + + var elapsedTime = Duration.ofNanos(System.nanoTime() - startTime); + + if (elapsedTime.compareTo(timeoutLeft) > 0) { + throw new TimedOutException("While waiting for '" + description + + "' in order to select a scan server, the scan timed out. 
"); + } + + if (elapsedTime.compareTo(maxWaitTime) > 0) { + return Optional.empty(); + } + + context.requireNotDeleted(tableId); + + try { + retry.waitForNextAttempt(log, String.format( + "For tableId %s scan server selector is waiting for '%s'", tableId, description)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + optional = condition.get(); } + + return optional; } - public static class ScanTimedOutException extends IOException { + public static class ScanTimedOutException extends TimedOutException { private static final long serialVersionUID = 1L; diff --cc hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java index 53a65676e2,09bb4539a6..463d1355ec --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapred/AccumuloRecordReader.java @@@ -26,6 -23,6 +23,7 @@@ import static org.apache.accumulo.core. import java.io.IOException; import java.net.InetAddress; ++import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@@ -354,51 -346,18 +352,51 @@@ public abstract class AccumuloRecordRea // tablets... so clear it tl.invalidateCache(); - while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { - context.requireNotDeleted(tableId); - context.requireNotOffline(tableId, tableName); - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. Retrying."); - // sleep randomly between 100 and 200 ms - sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS); - tl.invalidateCache(); + if (InputConfigurator.getConsistencyLevel(callingClass, job) + == ConsistencyLevel.IMMEDIATE) { + while (!tl.binRanges(context, ranges, binnedRanges).isEmpty()) { + context.requireNotDeleted(tableId); + context.requireNotOffline(tableId, tableName); + binnedRanges.clear(); + log.warn("Unable to locate bins for specified ranges. 
Retrying."); + // sleep randomly between 100 and 200 ms + sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS); + tl.invalidateCache(); + } + } else { + Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>(); + unhostedRanges.put("", new HashMap<>()); + BiConsumer<CachedTablet,Range> consumer = (ct, r) -> { + unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>()) + .add(r); + }; + List<Range> failures = + tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED); + - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)) ++ .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry(); + + while (!failures.isEmpty()) { + + context.requireNotDeleted(tableId); + + try { + retry.waitForNextAttempt(log, + String.format("locating tablets in table %s(%s) for %d ranges", tableName, + tableId, ranges.size())); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + unhostedRanges.get("").clear(); + tl.invalidateCache(); + failures = tl.findTablets(context, ranges, consumer, LocationNeed.NOT_REQUIRED); + } + binnedRanges = unhostedRanges; } } - } catch (TableOfflineException | TableNotFoundException | AccumuloException - | AccumuloSecurityException e) { + } catch (InvalidTabletHostingRequestException | TableOfflineException + | TableNotFoundException | AccumuloException | AccumuloSecurityException e) { throw new IOException(e); } diff --cc hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java index b088d8530f,3b60ca798a..50c7fecc9c --- a/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java +++ b/hadoop-mapreduce/src/main/java/org/apache/accumulo/hadoopImpl/mapreduce/AccumuloRecordReader.java @@@ -26,6 -23,6 +23,7 @@@ import static org.apache.accumulo.core. import java.io.IOException; import java.net.InetAddress; ++import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@@ -387,52 -379,18 +385,52 @@@ public abstract class AccumuloRecordRea // tables tablets... so clear it tl.invalidateCache(); - while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) { - clientContext.requireNotDeleted(tableId); - clientContext.requireNotOffline(tableId, tableName); - binnedRanges.clear(); - log.warn("Unable to locate bins for specified ranges. Retrying."); - // sleep randomly between 100 and 200 ms - sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS); - tl.invalidateCache(); + if (InputConfigurator.getConsistencyLevel(callingClass, context.getConfiguration()) + == ConsistencyLevel.IMMEDIATE) { + while (!tl.binRanges(clientContext, ranges, binnedRanges).isEmpty()) { + clientContext.requireNotDeleted(tableId); + clientContext.requireNotOffline(tableId, tableName); + binnedRanges.clear(); + log.warn("Unable to locate bins for specified ranges. 
Retrying."); + // sleep randomly between 100 and 200 ms + sleepUninterruptibly(100 + RANDOM.get().nextInt(100), TimeUnit.MILLISECONDS); + tl.invalidateCache(); + } + } else { + Map<String,Map<KeyExtent,List<Range>>> unhostedRanges = new HashMap<>(); + unhostedRanges.put("", new HashMap<>()); + BiConsumer<CachedTablet,Range> consumer = (ct, r) -> { + unhostedRanges.get("").computeIfAbsent(ct.getExtent(), k -> new ArrayList<>()) + .add(r); + }; + List<Range> failures = + tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED); + - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)) ++ .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry(); + + while (!failures.isEmpty()) { + + clientContext.requireNotDeleted(tableId); + + try { + retry.waitForNextAttempt(log, + String.format("locating tablets in table %s(%s) for %d ranges", tableName, + tableId, ranges.size())); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + unhostedRanges.get("").clear(); + tl.invalidateCache(); + failures = + tl.findTablets(clientContext, ranges, consumer, LocationNeed.NOT_REQUIRED); + } + binnedRanges = unhostedRanges; } } - } catch (TableOfflineException | TableNotFoundException | AccumuloException - | AccumuloSecurityException e) { + } catch (InvalidTabletHostingRequestException | TableOfflineException + | TableNotFoundException | AccumuloException | AccumuloSecurityException e) { throw new IOException(e); } diff --cc server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java index 9e7e903a00,0000000000..4401654313 mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java @@@ -1,298 -1,0 +1,295 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.metadata; + - import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static java.util.concurrent.TimeUnit.MINUTES; - import static java.util.concurrent.TimeUnit.SECONDS; - ++import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.ConditionalMutation; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.server.ServerContext; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.base.Suppliers; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class ConditionalTabletsMutatorImpl implements Ample.ConditionalTabletsMutator { + + private static final Logger log = LoggerFactory.getLogger(ConditionalTabletsMutatorImpl.class); + + private final ServerContext context; + private Ample.DataLevel dataLevel = null; + + private List<ConditionalMutation> mutations = new ArrayList<>(); + + private Map<Text,KeyExtent> extents = new HashMap<>(); + + private boolean active = true; + + Map<KeyExtent,Ample.RejectionHandler> rejectedHandlers = new HashMap<>(); + + public ConditionalTabletsMutatorImpl(ServerContext context) { + this.context = context; + } + + @Override + public Ample.OperationRequirements mutateTablet(KeyExtent extent) { + Preconditions.checkState(active); + + var dataLevel = Ample.DataLevel.of(extent.tableId()); + + if (this.dataLevel == null) { + this.dataLevel = dataLevel; + } else if (!this.dataLevel.equals(dataLevel)) { + throw new IllegalArgumentException( + "Can not mix data levels " + this.dataLevel + " " + dataLevel); + } + + Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent) == null, + "Duplicate extents not handled %s", extent); + return new ConditionalTabletMutatorImpl(this, context, extent, mutations::add, + rejectedHandlers::put); + } + + protected ConditionalWriter createConditionalWriter(Ample.DataLevel dataLevel) + throws TableNotFoundException { + if (dataLevel == Ample.DataLevel.ROOT) { + return new RootConditionalWriter(context); + } else { + return context.createConditionalWriter(dataLevel.metaTable()); + } + } + + protected Map<KeyExtent,TabletMetadata> readTablets(List<KeyExtent> extents) { + Map<KeyExtent,TabletMetadata> failedTablets = new HashMap<>(); + + try (var tabletsMeta = context.getAmple().readTablets().forTablets(extents, Optional.empty()) + .saveKeyValues().build()) { + tabletsMeta + .forEach(tabletMetadata -> failedTablets.put(tabletMetadata.getExtent(), tabletMetadata)); + } + + return failedTablets; + } + + private Map<KeyExtent,TabletMetadata> + readFailedTablets(Map<KeyExtent,ConditionalWriter.Result> results) { + + var extents = results.entrySet().stream().filter(e -> { + try { + return e.getValue().getStatus() != ConditionalWriter.Status.ACCEPTED; + } catch 
(AccumuloException | AccumuloSecurityException ex) { + throw new RuntimeException(ex); + } + }).map(Map.Entry::getKey).collect(Collectors.toList()); + + if (extents.isEmpty()) { + return Map.of(); + } + + return readTablets(extents); + } + + private void partitionResults(Iterator<ConditionalWriter.Result> results, + List<ConditionalWriter.Result> resultsList, List<ConditionalWriter.Result> unknownResults) { + while (results.hasNext()) { + var result = results.next(); + + try { + if (result.getStatus() == ConditionalWriter.Status.UNKNOWN) { + unknownResults.add(result); + } else { + resultsList.add(result); + } + } catch (AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException(e); + } + } + } + + private Iterator<ConditionalWriter.Result> writeMutations(ConditionalWriter conditionalWriter) { + var results = conditionalWriter.write(mutations.iterator()); + + List<ConditionalWriter.Result> resultsList = new ArrayList<>(); + List<ConditionalWriter.Result> unknownResults = new ArrayList<>(); + partitionResults(results, resultsList, unknownResults); + + Retry retry = null; + + while (!unknownResults.isEmpty()) { + try { + if (retry == null) { - retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(2, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + } + retry.waitForNextAttempt(log, "handle conditional mutations with unknown status"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + results = conditionalWriter + .write(unknownResults.stream().map(ConditionalWriter.Result::getMutation).iterator()); + + // create a new array instead of clearing in case the above has not consumed everything + unknownResults = new ArrayList<>(); + + partitionResults(results, resultsList, unknownResults); + } + + return resultsList.iterator(); + } + + private Ample.ConditionalResult.Status mapStatus(KeyExtent extent, + ConditionalWriter.Result result) { + + ConditionalWriter.Status status = null; + try { + status = result.getStatus(); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IllegalStateException(e); + } + + switch (status) { + case REJECTED: + return Ample.ConditionalResult.Status.REJECTED; + case ACCEPTED: + return Ample.ConditionalResult.Status.ACCEPTED; + default: + throw new IllegalStateException( + "Unexpected conditional mutation status : " + extent + " " + status); + } + } + + @Override + public Map<KeyExtent,Ample.ConditionalResult> process() { + Preconditions.checkState(active); + if (dataLevel != null) { + try (ConditionalWriter conditionalWriter = createConditionalWriter(dataLevel)) { + var results = writeMutations(conditionalWriter); + + var resultsMap = new HashMap<KeyExtent,ConditionalWriter.Result>(); + + while (results.hasNext()) { + var result = results.next(); + var row = new Text(result.getMutation().getRow()); + resultsMap.put(extents.get(row), result); + } + + var extentsSet = Set.copyOf(extents.values()); + ensureAllExtentsSeen(resultsMap, extentsSet); + + // only fetch the metadata for failures when requested and when it is requested fetch all + // of the failed extents at once to avoid fetching them one by one. 
+ var failedMetadata = Suppliers.memoize(() -> readFailedTablets(resultsMap)); + + return Maps.transformEntries(resultsMap, (extent, result) -> new Ample.ConditionalResult() { + + private Ample.ConditionalResult.Status _getStatus() { + return mapStatus(extent, result); + } + + @Override + public Status getStatus() { + var status = _getStatus(); + if (status == Status.REJECTED && rejectedHandlers.containsKey(extent)) { + var tabletMetadata = readMetadata(); + var handler = rejectedHandlers.get(extent); + if (tabletMetadata == null && handler.callWhenTabletDoesNotExists() + && handler.test(null)) { + return Status.ACCEPTED; + } + if (tabletMetadata != null && handler.test(tabletMetadata)) { + return Status.ACCEPTED; + } + } + + return status; + } + + @Override + public KeyExtent getExtent() { + return extent; + } + + @Override + public TabletMetadata readMetadata() { + Preconditions.checkState(_getStatus() != Status.ACCEPTED, + "Can not read metadata for mutations with a status of " + Status.ACCEPTED); + return failedMetadata.get().get(getExtent()); + } + }); + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } finally { + // render inoperable because reuse is not tested + extents.clear(); + mutations.clear(); + active = false; + } + } else { + // render inoperable because reuse is not tested + extents.clear(); + mutations.clear(); + active = false; + return Map.of(); + } + } + + private void ensureAllExtentsSeen(HashMap<KeyExtent,ConditionalWriter.Result> resultsMap, + Set<KeyExtent> extentsSet) { + if (!resultsMap.keySet().equals(Set.copyOf(extents.values()))) { + // ELASTICITY_TODO this check can trigger if someone forgets to submit, could check for + // that + + Sets.difference(resultsMap.keySet(), extentsSet) + .forEach(extent -> log.error("Unexpected extent seen in in result {}", extent)); + + Sets.difference(extentsSet, resultsMap.keySet()) + .forEach(extent -> log.error("Expected extent not seen in result {}", extent)); + + resultsMap.forEach((keyExtent, result) -> { + log.error("result seen {} {}", keyExtent, new Text(result.getMutation().getRow())); + }); + + throw new AssertionError("Not all extents were seen, this is unexpected"); + } + } + + @Override + public void close() {} +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 2c8dae7671,0000000000..ebd5d8f764 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@@ -1,1016 -1,0 +1,1016 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.compaction.coordinator; + - import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; ++import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; +import 
org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactorGroupId; +import org.apache.accumulo.core.tabletserver.thrift.InputFile; +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.compaction.RunningCompaction; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.compaction.coordinator.commit.CommitCompaction; +import org.apache.accumulo.manager.compaction.coordinator.commit.CompactionCommitData; +import org.apache.accumulo.manager.compaction.coordinator.commit.RenameCompactionFile; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; +import org.apache.accumulo.server.compaction.CompactionPluginUtils; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + +public class CompactionCoordinator + implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer { + + private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class); + + /* + * Map of compactionId to RunningCompactions. This is an informational cache of what external + * compactions may be running. Its possible it may contain external compactions that are not + * actually running. It may not contain compactions that are actually running. The metadata table + * is the most authoritative source of what external compactions are currently running, but it + * does not have the stats that this map has. 
+ */ + protected final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE = + new ConcurrentHashMap<>(); + + /* Map of group name to last time compactor called to get a compaction job */ + // ELASTICITY_TODO need to clean out groups that are no longer configured.. + private final Map<CompactorGroupId,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); + + private final ServerContext ctx; + private final SecurityOperation security; + private final CompactionJobQueues jobQueues; + private final AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances; + // Exposed for tests + protected volatile Boolean shutdown = false; + + private final ScheduledThreadPoolExecutor schedExecutor; + + private final Cache<ExternalCompactionId,RunningCompaction> completed; + private final LoadingCache<FateId,CompactionConfig> compactionConfigCache; + private final Cache<Path,Integer> tabletDirCache; + private final DeadCompactionDetector deadCompactionDetector; + + private final QueueMetrics queueMetrics; + + public CompactionCoordinator(ServerContext ctx, SecurityOperation security, + AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances) { + this.ctx = ctx; + this.schedExecutor = this.ctx.getScheduledExecutor(); + this.security = security; + + this.jobQueues = new CompactionJobQueues( + ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); + + this.queueMetrics = new QueueMetrics(jobQueues); + + this.fateInstances = fateInstances; + + completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) + .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); + + CacheLoader<FateId,CompactionConfig> loader = + fateId -> CompactionConfigStorage.getConfig(ctx, fateId); + + // Keep a small short lived cache of compaction config. Compaction config never changes, however + // when a compaction is canceled it is deleted which is why there is a time limit. It does not + // hurt to let a job that was canceled start, it will be canceled later. Caching this immutable + // config will help avoid reading the same data over and over. 
+ compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true) + .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader); + + Weigher<Path,Integer> weigher = (path, count) -> { + return path.toUri().toString().length(); + }; + + tabletDirCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true) + .maximumWeight(10485760L).weigher(weigher).build(); + + deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor); + // At this point the manager does not have its lock so no actions should be taken yet + } + + private volatile Thread serviceThread = null; + + public void start() { + serviceThread = Threads.createThread("CompactionCoordinator Thread", this); + serviceThread.start(); + } + + public void shutdown() { + shutdown = true; + var localThread = serviceThread; + if (localThread != null) { + try { + localThread.join(); + } catch (InterruptedException e) { + LOG.error("Exception stopping compaction coordinator thread", e); + } + } + } + + protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + @Override + public void run() { + + startCompactionCleaner(schedExecutor); + startRunningCleaner(schedExecutor); + + // On a re-start of the coordinator it's possible that external compactions are in-progress. + // Attempt to get the running compactions on the compactors and then resolve which tserver + // the external compaction came from to re-populate the RUNNING collection. + LOG.info("Checking for running external compactions"); + // On re-start contact the running Compactors to try and seed the list of running compactions + List<RunningCompaction> running = getCompactionsRunningOnCompactors(); + if (running.isEmpty()) { + LOG.info("No running external compactions found"); + } else { + LOG.info("Found {} running external compactions", running.size()); + running.forEach(rc -> { + TCompactionStatusUpdate update = new TCompactionStatusUpdate(); + update.setState(TCompactionState.IN_PROGRESS); + update.setMessage("Coordinator restarted, compaction found in progress"); + rc.addUpdate(System.currentTimeMillis(), update); + RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc); + }); + } + + startDeadCompactionDetector(); + + // ELASTICITY_TODO the main function of the following loop was getting group summaries from + // tservers. It's no longer doing that. May be best to remove the loop and make the remaining + // task a scheduled one. 
+ + LOG.info("Starting loop to check for compactors not checking in"); + while (!shutdown) { + long start = System.currentTimeMillis(); + + long now = System.currentTimeMillis(); + TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { + if ((now - v) > getMissingCompactorWarningTime()) { + // ELASTICITY_TODO may want to consider if the group has any jobs queued OR if the group + // still exists in configuration + LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", k, + getMissingCompactorWarningTime()); + } + }); + + long checkInterval = getTServerCheckInterval(); + long duration = (System.currentTimeMillis() - start); + if (checkInterval - duration > 0) { + LOG.debug("Waiting {}ms for next group check", (checkInterval - duration)); + UtilWaitThread.sleep(checkInterval - duration); + } + } + + LOG.info("Shutting down"); + } + + protected void startDeadCompactionDetector() { + deadCompactionDetector.start(); + } + + protected long getMissingCompactorWarningTime() { + return this.ctx.getConfiguration().getTimeInMillis(Property.COMPACTOR_MAX_JOB_WAIT_TIME) * 3; + } + + protected long getTServerCheckInterval() { + return this.ctx.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL); + } + + public long getNumRunningCompactions() { + return RUNNING_CACHE.size(); + } + + /** + * Return the next compaction job from the queue to a Compactor + * + * @param groupName group + * @param compactorAddress compactor address + * @throws ThriftSecurityException when permission error + * @return compaction job + */ + @Override + public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String groupName, String compactorAddress, String externalCompactionId) + throws ThriftSecurityException { + + // do not expect users to call this directly, expect compactors to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + CompactorGroupId groupId = CompactorGroupId.of(groupName); + LOG.trace("getCompactionJob called for group {} by compactor {}", groupId, compactorAddress); + TIME_COMPACTOR_LAST_CHECKED.put(groupId, System.currentTimeMillis()); + + TExternalCompactionJob result = null; + + CompactionJobQueues.MetaJob metaJob = jobQueues.poll(groupId); + + while (metaJob != null) { + + Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob); + + // this method may reread the metadata, do not use the metadata in metaJob for anything after + // this method + CompactionMetadata ecm = null; + + var kind = metaJob.getJob().getKind(); + + // Only reserve user compactions when the config is present. When compactions are canceled the + // config is deleted. + if (kind == CompactionKind.SYSTEM + || (kind == CompactionKind.USER && compactionConfig.isPresent())) { + ecm = reserveCompaction(metaJob, compactorAddress, + ExternalCompactionId.from(externalCompactionId)); + } + + if (ecm != null) { + result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); + // It is possible that by the time this is added, the compactor that made this request + // is dead. In that case the compaction is not actually running. 
+ RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), + new RunningCompaction(result, compactorAddress, groupName)); + LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId, + compactorAddress, ecm.getJobFiles().size()); + break; + } else { + LOG.debug( + "Unable to reserve compaction job for {}, pulling another off the queue for group {}", + metaJob.getTabletMetadata().getExtent(), groupName); + metaJob = jobQueues.poll(CompactorGroupId.of(groupName)); + } + } + + if (metaJob == null) { + LOG.debug("No jobs found in group {} ", groupName); + } + + if (result == null) { + LOG.trace("No jobs found for group {}, returning empty job to compactor {}", groupName, + compactorAddress); + result = new TExternalCompactionJob(); + } + + return result; + + } + + // ELASTICITY_TODO unit test this code + private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job, + Set<StoredTabletFile> jobFiles) { + + if (tablet == null) { + // the tablet no longer exist + return false; + } + + if (tablet.getOperationId() != null) { + return false; + } + + if (!tablet.getFiles().containsAll(jobFiles)) { + return false; + } + + var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream() + .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet()); + + if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) { + return false; + } + + switch (job.getKind()) { + case SYSTEM: + var userRequestedCompactions = tablet.getUserCompactionsRequested().size(); + if (userRequestedCompactions > 0) { + LOG.debug( + "Unable to reserve {} for system compaction, tablet has {} pending requested user compactions", + tablet.getExtent(), userRequestedCompactions); + return false; + } else if (tablet.getSelectedFiles() != null + && !Collections.disjoint(jobFiles, tablet.getSelectedFiles().getFiles())) { + return false; + } + break; + case USER: + case SELECTOR: + if (tablet.getSelectedFiles() == null + || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) { + return false; + } + break; + default: + throw new UnsupportedOperationException("Not currently handling " + job.getKind()); + } + + return true; + } + + private void checkTabletDir(KeyExtent extent, Path path) { + try { + if (tabletDirCache.getIfPresent(path) == null) { + FileStatus[] files = null; + try { + files = ctx.getVolumeManager().listStatus(path); + } catch (FileNotFoundException ex) { + // ignored + } + + if (files == null) { + LOG.debug("Tablet {} had no dir, creating {}", extent, path); + + ctx.getVolumeManager().mkdirs(path); + } + tabletDirCache.put(path, 1); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, + Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress, + ExternalCompactionId externalCompactionId) { + boolean propDels; + + FateId fateId = null; + + switch (job.getKind()) { + case SYSTEM: { + boolean compactingAll = tablet.getFiles().equals(jobFiles); + propDels = !compactingAll; + } + break; + case SELECTOR: + case USER: { + boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll() + && tablet.getSelectedFiles().getFiles().equals(jobFiles); + propDels = !compactingAll; + fateId = tablet.getSelectedFiles().getFateId(); + } + break; + default: + throw new IllegalArgumentException(); + } + + Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir)); + 
ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, + tablet, directoryCreator, externalCompactionId); + + return new CompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(), + job.getPriority(), job.getGroup(), propDels, fateId); + + } + + protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob, + String compactorAddress, ExternalCompactionId externalCompactionId) { + + Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM + || metaJob.getJob().getKind() == CompactionKind.USER); + + var tabletMetadata = metaJob.getTabletMetadata(); + + var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile) + .collect(Collectors.toSet()); + - Retry retry = - Retry.builder().maxRetries(5).retryAfter(100, MILLISECONDS).incrementBy(100, MILLISECONDS) - .maxWait(10, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + + while (retry.canRetry()) { + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var extent = metaJob.getTabletMetadata().getExtent(); + + if (!canReserveCompaction(tabletMetadata, metaJob.getJob(), jobFiles)) { + return null; + } + + var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata, + compactorAddress, externalCompactionId); + + // any data that is read from the tablet to make a decision about if it can compact or not + // must be included in the requireSame call + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireSame(tabletMetadata, FILES, SELECTED, ECOMP); + + tabletMutator.putExternalCompaction(externalCompactionId, ecm); + tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId)); + + var result = tabletsMutator.process().get(extent); + + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + return ecm; + } else { + tabletMetadata = result.readMetadata(); + } + } + + retry.useRetry(); + try { + retry.waitForNextAttempt(LOG, + "Reserved compaction for " + metaJob.getTabletMetadata().getExtent()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + return null; + } + + protected TExternalCompactionJob createThriftJob(String externalCompactionId, + CompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob, + Optional<CompactionConfig> compactionConfig) { + + Set<CompactableFile> selectedFiles; + if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { + selectedFiles = Set.of(); + } else { + selectedFiles = metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream() + .map(file -> new CompactableFileImpl(file, + metaJob.getTabletMetadata().getFilesMap().get(file))) + .collect(Collectors.toUnmodifiableSet()); + } + + Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx, + metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), selectedFiles); + + IteratorConfig iteratorSettings = SystemIteratorUtil + .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of())); + + var files = ecm.getJobFiles().stream().map(storedTabletFile -> { + var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile); + return new 
InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(), + dfv.getTime()); + }).collect(Collectors.toList()); + + FateInstanceType type = FateInstanceType.fromTableId(metaJob.getTabletMetadata().getTableId()); + FateId fateId = FateId.from(type, 0); + if (metaJob.getJob().getKind() == CompactionKind.USER) { + fateId = metaJob.getTabletMetadata().getSelectedFiles().getFateId(); + } + + return new TExternalCompactionJob(externalCompactionId, + metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, + ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), + TCompactionKind.valueOf(ecm.getKind().name()), fateId.toThrift(), overrides); + } + + @Override + public void registerMetrics(MeterRegistry registry) { + Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount) + .description("Number of queued major compactions").register(registry); + Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions) + .description("Number of running major compactions").register(registry); + + queueMetrics.registerMetrics(registry); + } + + public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { + jobQueues.add(tabletMetadata, jobs); + } + + public CompactionCoordinatorService.Iface getThriftService() { + return this; + } + + private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { + if (metaJob.getJob().getKind() == CompactionKind.USER + && metaJob.getTabletMetadata().getSelectedFiles() != null) { + var cconf = + compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateId()); + return Optional.ofNullable(cconf); + } + return Optional.empty(); + } + + /** + * Compactors call this method when they have finished a compaction. This method does the + * following. + * + * <ol> + * <li>Reads the tablets metadata and determines if the compaction can commit. It's possible that + * things changed while the compaction was running and it can no longer commit.</li> + * <li>Commit the compaction using a conditional mutation. If the tablets files or location + * changed since reading the tablets metadata, then the conditional mutation will fail. When this + * happens it will reread the metadata and go back to step 1 conceptually. When committing a + * compaction the compacted files are removed and scan entries are added to the tablet in case the + * files are in use, this prevents GC from deleting the files between updating tablet metadata and + * refreshing the tablet. The scan entries are only added when a tablet has a location.</li> + * <li>After successful commit a refresh request is sent to the tablet if it has a location. This + * will cause the tablet to start using the newly compacted files for future scans. Also the + * tablet can delete the scan entries if there are no active scans using them.</li> + * </ol> + * + * <p> + * User compactions will be refreshed as part of the fate operation. The user compaction fate + * operation will see the compaction was committed after this code updates the tablet metadata, + * however if it were to rely on this code to do the refresh it would not be able to know when the + * refresh was actually done. Therefore, user compactions will refresh as part of the fate + * operation so that it's known to be done before the fate operation returns. Since the fate + * operation will do it, there is no need to do it here for user compactions. 
+ * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param textent tablet extent + * @param stats compaction stats + * @throws ThriftSecurityException when permission error + */ + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent textent, TCompactionStats stats) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + // maybe fate has not started yet + var localFates = fateInstances.get(); + while (localFates == null) { + UtilWaitThread.sleep(100); + if (shutdown) { + return; + } + localFates = fateInstances.get(); + } + + var extent = KeyExtent.fromThrift(textent); + var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId())); + + LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, + extent); + final var ecid = ExternalCompactionId.of(externalCompactionId); + + var tabletMeta = + ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); + + if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) { + return; + } + + CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); + var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid, extent, ecm, stats)); + + // ELASTICITY_TODO add tag to fate that ECID can be added to. This solves two problems. First it + // defends against starting multiple fate txs for the same thing. This will help the split code + // also. Second the tag can be used by the dead compaction detector to ignore committing + // compactions. The impl could hash the key to produce the fate tx id. + var txid = localFate.startTransaction(); + localFate.seedTransaction("COMMIT_COMPACTION", txid, renameOp, true, + "Commit compaction " + ecid); + + // ELASTICITY_TODO need to remove this wait. It is here because when the dead compaction + // detector asks a compactor what it's currently running it expects that to cover commit. To + // remove this wait would need another way for the dead compaction detector to know about + // committing compactions. Could add a tag to the fate tx with the ecid and have the dead + // compaction detector scan these tags. This wait makes the code running in fate not be fault + // tolerant because in the case of faults the dead compaction detector may remove the + // compaction entry. + localFate.waitForCompletion(txid); + + // It's possible that RUNNING might not have an entry for this ecid in the case + // of a coordinator restart when the Coordinator can't find the TServer for the + // corresponding external compaction. + recordCompletion(ecid); + // ELASTICITY_TODO should the above call move into fate code? 
+ } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, + TKeyExtent extent) throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent); + LOG.info("Compaction failed, id: {}, extent: {}", externalCompactionId, fromThriftExtent); + final var ecid = ExternalCompactionId.of(externalCompactionId); + compactionFailed(Map.of(ecid, fromThriftExtent)); + } + + void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) { + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + compactions.forEach((ecid, extent) -> { + try { + ctx.requireNotDeleted(extent.tableId()); + tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid) + .deleteExternalCompaction(ecid).submit(new RejectionHandler() { + + @Override + public boolean callWhenTabletDoesNotExists() { + return true; + } + + @Override + public boolean test(TabletMetadata tabletMetadata) { + return tabletMetadata == null + || !tabletMetadata.getExternalCompactions().containsKey(ecid); + } + + }); + } catch (TableDeletedException e) { + LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.", + extent.tableId()); + } + }); + + final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>(); + tabletsMutator.process().forEach((extent, result) -> { + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + + // this should be retried later when the dead compaction detector runs; let's log it in + // case it's a persistent problem + if (LOG.isDebugEnabled()) { + var ecid = + compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent)) + .findFirst().map(Map.Entry::getKey).orElse(null); + LOG.debug("Unable to remove failed compaction {} {}", extent, ecid); + } + } else { + // compactionFailed is called from the Compactor when a compaction fails or + // is cancelled, and it's also called from the DeadCompactionDetector. This block is + // entered when the conditional mutator above successfully deletes an ecid from + // the tablet metadata. Remove compaction tmp files from the tablet directory + // that have a corresponding ecid in the name.
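+ // (e.g. a tmp file named like C00001a2.rf_tmp_ECID-<uuid>, a purely illustrative name, + // under the tablet directory would match the fileSuffix filter below and be deleted)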
+ + ecidsForTablet.clear(); + compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0) + .map(Entry::getKey).forEach(ecidsForTablet::add); + + if (!ecidsForTablet.isEmpty()) { + final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); + if (tm != null) { + final Collection<Volume> vols = ctx.getVolumeManager().getVolumes(); + for (Volume vol : vols) { + try { + final String volPath = + vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName(); + final FileSystem fs = vol.getFileSystem(); + for (ExternalCompactionId ecid : ecidsForTablet) { + final String fileSuffix = "_tmp_" + ecid.canonical(); + FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> { + return path.getName().endsWith(fileSuffix); + }); + if (files.length > 0) { + for (FileStatus file : files) { + if (!fs.delete(file.getPath(), false)) { + LOG.warn("Unable to delete ecid tmp file: {}", file.getPath()); + } else { + LOG.debug("Deleted ecid tmp file: {}", file.getPath()); + } + } + } + } + } catch (IOException e) { + LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e); + } + } + } else { + // TabletMetadata does not exist for the extent. This could be due to a merge or + // split operation. Use the utility to find tmp files at the table level. + deadCompactionDetector.addTableId(extent.tableId()); + } + } + } + }); + } + + compactions.forEach((k, v) -> recordCompletion(k)); + } + + /** + * Called by Compactors to update the status of their assigned compaction + * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param update compaction status update + * @param timestamp timestamp of the message + * @throws ThriftSecurityException when permission error + */ + @Override + public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate update, long timestamp) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId, + timestamp, update); + final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); + if (null != rc) { + rc.addUpdate(timestamp, update); + } + } + + private void recordCompletion(ExternalCompactionId ecid) { + var rc = RUNNING_CACHE.remove(ecid); + if (rc != null) { + completed.put(ecid, rc); + } + } + + protected Set<ExternalCompactionId> readExternalCompactionIds() { + try (TabletsMetadata tabletsMetadata = + this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER) + .filter(new HasExternalCompactionsFilter()).fetch(ECOMP).build()) { + return tabletsMetadata.stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) + .collect(Collectors.toSet()); + } + } + + /** + * The RUNNING_CACHE set may contain external compactions that are not actually running. This + * method periodically cleans those up. + */ + public void cleanUpRunning() { + + // grab a snapshot of the ids in the set before reading the metadata table. This is done to + // avoid removing things that are added while reading the metadata.
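+ // (if the metadata were read first, a compaction added to RUNNING_CACHE after that read but + // before the snapshot could look stale below and be removed while still running)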
Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet()); + + // grab the ids that are listed as running in the metadata table. It is important that this is + // done after getting the snapshot. + Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds(); + + var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata); + + // remove ids that are in the running set but not in the metadata table + idsToRemove.forEach(this::recordCompletion); + + if (!idsToRemove.isEmpty()) { + LOG.debug("Removed stale entries from RUNNING_CACHE: {}", idsToRemove); + } + } + + /** + * Return information about running compactions + * + * @param tinfo trace info + * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects + * @throws ThriftSecurityException permission error + */ + @Override + public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + final TExternalCompactionList result = new TExternalCompactionList(); + RUNNING_CACHE.forEach((ecid, rc) -> { + TExternalCompaction trc = new TExternalCompaction(); + trc.setGroupName(rc.getGroupName()); + trc.setCompactor(rc.getCompactorAddress()); + trc.setUpdates(rc.getUpdates()); + trc.setJob(rc.getJob()); + result.putToCompactions(ecid.canonical(), trc); + }); + return result; + } + + /** + * Return information about recently completed compactions + * + * @param tinfo trace info + * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects + * @throws ThriftSecurityException permission error + */ + @Override + public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + final TExternalCompactionList result = new TExternalCompactionList(); + completed.asMap().forEach((ecid, rc) -> { + TExternalCompaction trc = new TExternalCompaction(); + trc.setGroupName(rc.getGroupName()); + trc.setCompactor(rc.getCompactorAddress()); + trc.setJob(rc.getJob()); + trc.setUpdates(rc.getUpdates()); + result.putToCompactions(ecid.canonical(), trc); + }); + return result; + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); + var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent()); + try { + NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId()); + if (!security.canCompact(credentials, extent.tableId(), nsId)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + } catch (TableNotFoundException e) { + throw new ThriftTableOperationException(extent.tableId().canonical(), null, + TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage()); + } + +
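// permission has been checked above, so ask the compactor that is running the job to cancel it +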
cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId); + } + + /* Method exists to be called from test */ + public CompactionJobQueues getJobQueues() { + return jobQueues; + } + + /* Method exists to be overridden in test to hide static method */ + protected List<RunningCompaction> getCompactionsRunningOnCompactors() { + return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx); + } + + /* Method exists to be overridden in test to hide static method */ + protected void cancelCompactionOnCompactor(String address, String externalCompactionId) { + HostAndPort hostPort = HostAndPort.fromString(address); + ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId); + } + + private void deleteEmpty(ZooReaderWriter zoorw, String path) + throws KeeperException, InterruptedException { + try { + LOG.debug("Deleting empty ZK node {}", path); + zoorw.delete(path); + } catch (KeeperException.NotEmptyException e) { + LOG.debug("Failed to delete {}, it's not empty, likely an expected race condition.", path); + } + } + + private void cleanUpCompactors() { + final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS; + + var zoorw = this.ctx.getZooReaderWriter(); + + try { + var groups = zoorw.getChildren(compactorQueuesPath); + + for (String group : groups) { + String qpath = compactorQueuesPath + "/" + group; + + var compactors = zoorw.getChildren(qpath); + + if (compactors.isEmpty()) { + deleteEmpty(zoorw, qpath); + } + + for (String compactor : compactors) { + String cpath = compactorQueuesPath + "/" + group + "/" + compactor; + var lockNodes = zoorw.getChildren(cpath); + if (lockNodes.isEmpty()) { + deleteEmpty(zoorw, cpath); + } + } + } + + } catch (KeeperException | RuntimeException e) { + LOG.warn("Failed to clean up compactors", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index 477a040330,0000000000..2ff3a30386 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@@ -1,280 -1,0 +1,278 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.accumulo.manager.compaction.coordinator.commit; + - import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static java.util.concurrent.TimeUnit.MINUTES; - import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; + ++import java.time.Duration; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.metadata.AbstractTabletFile; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.CompactionMetadata; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.manager.tableOps.ManagerRepo; +import org.apache.accumulo.server.ServerContext; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class CommitCompaction extends ManagerRepo { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(CommitCompaction.class); + private final CompactionCommitData commitData; + private final String newDatafile; + + public CommitCompaction(CompactionCommitData commitData, String newDatafile) { + this.commitData = commitData; + this.newDatafile = newDatafile; + } + + @Override + public Repo<Manager> call(FateId fateId, Manager manager) throws Exception { + var ecid = ExternalCompactionId.of(commitData.ecid); + var newFile = Optional.ofNullable(newDatafile).map(f -> ReferencedTabletFile.of(new Path(f))); + + // ELASTICITY_TODO is it possible to test this code running a 2nd time, simulating a failure + // and rerunning it? Maybe fate could have a testing mode where it calls operations multiple times? + + // It is possible that when this runs the compaction was previously committed and then the + // process died and now it's running again. In this case commit should do nothing, but it's + // important to still carry on with the rest of the steps after commit. This code ignores the + // fact that a commit may not have happened in the current call and continues for this reason.
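+ // (on a rerun, canCommitCompaction inside commitCompaction sees the compaction entry is + // already gone and the commit loop is skipped without changing any metadata)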
+ TabletMetadata tabletMetadata = commitCompaction(manager.getContext(), ecid, newFile); + + String loc = null; + if (tabletMetadata != null && tabletMetadata.getLocation() != null) { + loc = tabletMetadata.getLocation().getHostPortSession(); + } + + // This will cause the tablet to be reexamined to see if it needs any more compactions. + var extent = KeyExtent.fromThrift(commitData.textent); + manager.getEventCoordinator().event(extent, "Compaction completed %s", extent); + + return new PutGcCandidates(commitData, loc); + } + + KeyExtent getExtent() { + return KeyExtent.fromThrift(commitData.textent); + } + + private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid, + Optional<ReferencedTabletFile> newDatafile) { + + var tablet = + ctx.getAmple().readTablet(getExtent(), ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); + - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + + while (canCommitCompaction(ecid, tablet)) { + CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); + + // the compacted files should not exist in the tablet already + var tablet2 = tablet; + newDatafile.ifPresent( + newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()), + "File already exists in tablet %s %s", newFile, tablet2.getFiles())); + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation() + .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION); + + if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { + tabletMutator.requireSame(tablet, SELECTED, COMPACTED); + } + + // make the needed updates to the tablet + updateTabletForCompaction(commitData.stats, ecid, tablet, newDatafile, ecm, tabletMutator); + + tabletMutator + .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + + // TODO expensive logging + LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile, + ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList())); + + var result = tabletsMutator.process().get(getExtent()); + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + break; + } else { + // compaction failed to commit, maybe something changed on the tablet so let's reread the + // metadata and try again + tablet = result.readMetadata(); + } + + retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for tablet " + getExtent()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + return tablet; + } + + private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, + TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, CompactionMetadata ecm, + Ample.ConditionalTabletMutator tabletMutator) { + // ELASTICITY_TODO improve logging; adapt to use existing tablet files logging + if (ecm.getKind() == CompactionKind.USER) { + if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { + // all files selected for the user compaction are finished, so the tablet is
finished and + its compaction id needs to be updated. + + FateId fateId = tablet.getSelectedFiles().getFateId(); + + Preconditions.checkArgument(!tablet.getCompacted().contains(fateId), + "Tablet %s unexpectedly has selected files and compacted columns for %s", + tablet.getExtent(), fateId); + + // TODO set to trace + LOG.debug("All selected files compacted for {} setting compacted for {}", + tablet.getExtent(), tablet.getSelectedFiles().getFateId()); + + tabletMutator.deleteSelectedFiles(); + tabletMutator.putCompacted(fateId); + + } else { + // not all of the selected files were finished, so we need to add the new file to the + // selected set + + Set<StoredTabletFile> newSelectedFileSet = + new HashSet<>(tablet.getSelectedFiles().getFiles()); + newSelectedFileSet.removeAll(ecm.getJobFiles()); + + if (newDatafile.isPresent()) { + // TODO set to trace + LOG.debug( + "Not all selected files for {} are done, adding new selected file {} from compaction", + tablet.getExtent(), newDatafile.orElseThrow().getPath().getName()); + newSelectedFileSet.add(newDatafile.orElseThrow().insert()); + } else { + // TODO set to trace + LOG.debug( + "Not all selected files for {} are done, compaction produced no output so not adding to selected set.", + tablet.getExtent()); + } + + tabletMutator.putSelectedFiles( + new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(), + tablet.getSelectedFiles().getFateId())); + } + } + + if (tablet.getLocation() != null) { + // add scan entries to prevent GC in case the hosted tablet is currently using the files for + // scan + ecm.getJobFiles().forEach(tabletMutator::putScan); + } + ecm.getJobFiles().forEach(tabletMutator::deleteFile); + tabletMutator.deleteExternalCompaction(ecid); + + if (newDatafile.isPresent()) { + tabletMutator.putFile(newDatafile.orElseThrow(), + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); + } + } + + // ELASTICITY_TODO unit test this method + public static boolean canCommitCompaction(ExternalCompactionId ecid, + TabletMetadata tabletMetadata) { + + if (tabletMetadata == null) { + LOG.debug("Received completion notification for nonexistent tablet {}", ecid); + return false; + } + + var extent = tabletMetadata.getExtent(); + + if (tabletMetadata.getOperationId() != null) { + // split, merge, and delete tablet should delete the compaction entry in the tablet + LOG.debug("Received completion notification for tablet with active operation {} {} {}", ecid, + extent, tabletMetadata.getOperationId()); + return false; + } + + CompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid); + + if (ecm == null) { + LOG.debug("Received completion notification for unknown compaction {} {}", ecid, extent); + return false; + } + + if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { + if (tabletMetadata.getSelectedFiles() == null) { + // when the compaction is canceled, selected files are deleted + LOG.debug( + "Received completion notification for user compaction and tablet has no selected files {} {}", + ecid, extent); + return false; + } + + if (!ecm.getFateId().equals(tabletMetadata.getSelectedFiles().getFateId())) { + // maybe the compaction was canceled and another user compaction was started on the tablet.
+ LOG.debug( + "Received completion notification for user compaction where its fate txid did not match the tablet's {} {} {} {}", + ecid, extent, ecm.getFateId(), tabletMetadata.getSelectedFiles().getFateId()); + return false; + } + + if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) { + // this is not expected to happen + LOG.error("User compaction contained files not in the selected set {} {} {} {} {}", + tabletMetadata.getExtent(), ecid, ecm.getKind(), + Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles), + ecm.getJobFiles()); + return false; + } + } + + if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) { + // this is not expected to happen + LOG.error("Compaction contained files not in the tablet files set {} {} {} {}", + tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), ecm.getJobFiles()); + return false; + } + + return true; + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java index c1849d25dd,562423750b..0210584cb5 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java @@@ -18,27 -18,16 +18,24 @@@ */ package org.apache.accumulo.manager.tableOps.bulkVer2; - import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static java.util.concurrent.TimeUnit.MINUTES; - import static java.util.concurrent.TimeUnit.SECONDS; - import java.io.IOException; ++import java.time.Duration; import java.util.Collections; +import java.util.Map; +import java.util.Optional; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.data.AbstractId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.gc.ReferenceFile; -import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.manager.thrift.BulkImportState; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.Utils; @@@ -93,56 -89,4 +90,56 @@@ public class CleanUpBulkImport extends manager.removeBulkImportStatus(info.sourceDir); return null; } + + private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId fateId, + Text firstSplit, Text lastSplit) { + - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + + while (true) { + try ( + var tablets = ample.readTablets().forTable(tableId).overlapping(firstSplit, lastSplit) + .checkConsistency().fetch(ColumnType.PREV_ROW, ColumnType.LOADED).build(); + var tabletsMutator = ample.conditionallyMutateTablets()) { + + for
(var tablet : tablets) { + if (tablet.getLoaded().values().stream() + .anyMatch(loadedFateId -> loadedFateId.equals(fateId))) { + var tabletMutator = + tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation(); + tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue().equals(fateId)) + .map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile); + tabletMutator.submit(tm -> false); + } + } + + var results = tabletsMutator.process(); + + if (results.values().stream() + .anyMatch(condResult -> condResult.getStatus() != Status.ACCEPTED)) { + + results.forEach((extent, condResult) -> { + if (condResult.getStatus() != Status.ACCEPTED) { + var metadata = Optional.ofNullable(condResult.readMetadata()); + log.debug("Tablet update failed {} {} {} {} ", fateId, extent, condResult.getStatus(), + metadata.map(TabletMetadata::getOperationId).map(AbstractId::toString) + .orElse("tablet is gone")); + } + }); + + try { + retry.waitForNextAttempt(log, + String.format("%s tableId:%s conditional mutations to delete load markers failed.", + fateId, tableId)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } else { + break; + } + } + } + } } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java index a4f7928060,0000000000..d2bc8c9b91 mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java @@@ -1,192 -1,0 +1,190 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.tableOps.bulkVer2; + - import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static java.util.concurrent.TimeUnit.MINUTES; - import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; + ++import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.server.ServerContext; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Iterators; + +public class TabletRefresher { + + private static final Logger log = LoggerFactory.getLogger(TabletRefresher.class); + + public static void refresh(ServerContext context, + Supplier<Set<TServerInstance>> onlineTserversSupplier, FateId fateId, TableId tableId, + byte[] startRow, byte[] endRow, Predicate<TabletMetadata> needsRefresh) { + + // ELASTICITY_TODO should this thread pool be configurable? + ThreadPoolExecutor threadPool = + context.threadPools().createFixedThreadPool(10, "Tablet refresh " + fateId, false); + + try (var tablets = context.getAmple().readTablets().forTable(tableId) + .overlapping(startRow, endRow).checkConsistency() + .fetch(ColumnType.LOADED, ColumnType.LOCATION, ColumnType.PREV_ROW).build()) { + + // Find all tablets that need to refresh their metadata. There may be some tablets that were + // hosted after the tablet files were updated; this just results in an unneeded refresh + // request. There may also be tablets that had a location when the files were set but do not + // have a location now; that is ok, because the next time that tablet loads somewhere it will + // see the files.
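+ // (the filter below keeps only tablets that currently have a location and that the caller's + // needsRefresh predicate says still require a refresh)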
+ + var tabletIterator = + tablets.stream().filter(tabletMetadata -> tabletMetadata.getLocation() != null) + .filter(needsRefresh).iterator(); + + // avoid reading all tablets into memory and instead process batches of 1000 tablets at a time + Iterators.partition(tabletIterator, 1000).forEachRemaining(batch -> { + var refreshesNeeded = batch.stream().collect(groupingBy(TabletMetadata::getLocation, + mapping(tabletMetadata -> tabletMetadata.getExtent().toThrift(), toList()))); + + refreshTablets(threadPool, fateId.canonical(), context, onlineTserversSupplier, + refreshesNeeded); + }); + + } finally { + threadPool.shutdownNow(); + } + + } + + public static void refreshTablets(ExecutorService threadPool, String logId, ServerContext context, + Supplier<Set<TServerInstance>> onlineTserversSupplier, + Map<TabletMetadata.Location,List<TKeyExtent>> refreshesNeeded) { + + // make a copy as it will be mutated in this method + refreshesNeeded = new HashMap<>(refreshesNeeded); + - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + + while (!refreshesNeeded.isEmpty()) { + + Map<TabletMetadata.Location,Future<List<TKeyExtent>>> futures = new HashMap<>(); + + for (Map.Entry<TabletMetadata.Location,List<TKeyExtent>> entry : refreshesNeeded.entrySet()) { + + // Ask tablet server to reload the metadata for these tablets. The tablet server returns + // the list of extents it was hosting but was unable to refresh (the tablets could be in + // the process of loading). If it is not currently hosting the tablet it treats that as + // refreshed and does not return anything for it. + Future<List<TKeyExtent>> future = threadPool + .submit(() -> sendSyncRefreshRequest(context, logId, entry.getKey(), entry.getValue())); + + futures.put(entry.getKey(), future); + } + + for (Map.Entry<TabletMetadata.Location,Future<List<TKeyExtent>>> entry : futures.entrySet()) { + TabletMetadata.Location location = entry.getKey(); + Future<List<TKeyExtent>> future = entry.getValue(); + + List<TKeyExtent> nonRefreshedExtents = null; + try { + nonRefreshedExtents = future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + if (nonRefreshedExtents.isEmpty()) { + // tablet server was able to refresh everything, so remove that location + refreshesNeeded.remove(location); + } else { + // tablet server could not refresh some tablets, try them again later. 
+ refreshesNeeded.put(location, nonRefreshedExtents); + } + } + + // look for any tservers that have died since we read the metadata table and remove them + if (!refreshesNeeded.isEmpty()) { + Set<TServerInstance> liveTservers = onlineTserversSupplier.get(); + + refreshesNeeded.keySet() + .removeIf(location -> !liveTservers.contains(location.getServerInstance())); + } + + if (!refreshesNeeded.isEmpty()) { + try { + retry.waitForNextAttempt(log, logId + " waiting for " + refreshesNeeded.size() + + " tservers to refresh their tablets' metadata"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + private static List<TKeyExtent> sendSyncRefreshRequest(ServerContext context, String logId, + TabletMetadata.Location location, List<TKeyExtent> refreshes) { + TabletServerClientService.Client client = null; + try { + log.trace("{} sending refresh request to {} for {} extents", logId, location, + refreshes.size()); + var timeInMillis = context.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT); + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, location.getHostAndPort(), + context, timeInMillis); + + var unrefreshed = client.refreshTablets(TraceUtil.traceInfo(), context.rpcCreds(), refreshes); + + log.trace("{} refresh request to {} returned {} unrefreshed extents", logId, location, + unrefreshed.size()); + + return unrefreshed; + } catch (TException ex) { + log.debug("rpc failed server: " + location + ", " + logId + " " + ex.getMessage(), ex); + + // ELASTICITY_TODO are there any other exceptions we should catch in this method and check if + // the tserver is still alive? + + // something went wrong w/ the RPC; return all extents as unrefreshed + return refreshes; + } finally { + ThriftUtil.returnClient(client, context); + } + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index 9fee825488,992eba99be..b3f373cb16 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@@ -18,24 -18,12 +18,22 @@@ */ package org.apache.accumulo.manager.tableOps.compact; - import static java.util.concurrent.TimeUnit.MILLISECONDS; - import static java.util.concurrent.TimeUnit.MINUTES; - import static java.util.concurrent.TimeUnit.SECONDS; -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACT_ID; -import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.USER_COMPACTION_REQUESTED; -import org.apache.accumulo.core.Constants; ++import java.time.Duration; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Predicate; +import
java.util.stream.Collectors; + +import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.clientImpl.thrift.TableOperation; @@@ -309,77 -171,8 +307,77 @@@ class CompactionDriver extends ManagerR } @Override - public void undo(long tid, Manager environment) { + public void undo(FateId fateId, Manager env) throws Exception { + cleanupTabletMetadata(fateId, env); + + // For any compactions that may have happened before this operation failed, attempt to refresh + // tablets. + TabletRefresher.refresh(env.getContext(), env::onlineTabletServers, fateId, tableId, startRow, + endRow, tabletMetadata -> true); + } + /** + * Cleans up any tablet metadata that may have been added as part of this compaction operation. + */ + private void cleanupTabletMetadata(FateId fateId, Manager manager) throws Exception { + var ample = manager.getContext().getAmple(); + + // ELASTICITY_TODO use existing compaction logging + + boolean allCleanedUp = false; + - Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) - .incrementBy(100, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) - .logInterval(3, MINUTES).createRetry(); ++ Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100)) ++ .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5) ++ .logInterval(Duration.ofMinutes(3)).createRetry(); + + while (!allCleanedUp) { + + AtomicLong rejectedCount = new AtomicLong(0); + Consumer<Ample.ConditionalResult> resultConsumer = result -> { + if (result.getStatus() == Status.REJECTED) { + log.debug("{} update for {} was rejected ", fateId, result.getExtent()); + rejectedCount.incrementAndGet(); + } + }; + + try (var tablets = ample.readTablets().forTable(tableId).overlapping(startRow, endRow) + .fetch(PREV_ROW, COMPACTED, SELECTED, USER_COMPACTION_REQUESTED).checkConsistency() + .build(); var tabletsMutator = ample.conditionallyMutateTablets(resultConsumer)) { + Predicate<TabletMetadata> needsUpdate = + tabletMetadata -> (tabletMetadata.getSelectedFiles() != null + && tabletMetadata.getSelectedFiles().getFateId().equals(fateId)) + || tabletMetadata.getCompacted().contains(fateId) + || tabletMetadata.getUserCompactionsRequested().contains(fateId); + Predicate<TabletMetadata> needsNoUpdate = needsUpdate.negate(); + + for (TabletMetadata tablet : tablets) { + + if (needsUpdate.test(tablet)) { + var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() + .requireSame(tablet, COMPACTED, SELECTED); + if (tablet.getSelectedFiles() != null + && tablet.getSelectedFiles().getFateId().equals(fateId)) { + mutator.deleteSelectedFiles(); + } + + if (tablet.getCompacted().contains(fateId)) { + mutator.deleteCompacted(fateId); + } + if (tablet.getUserCompactionsRequested().contains(fateId)) { + mutator.deleteUserCompactionRequested(fateId); + } + + mutator.submit(needsNoUpdate::test); + } + } + } + + allCleanedUp = rejectedCount.get() == 0; + + if (!allCleanedUp) { + retry.waitForNextAttempt(log, "Cleanup metadata for failed compaction " + fateId); + } + } } } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 0de617455c,b9c9a39359..63fcbd4c5b --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ 
-28,6 -32,8 +28,7 @@@ import java.io.IOException import java.lang.management.ManagementFactory; import java.lang.reflect.InvocationTargetException; import java.net.UnknownHostException; + import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection;