This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 6f908863f571bcb76dd77e0576ec1bd135cd0284
Merge: e7d789dc79 6dc1bcfa55
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Jan 4 21:40:30 2024 -0500

    Merge branch 'main' into elasticity

 .../core/client/admin/InstanceOperations.java      |  10 +
 .../admin/compaction/CompactionConfigurer.java     |  23 +++
 .../core/clientImpl/InstanceOperationsImpl.java    |   7 +
 .../org/apache/accumulo/core/conf/Property.java    |   8 +-
 .../accumulo/core/fate/AbstractFateStore.java      |  14 +-
 .../org/apache/accumulo/core/fate/AdminUtil.java   |   5 +-
 .../org/apache/accumulo/core/fate/AgeOffStore.java |   5 +-
 .../java/org/apache/accumulo/core/fate/Fate.java   |  13 +-
 .../org/apache/accumulo/core/fate/FateStore.java   |   3 +-
 .../accumulo/core/fate/WrappedFateTxStore.java     |   5 +-
 .../core/file/rfile/bcfile/PrintBCInfo.java        |  14 +-
 .../spi/compaction/DefaultCompactionPlanner.java   | 117 +++++++++++-
 .../ShellCompactCommandConfigurerTest.java         |   5 +
 .../apache/accumulo/core/fate/AgeOffStoreTest.java |  17 +-
 .../org/apache/accumulo/core/fate/TestStore.java   |   3 +-
 .../compaction/DefaultCompactionPlannerTest.java   | 202 ++++++++++++++++++++-
 .../server/compaction/CompactionPluginUtils.java   |  16 +-
 .../coordinator/CompactionCoordinator.java         |  13 +-
 .../accumulo/test/conf/PropStoreConfigIT.java      |  41 +++++
 .../accumulo/test/functional/CompactionIT.java     | 192 ++++++++++++++++++++
 20 files changed, 666 insertions(+), 47 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index b1bc08b2a4,ba0437a9fe..e568dd44d4
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -957,16 -965,13 +957,20 @@@ public enum Property
           + " adjusting this property you may want to consider adjusting"
           + " table.compaction.major.ratio also. Setting this property to 0 will make"
           + " it default to tserver.scan.files.open.max-1, this will prevent a tablet"
-          + " from having more RFiles than can be opened. Setting this property low may"
-          + " throttle ingest and increase query performance.",
+          + " from having more RFiles than can be opened. Prior to 2.1.0 this property"
+          + " was used to trigger merging minor compactions, but merging minor compactions"
+          + " were removed in 2.1.0. Now this property is only used by the"
+          + " DefaultCompactionStrategy and the DefaultCompactionPlanner."
+          + " The DefaultCompactionPlanner started using this property in 2.1.3, before"
+          + " that it did not use the property.",
       "1.4.0"),
+  TABLE_MERGE_FILE_MAX("table.merge.file.max", "10000", PropertyType.COUNT,
+      "The maximum number of files that a merge operation will process. Before "
+          + "merging a sum of the number of files in the merge range is computed and if it "
+          + "exceeds this configuration then the merge will error and fail. For example if "
+          + "there are 100 tablets each having 10 files in the merge range, then the sum would "
+          + "be 1000 and the merge will only proceed if this property is greater than 1000.",
+      "4.0.0"),
   TABLE_FILE_SUMMARY_MAX_SIZE("table.file.summary.maxSize", "256k", PropertyType.BYTES,
       "The maximum size summary that will be stored.
The number of RFiles that" + " had summary data exceeding this threshold is reported by" diff --cc core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 874b58d8c6,0000000000..8c18fd378a mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@@ -1,321 -1,0 +1,325 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; ++import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public abstract class AbstractFateStore<T> implements FateStore<T> { + + private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); + + protected final Set<Long> reserved; + protected final Map<Long,Long> deferred; + + // This is incremented each time a transaction was unreserved that was non new + protected final SignalCount unreservedNonNewCount = new SignalCount(); + + // This is incremented each time a transaction is unreserved that was runnable + protected final SignalCount unreservedRunnableCount = new SignalCount(); + + public AbstractFateStore() { + this.reserved = new HashSet<>(); + this.deferred = new HashMap<>(); + } + + public static byte[] serialize(Object o) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(o); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION", + justification = "unsafe to store arbitrary serialized objects like this, but needed for now" + + " for backwards compatibility") + public static Object deserialize(byte[] ser) { + try (ByteArrayInputStream bais = new ByteArrayInputStream(ser); + ObjectInputStream ois = new ObjectInputStream(bais)) { + return ois.readObject(); + } catch (IOException e) { + throw new 
UncheckedIOException(e); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException(e); + } + } + + /** + * Attempt to reserve transaction + * + * @param tid transaction id + * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or + * an empty Optional if the transaction was already reserved. + */ + @Override + public Optional<FateTxStore<T>> tryReserve(long tid) { + synchronized (this) { + if (!reserved.contains(tid)) { + return Optional.of(reserve(tid)); + } + return Optional.empty(); + } + } + + @Override + public FateTxStore<T> reserve(long tid) { + synchronized (AbstractFateStore.this) { + while (reserved.contains(tid)) { + try { + AbstractFateStore.this.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + reserved.add(tid); + return newFateTxStore(tid, true); + } + } + + @Override + public Iterator<Long> runnable(AtomicBoolean keepWaiting) { + + while (keepWaiting.get()) { + ArrayList<Long> runnableTids = new ArrayList<>(); + + final long beforeCount = unreservedRunnableCount.getCount(); + + List<String> transactions = getTransactions(); + for (String txidStr : transactions) { + long txid = parseTid(txidStr); + if (isRunnable(_getStatus(txid))) { + runnableTids.add(txid); + } + } + + synchronized (this) { + runnableTids.removeIf(txid -> { + var deferredTime = deferred.get(txid); + if (deferredTime != null) { - if (deferredTime >= System.currentTimeMillis()) { ++ if ((deferredTime - System.nanoTime()) > 0) { + return true; + } else { + deferred.remove(txid); + } + } + + return reserved.contains(txid); + }); + } + + if (runnableTids.isEmpty()) { + if (beforeCount == unreservedRunnableCount.getCount()) { + long waitTime = 5000; + if (!deferred.isEmpty()) { - Long minTime = Collections.min(deferred.values()); - waitTime = minTime - System.currentTimeMillis(); ++ long currTime = System.nanoTime(); ++ long minWait = ++ deferred.values().stream().mapToLong(l -> l - currTime).min().getAsLong(); ++ waitTime = TimeUnit.MILLISECONDS.convert(minWait, TimeUnit.NANOSECONDS); + } + + if (waitTime > 0) { + unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, + keepWaiting::get); + } + } + } else { + return runnableTids.iterator(); + } + + } + + return Collections.emptyIterator(); + } + + @Override + public List<Long> list() { + ArrayList<Long> l = new ArrayList<>(); + List<String> transactions = getTransactions(); + for (String txid : transactions) { + l.add(parseTid(txid)); + } + return l; + } + + @Override + public ReadOnlyFateTxStore<T> read(long tid) { + return newFateTxStore(tid, false); + } + + protected boolean isRunnable(TStatus status) { + return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS + || status == TStatus.SUBMITTED; + } + + protected long parseTid(String txdir) { + return Long.parseLong(txdir.split("_")[1], 16); + } + + protected abstract List<String> getTransactions(); + + protected abstract TStatus _getStatus(long tid); + + protected abstract FateTxStore<T> newFateTxStore(long tid, boolean isReserved); + + protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> { + protected final long tid; + protected final boolean isReserved; + + protected TStatus observedStatus = null; + + protected AbstractFateTxStoreImpl(long tid, boolean isReserved) { + this.tid = tid; + this.isReserved = isReserved; + } + + @Override + public TStatus waitForStatusChange(EnumSet<TStatus> expected) { 
+ Preconditions.checkState(!isReserved, + "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); + while (true) { + + long countBefore = unreservedNonNewCount.getCount(); + + TStatus status = _getStatus(tid); + if (expected.contains(status)) { + return status; + } + + unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); + } + } + + @Override - public void unreserve(long deferTime) { ++ public void unreserve(long deferTime, TimeUnit timeUnit) { ++ deferTime = TimeUnit.NANOSECONDS.convert(deferTime, timeUnit); + + if (deferTime < 0) { + throw new IllegalArgumentException("deferTime < 0 : " + deferTime); + } + + synchronized (AbstractFateStore.this) { + if (!reserved.remove(tid)) { + throw new IllegalStateException( + "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); + } + + // notify any threads waiting to reserve + AbstractFateStore.this.notifyAll(); + + if (deferTime > 0) { - deferred.put(tid, System.currentTimeMillis() + deferTime); ++ deferred.put(tid, System.nanoTime() + deferTime); + } + } + + if (observedStatus != null && isRunnable(observedStatus)) { + unreservedRunnableCount.increment(); + } + + if (observedStatus != TStatus.NEW) { + unreservedNonNewCount.increment(); + } + } + + protected void verifyReserved(boolean isWrite) { + if (!isReserved && isWrite) { + throw new IllegalStateException("Attempted write on unreserved FATE transaction."); + } + + if (isReserved) { + synchronized (AbstractFateStore.this) { + if (!reserved.contains(tid)) { + throw new IllegalStateException( + "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); + } + } + } + } + + @Override + public TStatus getStatus() { + verifyReserved(false); + var status = _getStatus(tid); + observedStatus = status; + return status; + } + + @Override + public long getID() { + return tid; + } + + protected byte[] serializeTxInfo(Serializable so) { + if (so instanceof String) { + return ("S " + so).getBytes(UTF_8); + } else { + byte[] sera = serialize(so); + byte[] data = new byte[sera.length + 2]; + System.arraycopy(sera, 0, data, 2, sera.length); + data[0] = 'O'; + data[1] = ' '; + return data; + } + } + + protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { + if (data[0] == 'O') { + byte[] sera = new byte[data.length - 2]; + System.arraycopy(data, 2, sera, 0, sera.length); + return (Serializable) deserialize(sera); + } else if (data[0] == 'S') { + return new String(data, 2, data.length - 2, UTF_8); + } else { + throw new IllegalStateException("Bad node data " + txInfo); + } + } + + } +} diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 95ef99448f,858e6e6998..6a64014f01 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@@ -32,10 -32,9 +32,11 @@@ import java.util.List import java.util.Map; import java.util.Map.Entry; import java.util.Set; + import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooReader; @@@ -431,28 -432,26 +432,28 @@@ public class AdminUtil<T> return 
false; } boolean state = false; - zs.reserve(txid); - TStatus ts = zs.getStatus(txid); - switch (ts) { - case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - case FAILED: - case FAILED_IN_PROGRESS: - case SUCCESSFUL: - System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts); - zs.delete(txid); - state = true; - break; + FateTxStore<T> txStore = zs.reserve(txid); + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.printf("Invalid transaction ID: %016x%n", txid); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + case FAILED: + case FAILED_IN_PROGRESS: + case SUCCESSFUL: + System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts); + txStore.delete(); + state = true; + break; + } + } finally { - txStore.unreserve(0); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } - - zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); return state; } @@@ -470,36 -469,33 +471,36 @@@ return false; } boolean state = false; - zs.reserve(txid); - TStatus ts = zs.getStatus(txid); - switch (ts) { - case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - System.out.printf("Failing transaction: %016x (%s)%n", txid, ts); - zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS); - state = true; - break; - - case SUCCESSFUL: - System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts); - break; - - case FAILED: - case FAILED_IN_PROGRESS: - System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts); - state = true; - break; + FateTxStore<T> txStore = zs.reserve(txid); + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.printf("Invalid transaction ID: %016x%n", txid); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + System.out.printf("Failing transaction: %016x (%s)%n", txid, ts); + txStore.setStatus(TStatus.FAILED_IN_PROGRESS); + state = true; + break; + + case SUCCESSFUL: + System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts); + break; + + case FAILED: + case FAILED_IN_PROGRESS: + System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts); + state = true; + break; + } + } finally { - txStore.unreserve(0); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } - zs.unreserve(txid, 0, TimeUnit.MILLISECONDS); return state; } diff --cc core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index f61c06028c,ca016d0c9c..080ff0d33d --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@@ -24,8 -25,7 +24,9 @@@ import java.util.Iterator import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; + import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -108,7 -108,7 +109,7 @@@ public class AgeOffStore<T> implements } } finally { - txStore.unreserve(0); - store.unreserve(txid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } catch (Exception e) { log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e); @@@ -138,7 -138,7 +139,7 @@@ break; } } finally { - txStore.unreserve(0); - store.unreserve(txid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } } diff --cc 
core/src/main/java/org/apache/accumulo/core/fate/Fate.java index a54ad734ee,1a14418b1a..4b54ccc771 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@@ -38,7 -35,7 +38,8 @@@ import java.util.concurrent.LinkedTrans import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; + import java.util.concurrent.TimeUnit; +import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@@ -203,8 -132,8 +204,8 @@@ public class Fate<T> } catch (Exception e) { runnerLog.error("Uncaught exception in FATE runner thread.", e); } finally { - if (tid != null) { - store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS); + if (txStore != null) { - txStore.unreserve(deferTime); ++ txStore.unreserve(deferTime, TimeUnit.MILLISECONDS); } } } @@@ -356,15 -281,15 +357,15 @@@ } if (autoCleanUp) { - store.setTransactionInfo(tid, TxInfo.AUTO_CLEAN, autoCleanUp); + txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp); } - store.setTransactionInfo(tid, TxInfo.TX_NAME, txName); + txStore.setTransactionInfo(TxInfo.TX_NAME, txName); - store.setStatus(tid, SUBMITTED); + txStore.setStatus(SUBMITTED); } } finally { - txStore.unreserve(0); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } @@@ -402,7 -325,7 +403,7 @@@ return false; } } finally { - txStore.unreserve(0); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } else { // reserved, lets retry. @@@ -433,7 -356,7 +434,7 @@@ break; } } finally { - txStore.unreserve(0); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } @@@ -444,9 -367,9 +445,9 @@@ throw new IllegalStateException("Tried to get exception when transaction " + FateTxId.formatTid(tid) + " not in successful state"); } - return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE); + return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE); } finally { - txStore.unreserve(0); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } @@@ -458,9 -381,9 +459,9 @@@ throw new IllegalStateException("Tried to get exception when transaction " + FateTxId.formatTid(tid) + " not in failed state"); } - return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION); + return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION); } finally { - txStore.unreserve(0); - store.unreserve(tid, 0, TimeUnit.MILLISECONDS); ++ txStore.unreserve(0, TimeUnit.MILLISECONDS); } } diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 7db5766e81,0000000000..81a0e3212f mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@@ -1,110 -1,0 +1,111 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.Serializable; +import java.util.Optional; ++import java.util.concurrent.TimeUnit; + +/** + * Transaction Store: a place to save transactions + * + * A transaction consists of a number of operations. To use, first create a transaction id, and then + * seed the transaction with an initial operation. An executor service can then execute the + * transaction's operation, possibly pushing more operations onto the transaction as each step + * successfully completes. If a step fails, the stack can be unwound, undoing each operation. + */ +public interface FateStore<T> extends ReadOnlyFateStore<T> { + + /** + * Create a new transaction id + * + * @return a transaction id + */ + long create(); + + /** + * An interface that allows read/write access to the data related to a single fate operation. + */ + interface FateTxStore<T> extends ReadOnlyFateTxStore<T> { + @Override + Repo<T> top(); + + /** + * Update the given transaction with the next operation + * + * @param repo the operation + */ + void push(Repo<T> repo) throws StackOverflowException; + + /** + * Remove the last pushed operation from the given transaction. + */ + void pop(); + + /** + * Update the state of a given transaction + * + * @param status execution status + */ + void setStatus(TStatus status); + + /** + * Set transaction-specific information. + * + * @param txInfo name of attribute of a transaction to set. + * @param val transaction data to store + */ + void setTransactionInfo(Fate.TxInfo txInfo, Serializable val); + + /** + * Remove the transaction from the store. + * + */ + void delete(); + + /** + * Return the given transaction to the store. + * + * upon successful return the store now controls the referenced transaction id. caller should no + * longer interact with it. + * + * @param deferTime time in millis to keep this transaction from being returned by + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative. + */ - void unreserve(long deferTime); ++ void unreserve(long deferTime, TimeUnit timeUnit); + } + + /** + * Attempt to reserve transaction + * + * @param tid transaction id + * @return true if reserved by this call, false if already reserved + */ + Optional<FateTxStore<T>> tryReserve(long tid); + + /** + * Reserve the specific tid. + * + * Reserving a transaction id ensures that nothing else in-process interacting via the same + * instance will be operating on that transaction id. + * + */ + FateTxStore<T> reserve(long tid); + +} diff --cc core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java index 238f981a22,0000000000..1d8c7126c2 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@@ -1,96 -1,0 +1,97 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.Serializable; +import java.util.EnumSet; +import java.util.List; ++import java.util.concurrent.TimeUnit; + +public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> { + protected final FateStore.FateTxStore<T> wrapped; + + public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped) { + this.wrapped = wrapped; + } + + @Override - public void unreserve(long deferTime) { - wrapped.unreserve(deferTime); ++ public void unreserve(long deferTime, TimeUnit timeUnit) { ++ wrapped.unreserve(deferTime, timeUnit); + } + + @Override + public Repo<T> top() { + return wrapped.top(); + } + + @Override + public void push(Repo<T> repo) throws StackOverflowException { + wrapped.push(repo); + } + + @Override + public void pop() { + wrapped.pop(); + } + + @Override + public FateStore.TStatus getStatus() { + return wrapped.getStatus(); + } + + @Override + public void setStatus(FateStore.TStatus status) { + wrapped.setStatus(status); + } + + @Override + public FateStore.TStatus waitForStatusChange(EnumSet<FateStore.TStatus> expected) { + return wrapped.waitForStatusChange(expected); + } + + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { + wrapped.setTransactionInfo(txInfo, val); + } + + @Override + public Serializable getTransactionInfo(Fate.TxInfo txInfo) { + return wrapped.getTransactionInfo(txInfo); + } + + @Override + public void delete() { + wrapped.delete(); + } + + @Override + public long timeCreated() { + return wrapped.timeCreated(); + } + + @Override + public long getID() { + return wrapped.getID(); + } + + @Override + public List<ReadOnlyRepo<T>> getStack() { + return wrapped.getStack(); + } +} diff --cc core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java index d3c3deb57a,eba25df062..dcbb78dfcd --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlanner.java @@@ -370,94 -329,157 +386,179 @@@ public class DefaultCompactionPlanner i Set<CompactableFile> filesCopy = new HashSet<>(params.getCandidates()); + FakeFileGenerator fakeFileGenerator = new FakeFileGenerator(); + long maxSizeToCompact = getMaxSizeToCompact(params.getKind()); - Collection<CompactableFile> group; - if (params.getRunningCompactions().isEmpty()) { - group = - findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact); + // This set represents future files that will be produced by running compactions. If the optimal + // set of files to compact is computed and contains one of these files, then its optimal to wait + // for this compaction to finish. 
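The comment above describes modeling each running compaction's eventual output as a placeholder "expected" file, so the planner can detect when the optimal job depends on a compaction that is still running. A minimal standalone sketch of that bookkeeping idea, using hypothetical simplified types rather than Accumulo's CompactableFile and CompactionJob (illustrative only, not part of the commit):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ExpectedFileSketch {
      // Hypothetical stand-ins for a data file and a running compaction job.
      record DataFile(String name, long size) {}
      record RunningJob(Set<DataFile> inputs) {}

      // Model the eventual output of a running compaction as one placeholder file whose
      // size is the sum of its inputs. If a newly planned job would include a placeholder,
      // the better move is to wait for the running compaction to finish.
      static DataFile expectedOutput(RunningJob job, int id) {
        long size = job.inputs().stream().mapToLong(DataFile::size).sum();
        return new DataFile("expected-" + id, size);
      }

      static Set<DataFile> candidatesPlusExpected(Set<DataFile> candidates, List<RunningJob> running) {
        Set<DataFile> all = new HashSet<>(candidates);
        int id = 0;
        for (RunningJob job : running) {
          all.add(expectedOutput(job, id++));
        }
        return all;
      }
    }
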
+ Set<CompactableFile> expectedFiles = new HashSet<>(); + params.getRunningCompactions().stream().filter(job -> job.getKind() == params.getKind()) + .map(job -> getExpected(job.getFiles(), fakeFileGenerator)) + .forEach(compactableFile -> Preconditions.checkState(expectedFiles.add(compactableFile))); + Preconditions.checkState(Collections.disjoint(expectedFiles, filesCopy)); + filesCopy.addAll(expectedFiles); - if (!group.isEmpty() && group.size() < params.getCandidates().size() - && params.getCandidates().size() <= maxFilesToCompact - && (params.getKind() == CompactionKind.USER - || params.getKind() == CompactionKind.SELECTOR)) { - // USER and SELECTOR compactions must eventually compact all files. When a subset of files - // that meets the compaction ratio is selected, look ahead and see if the next compaction - // would also meet the compaction ratio. If not then compact everything to avoid doing - // more than logarithmic work across multiple comapctions. - - filesCopy.removeAll(group); - filesCopy.add(getExpected(group, 0)); - - if (findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, - maxSizeToCompact).isEmpty()) { - // The next possible compaction does not meet the compaction ratio, so compact - // everything. - group = Set.copyOf(params.getCandidates()); - } + List<Collection<CompactableFile>> compactionJobs = new ArrayList<>(); + while (true) { + var filesToCompact = + findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact); + if (!Collections.disjoint(filesToCompact, expectedFiles)) { + // the optimal set of files to compact includes the output of a running compaction, so lets + // wait for that running compaction to finish. + break; } - } else if (params.getKind() == CompactionKind.SYSTEM) { - // This code determines if once the files compacting finish would they be included in a - // compaction with the files smaller than them? If so, then wait for the running compaction - // to complete. + if (filesToCompact.isEmpty()) { + break; + } - // The set of files running compactions may produce - var expectedFiles = getExpected(params.getRunningCompactions()); + filesCopy.removeAll(filesToCompact); - if (!Collections.disjoint(filesCopy, expectedFiles)) { - throw new AssertionError(); - } + // A compaction job will be created for these files, so lets add an expected file for that + // planned compaction job. Then if future iterations of this loop will include that file then + // they will not compact. + var expectedFile = getExpected(filesToCompact, fakeFileGenerator); + Preconditions.checkState(expectedFiles.add(expectedFile)); + Preconditions.checkState(filesCopy.add(expectedFile)); - filesCopy.addAll(expectedFiles); + compactionJobs.add(filesToCompact); - group = - findDataFilesToCompact(filesCopy, params.getRatio(), maxFilesToCompact, maxSizeToCompact); + if (filesToCompact.size() < maxFilesToCompact) { + // Only continue looking for more compaction jobs when a set of files is found equals + // maxFilesToCompact in size. When the files found is less than the max size its an + // indication that the compaction ratio was no longer met and therefore it would be + // suboptimal to look for more jobs because the smallest optimal set was found. + break; + } + } - if (!Collections.disjoint(group, expectedFiles)) { - // file produced by running compaction will eventually compact with existing files, so - // wait. 
- group = Set.of(); + if (compactionJobs.size() == 1 + && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR) + && compactionJobs.get(0).size() < params.getCandidates().size() + && compactionJobs.get(0).size() <= maxFilesToCompact) { + // USER and SELECTOR compactions must eventually compact all files. When a subset of files + // that meets the compaction ratio is selected, look ahead and see if the next compaction + // would also meet the compaction ratio. If not then compact everything to avoid doing + // more than logarithmic work across multiple comapctions. + + var group = compactionJobs.get(0); + var candidatesCopy = new HashSet<>(params.getCandidates()); + + candidatesCopy.removeAll(group); + Preconditions.checkState(candidatesCopy.add(getExpected(group, fakeFileGenerator))); + + if (findDataFilesToCompact(candidatesCopy, params.getRatio(), maxFilesToCompact, + maxSizeToCompact).isEmpty()) { + // The next possible compaction does not meet the compaction ratio, so compact + // everything. + compactionJobs.set(0, Set.copyOf(params.getCandidates())); } - } else { - group = Set.of(); } - if (compactionJobs.isEmpty() - && (params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR) - && params.getRunningCompactions().stream() - .noneMatch(job -> job.getKind() == params.getKind())) { - // These kinds of compaction require files to compact even if none of the files meet the - // compaction ratio. No files were found using the compaction ratio and no compactions are - // running, so force a compaction. - compactionJobs = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact); - if (group.isEmpty()) { ++ if (compactionJobs.isEmpty()) { + if ((params.getKind() == CompactionKind.USER || params.getKind() == CompactionKind.SELECTOR) + && params.getRunningCompactions().stream() + .noneMatch(job -> job.getKind() == params.getKind())) { - group = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact); ++ // These kinds of compaction require files to compact even if none of the files meet the ++ // compaction ratio. No files were found using the compaction ratio and no compactions are ++ // running, so force a compaction. ++ compactionJobs = findMaximalRequiredSetToCompact(params.getCandidates(), maxFilesToCompact); + } else if (params.getKind() == CompactionKind.SYSTEM + && params.getRunningCompactions().isEmpty() + && params.getAll().size() == params.getCandidates().size()) { + int maxTabletFiles = + getMaxTabletFiles(params.getServiceEnvironment().getConfiguration(params.getTableId())); + if (params.getAll().size() > maxTabletFiles) { + // The tablet is above its max files, there are no compactions running, all files are + // candidates for a system compaction, and no files were found to compact. Attempt to + // find a set of files to compact by lowering the compaction ratio. 
- group = findFilesToCompactWithLowerRatio(params, maxSizeToCompact, maxTabletFiles); ++ compactionJobs = ++ findFilesToCompactWithLowerRatio(params, maxSizeToCompact, maxTabletFiles); + } + } } - if (group.isEmpty()) { - return params.createPlanBuilder().build(); - } else { - // determine which executor to use based on the size of the files - var ceid = getExecutor(group); - - return params.createPlanBuilder().addJob(createPriority(params, group), ceid, group).build(); - } + var builder = params.createPlanBuilder(); + compactionJobs.forEach(jobFiles -> builder.addJob(createPriority(params, jobFiles), + getExecutor(jobFiles), jobFiles)); + return builder.build(); } + static int getMaxTabletFiles(ServiceEnvironment.Configuration configuration) { + int maxTabletFiles = Integer.parseInt(configuration.get(Property.TABLE_FILE_MAX.getKey())); + if (maxTabletFiles <= 0) { + maxTabletFiles = + Integer.parseInt(configuration.get(Property.TSERV_SCAN_MAX_OPENFILES.getKey())) - 1; + } + return maxTabletFiles; + } + + /** + * Searches for the highest compaction ratio that is less than the configured ratio that will + * lower the number of files. + */ - private Collection<CompactableFile> findFilesToCompactWithLowerRatio(PlanningParameters params, - long maxSizeToCompact, int maxTabletFiles) { ++ private List<Collection<CompactableFile>> findFilesToCompactWithLowerRatio( ++ PlanningParameters params, long maxSizeToCompact, int maxTabletFiles) { + double lowRatio = 1.0; + double highRatio = params.getRatio(); + + Preconditions.checkArgument(highRatio >= lowRatio); + + var candidates = Set.copyOf(params.getCandidates()); + Collection<CompactableFile> found = Set.of(); + + int goalCompactionSize = candidates.size() - maxTabletFiles + 1; + if (goalCompactionSize > maxFilesToCompact) { + // The tablet is way over max tablet files, so multiple compactions will be needed. Therefore, + // do not set a goal size for this compaction and find the largest compaction ratio that will + // compact some set of files. + goalCompactionSize = 0; + } + + // Do a binary search of the compaction ratios. + while (highRatio - lowRatio > .1) { + double ratioToCheck = (highRatio - lowRatio) / 2 + lowRatio; + + // This is continually resorting the list of files in the following call, could optimize this + var filesToCompact = + findDataFilesToCompact(candidates, ratioToCheck, maxFilesToCompact, maxSizeToCompact); + - log.trace("Tried ratio {} and found {} {} {}", ratioToCheck, filesToCompact, - filesToCompact.size() >= goalCompactionSize, goalCompactionSize); ++ log.info("Tried ratio {} and found {} {} {} {}", ratioToCheck, filesToCompact, ++ filesToCompact.size() >= goalCompactionSize, goalCompactionSize, maxFilesToCompact); + + if (filesToCompact.isEmpty() || filesToCompact.size() < goalCompactionSize) { + highRatio = ratioToCheck; + } else { + lowRatio = ratioToCheck; + found = filesToCompact; + } + } + + if (found.isEmpty() && lowRatio == 1.0) { + // in this case the data must be really skewed, operator intervention may be needed. 
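The goalCompactionSize arithmetic above is easier to see with concrete numbers; the values below are illustrative and not taken from the commit:

    public class GoalSizeSketch {
      public static void main(String[] args) {
        // Hypothetical numbers: 10 candidate files, table.file.max = 7.
        int candidates = 10;
        int maxTabletFiles = 7;
        // A compaction replaces its inputs with a single output file, so to land at or
        // below maxTabletFiles it must merge at least candidates - maxTabletFiles + 1 files.
        int goalCompactionSize = candidates - maxTabletFiles + 1; // 4
        // 10 files - 4 inputs + 1 output = 7 files remaining.
        System.out.println(goalCompactionSize + " inputs -> " + (candidates - goalCompactionSize + 1) + " files left");
      }
    }
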
+ log.warn( + "Attempted to lower compaction ration from {} to {} for {} because there are {} files " + + "and the max tablet files is {}, however no set of files to compact were found.", + params.getRatio(), highRatio, params.getTableId(), params.getCandidates().size(), + maxTabletFiles); + } + + log.info( + "For {} found {} files to compact lowering compaction ratio from {} to {} because the tablet " + + "exceeded {} files, it had {}", + params.getTableId(), found.size(), params.getRatio(), lowRatio, maxTabletFiles, + params.getCandidates().size()); + - return found; ++ if (found.isEmpty()) { ++ return List.of(); ++ } else { ++ return List.of(found); ++ } + } + private static short createPriority(PlanningParameters params, Collection<CompactableFile> group) { return CompactionJobPrioritizer.createPriority(params.getKind(), params.getAll().size(), diff --cc core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java index d2530ce1f3,c2b086ee34..1e02bd2620 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java @@@ -22,9 -22,10 +22,10 @@@ import static org.junit.jupiter.api.Ass import java.util.HashSet; import java.util.Set; + import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AgeOffStore.TimeSource; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.Test; @@@ -50,25 -51,25 +51,25 @@@ public class AgeOffStoreTest aoStore.ageOff(); long txid1 = aoStore.create(); - aoStore.reserve(txid1); - aoStore.setStatus(txid1, TStatus.IN_PROGRESS); - aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); + var txStore1 = aoStore.reserve(txid1); + txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(0); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); aoStore.ageOff(); long txid2 = aoStore.create(); - aoStore.reserve(txid2); - aoStore.setStatus(txid2, TStatus.IN_PROGRESS); - aoStore.setStatus(txid2, TStatus.FAILED); - aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS); + var txStore2 = aoStore.reserve(txid2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(0); ++ txStore2.unreserve(0, TimeUnit.MILLISECONDS); tts.time = 6; long txid3 = aoStore.create(); - aoStore.reserve(txid3); - aoStore.setStatus(txid3, TStatus.IN_PROGRESS); - aoStore.setStatus(txid3, TStatus.SUCCESSFUL); - aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS); + var txStore3 = aoStore.reserve(txid3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(0); ++ txStore3.unreserve(0, TimeUnit.MILLISECONDS); Long txid4 = aoStore.create(); @@@ -99,21 -100,21 +100,21 @@@ TestTimeSource tts = new TestTimeSource(); TestStore testStore = new TestStore(); long txid1 = testStore.create(); - testStore.reserve(txid1); - testStore.setStatus(txid1, TStatus.IN_PROGRESS); - testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); + var txStore1 = testStore.reserve(txid1); + txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(0); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); long txid2 = testStore.create(); - testStore.reserve(txid2); - testStore.setStatus(txid2, TStatus.IN_PROGRESS); - testStore.setStatus(txid2, TStatus.FAILED); - testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS); + var txStore2 = testStore.reserve(txid2); + 
txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(0); ++ txStore2.unreserve(0, TimeUnit.MILLISECONDS); long txid3 = testStore.create(); - testStore.reserve(txid3); - testStore.setStatus(txid3, TStatus.IN_PROGRESS); - testStore.setStatus(txid3, TStatus.SUCCESSFUL); - testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS); + var txStore3 = testStore.reserve(txid3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(0); ++ txStore3.unreserve(0, TimeUnit.MILLISECONDS); Long txid4 = testStore.create(); @@@ -134,9 -135,9 +135,9 @@@ assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); assertEquals(1, new HashSet<>(aoStore.list()).size()); - aoStore.reserve(txid1); - aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS); - aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); + txStore1 = aoStore.reserve(txid1); + txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); - txStore1.unreserve(0); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); tts.time = 30; @@@ -145,9 -146,9 +146,9 @@@ assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); assertEquals(1, new HashSet<>(aoStore.list()).size()); - aoStore.reserve(txid1); - aoStore.setStatus(txid1, TStatus.FAILED); - aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS); + txStore1 = aoStore.reserve(txid1); + txStore1.setStatus(TStatus.FAILED); - txStore1.unreserve(0); ++ txStore1.unreserve(0, TimeUnit.MILLISECONDS); aoStore.ageOff(); diff --cc core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 058b0c50a4,3253c41a90..1dabfcf697 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@@ -18,17 -18,13 +18,18 @@@ */ package org.apache.accumulo.core.fate; +import java.io.Serializable; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; + import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * Transient in memory store for transactions. 
@@@ -66,97 -61,35 +67,97 @@@ public class TestStore implements FateS } } - @Override - public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) { - if (!reserved.remove(tid)) { - throw new IllegalStateException(); + private class TestFateTxStore implements FateTxStore<String> { + + private final long tid; + + TestFateTxStore(long tid) { + this.tid = tid; } - } - @Override - public org.apache.accumulo.core.fate.TStore.TStatus getStatus(long tid) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); + @Override + public Repo<String> top() { + throw new UnsupportedOperationException(); } - TStatus status = statuses.get(tid); - if (status == null) { - return TStatus.UNKNOWN; + @Override + public List<ReadOnlyRepo<String>> getStack() { + throw new UnsupportedOperationException(); } - return status; - } - @Override - public void setStatus(long tid, org.apache.accumulo.core.fate.TStore.TStatus status) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); + @Override + public TStatus getStatus() { + if (!reserved.contains(tid)) { + throw new IllegalStateException(); + } + + TStatus status = statuses.get(tid); + if (status == null) { + return TStatus.UNKNOWN; + } + return status; + } + + @Override + public TStatus waitForStatusChange(EnumSet<TStatus> expected) { + throw new UnsupportedOperationException(); + } + + @Override + public Serializable getTransactionInfo(Fate.TxInfo txInfo) { + throw new UnsupportedOperationException(); + } + + @Override + public long timeCreated() { + throw new UnsupportedOperationException(); + } + + @Override + public long getID() { + return tid; + } + + @Override + public void push(Repo<String> repo) throws StackOverflowException { + throw new UnsupportedOperationException(); + } + + @Override + public void pop() { + throw new UnsupportedOperationException(); + } + + @Override + public void setStatus(TStatus status) { + if (!reserved.contains(tid)) { + throw new IllegalStateException(); + } + if (!statuses.containsKey(tid)) { + throw new IllegalStateException(); + } + statuses.put(tid, status); + } + + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { + throw new UnsupportedOperationException(); } - if (!statuses.containsKey(tid)) { - throw new IllegalStateException(); + + @Override + public void delete() { + if (!reserved.contains(tid)) { + throw new IllegalStateException(); + } + statuses.remove(tid); + } + + @Override - public void unreserve(long deferTime) { ++ public void unreserve(long deferTime, TimeUnit timeUnit) { + if (!reserved.remove(tid)) { + throw new IllegalStateException(); + } } - statuses.put(tid, status); } @Override diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java index 6447a2d147,9f4f9d315c..7886a5f0c6 --- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/DefaultCompactionPlannerTest.java @@@ -27,14 -26,15 +27,16 @@@ import static org.junit.jupiter.api.Ass import java.net.URI; import java.net.URISyntaxException; + import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; + import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import 
org.apache.accumulo.core.conf.ConfigurationCopy; @@@ -600,6 -446,144 +605,146 @@@ public class DefaultCompactionPlannerTe assertTrue(e.getMessage().contains("maxSize"), "Error message didn't contain maxSize"); } + // Test cases where a tablet has more than table.file.max files, but no files were found using the + // compaction ratio. The planner should try to find the highest ratio that will result in a + // compaction. + @Test + public void testMaxTabletFiles() throws Exception { + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','numThreads':3}]"; + + Map<String,String> overrides = new HashMap<>(); + overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "10"); + overrides.put(Property.TABLE_FILE_MAX.getKey(), "7"); + var conf = new ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build()); + + // For this case need to compact three files and the highest ratio that achieves that is 1.8 + var planner = createPlanner(conf, executors); + var all = createCFs(1000, 1.1, 1.9, 1.8, 1.6, 1.3, 1.4, 1.3, 1.2, 1.1); + var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + var plan = planner.makePlan(params); + var job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(1000, 1.1, 1.9, 1.8), job.getFiles()); + + // For this case need to compact two files and the highest ratio that achieves that is 2.9 + all = createCFs(1000, 2, 2.9, 2.8, 2.7, 2.6, 2.5, 2.4, 2.3); + params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(1000, 2, 2.9), job.getFiles()); + + all = + createCFs(1000, 1.1, 2.89, 2.85, 2.7, 2.3, 2.9, 2.8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2); + params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(1000, 1.1, 2.89, 2.85, 2.7, 2.3, 2.9), job.getFiles()); + + all = createCFs(1000, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 1.1); + params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(1000, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9), job.getFiles()); + + // In this case the tablet can not be brought below the max files limit in a single compaction, + // so it should find the highest ratio to compact + for (var ratio : List.of(1.9, 2.0, 3.0, 4.0)) { + all = createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, + 1.1, 1.1); + params = createPlanningParams(all, all, Set.of(), ratio, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(1000, 1.9), job.getFiles()); + } + + // In this case the tablet can be brought below the max limit in single compaction, so it should + // find this + all = + createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1); + params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.5, 1.2, 1.1), job.getFiles()); + + // each file is 10x the 
size of the file smaller than it + all = createCFs(10, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1); + params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(10, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1, 1.1), job.getFiles()); + + // test with some files growing 20x, ensure those are not included + for (var ratio : List.of(1.9, 2.0, 3.0, 4.0)) { + all = createCFs(10, 1.05, 1.05, 1.25, 1.75, 1.25, 1.05, 1.05, 1.05); + params = createPlanningParams(all, all, Set.of(), ratio, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(10, 1.05, 1.05, 1.25, 1.75), job.getFiles()); + } + + } + + @Test + public void testMaxTabletFilesNoCompaction() throws Exception { + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','maxSize':'512M','numThreads':3}]"; + + Map<String,String> overrides = new HashMap<>(); + overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "10"); + overrides.put(Property.TABLE_FILE_MAX.getKey(), "7"); + var conf = new ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build()); + + // ensure that when a compaction would be over the max size limit that it is not planned + var planner = createPlanner(conf, executors); + var all = createCFs(1_000_000_000, 2, 2, 2, 2, 2, 2, 2); + var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + var plan = planner.makePlan(params); + ++ System.out.println(plan.getJobs()); ++ + assertTrue(plan.getJobs().isEmpty()); + + // ensure when a compaction is running and we are over files max but below the compaction ratio + // that a compaction is not planned + all = createCFs(1_000, 2, 2, 2, 2, 2, 2, 2); + var job = new CompactionJobImpl((short) 1, CompactionExecutorIdImpl.externalId("ee1"), + createCFs("F1", "1000"), CompactionKind.SYSTEM, Optional.of(false)); + params = createPlanningParams(all, all, Set.of(job), 3, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + + assertTrue(plan.getJobs().isEmpty()); + + // a really bad situation, each file is 20 times the size of its smaller file. The algorithm + // does not search that for ratios that low. 
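To see why no compaction is planned in that last case, consider the ratio bound implied by 20x growth. The numbers below are illustrative and assume the configured ratio of 3 used in the test:

    public class SkewedRatioSketch {
      public static void main(String[] args) {
        // When each file is 20x the sum of everything smaller (the 1.05 case above), the
        // largest file F in any candidate set only satisfies F * ratio <= F + smallerSum
        // when ratio <= 1 + smallerSum / F = 1 + 1/20 = 1.05.
        double smallerSumFraction = 1.0 / 20;
        double minUsableRatio = 1 + smallerSumFraction;
        System.out.println(minUsableRatio); // 1.05
        // The planner's binary search over [1.0, configured ratio] stops once the interval
        // narrows to 0.1; starting from a ratio of 3 its lowest probe is 1.0625 (> 1.05),
        // so it finds nothing and correctly plans no compaction for data this skewed.
      }
    }
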
+ all = createCFs(10, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05, 1.05); + params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + plan = planner.makePlan(params); + assertTrue(plan.getJobs().isEmpty()); + } + + // Test to ensure that plugin falls back from TABLE_FILE_MAX to TSERV_SCAN_MAX_OPENFILES + @Test + public void testMaxTableFilesFallback() throws Exception { + String executors = "[{'name':'small','type': 'internal','maxSize':'32M','numThreads':1}," + + "{'name':'medium','type': 'internal','maxSize':'128M','numThreads':2}," + + "{'name':'large','type': 'internal','numThreads':3}]"; + + Map<String,String> overrides = new HashMap<>(); + overrides.put(Property.COMPACTION_SERVICE_PREFIX.getKey() + "cs1.planner.opts.maxOpen", "10"); + overrides.put(Property.TABLE_FILE_MAX.getKey(), "0"); + overrides.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "5"); + var conf = new ConfigurationImpl(SiteConfiguration.empty().withOverrides(overrides).build()); + + var planner = createPlanner(conf, executors); + var all = createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4, 1.3, 1.2, 1.1); + var params = createPlanningParams(all, all, Set.of(), 3, CompactionKind.SYSTEM, conf); + var plan = planner.makePlan(params); + var job = getOnlyElement(plan.getJobs()); + assertEquals(createCFs(1000, 1.9, 1.8, 1.7, 1.6, 1.5, 1.4), job.getFiles()); + } + private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all, Set<CompactableFile> files) { return new CompactionPlanImpl.BuilderImpl(kind, all, all) @@@ -607,16 -591,46 +752,53 @@@ .build().getJobs().iterator().next(); } + // Create a set of files whose sizes would require certain compaction ratios to compact - private Set<CompactableFile> createCFs(int initialSize, double... desiredRatios) - throws URISyntaxException { ++ private Set<CompactableFile> createCFs(int initialSize, double... desiredRatios) { + List<String> pairs = new ArrayList<>(); + pairs.add("F1"); + pairs.add(initialSize + ""); + + double previousFileSizes = initialSize; + + int i = 2; + for (double desiredRatio : desiredRatios) { + Preconditions.checkArgument(desiredRatio > 1.0); + Preconditions.checkArgument(desiredRatio <= i); + + /* + * The compaction ratio formula is fileSize * ratio < fileSize + previousFileSizes. Solved the + * following equation to compute a file size given a desired ratio. + * + * fileSize * ratio = fileSize + previousFileSizes + * + * fileSize * ratio - fileSize = previousFileSizes + * + * fileSize * (ratio - 1) = previousFileSizes + * + * fileSize = previousFileSizes / (ratio - 1) + */ + + double fileSize = previousFileSizes / (desiredRatio - 1); + pairs.add("F" + i + "_" + desiredRatio); + pairs.add(Math.round(fileSize) + ""); + + previousFileSizes += fileSize; + i++; + } + + return createCFs(pairs.toArray(new String[0])); + } + - private static Set<CompactableFile> createCFs(String... namesSizePairs) - throws URISyntaxException { + private static CompactableFile createCF(String name, long size) { + try { + return CompactableFile + .create(new URI("hdfs://fake/accumulo/tables/1/t-0000000z/" + name + ".rf"), size, 0); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + private static Set<CompactableFile> createCFs(String... 
namesSizePairs) { Set<CompactableFile> files = new HashSet<>(); for (int i = 0; i < namesSizePairs.length; i += 2) { @@@ -721,9 -750,13 +913,15 @@@ private static CompactionPlanner.InitParameters getInitParams(Configuration conf, String executors) { + String maxOpen = conf.get(prefix + "cs1.planner.opts.maxOpen"); Map<String,String> options = new HashMap<>(); options.put("executors", executors.replaceAll("'", "\"")); - options.put("maxOpen", "15"); + + if (maxOpen != null) { + options.put("maxOpen", maxOpen); ++ } else { ++ options.put("maxOpen", "15"); + } ServiceEnvironment senv = EasyMock.createMock(ServiceEnvironment.class); EasyMock.expect(senv.getConfiguration()).andReturn(conf).anyTimes(); diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java index 7978d3c2e5,0000000000..fdcb93732f mode 100644,000000..100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionPluginUtils.java @@@ -1,331 -1,0 +1,337 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.server.compaction; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.classloader.ClassLoaderUtil; +import org.apache.accumulo.core.client.PluginEnvironment; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.admin.PluginConfig; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; +import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; +import org.apache.accumulo.core.client.rfile.RFileSource; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.client.summary.SummarizerConfiguration; +import org.apache.accumulo.core.client.summary.Summary; +import org.apache.accumulo.core.clientImpl.UserCompactionUtils; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.file.FileOperations; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.summary.Gatherer; +import org.apache.accumulo.core.summary.SummarizerFactory; +import org.apache.accumulo.core.summary.SummaryCollection; +import org.apache.accumulo.core.summary.SummaryReader; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Collections2; + +public class CompactionPluginUtils { + + private static final Logger log = LoggerFactory.getLogger(CompactionPluginUtils.class); + + private static <T> T newInstance(AccumuloConfiguration tableConfig, String className, + Class<T> baseClass) { + String context = ClassLoaderUtil.tableContext(tableConfig); + try { + return ConfigurationTypeHelper.getClassInstance(context, className, baseClass); + } catch (ReflectiveOperationException e) { + throw new IllegalArgumentException(e); + } + } + + public static Set<StoredTabletFile> selectFiles(ServerContext context, KeyExtent extent, + CompactionConfig compactionConfig, Map<StoredTabletFile,DataFileValue> allFiles) { + if (!UserCompactionUtils.isDefault(compactionConfig.getSelector())) { + return selectFiles(context, extent, allFiles, compactionConfig.getSelector()); + } else { + 
return allFiles.keySet(); + } + } + + private static Set<StoredTabletFile> selectFiles(ServerContext context, KeyExtent extent, + Map<StoredTabletFile,DataFileValue> datafiles, PluginConfig selectorConfig) { + + log.debug("Selecting files for {} using {}", extent, selectorConfig); + + CompactionSelector selector = newInstance(context.getTableConfiguration(extent.tableId()), + selectorConfig.getClassName(), CompactionSelector.class); + + final ServiceEnvironment senv = new ServiceEnvironmentImpl(context); + + selector.init(new CompactionSelector.InitParameters() { + @Override + public Map<String,String> getOptions() { + return selectorConfig.getOptions(); + } + + @Override + public PluginEnvironment getEnvironment() { + return senv; + } + + @Override + public TableId getTableId() { + return extent.tableId(); + } + }); + + CompactionSelector.Selection selection = + selector.select(new CompactionSelector.SelectionParameters() { + @Override + public PluginEnvironment getEnvironment() { + return senv; + } + + @Override + public Collection<CompactableFile> getAvailableFiles() { + return Collections2.transform(datafiles.entrySet(), + e -> new CompactableFileImpl(e.getKey(), e.getValue())); + } + + @Override + public Collection<Summary> getSummaries(Collection<CompactableFile> files, + Predicate<SummarizerConfiguration> summarySelector) { + + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. See #3526 + + try { + var tableConf = context.getTableConfiguration(extent.tableId()); + + SummaryCollection sc = new SummaryCollection(); + SummarizerFactory factory = new SummarizerFactory(tableConf); + for (CompactableFile cf : files) { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + RFileSource source = new RFileSource(new FSDataInputStream(fs.open(file.getPath())), + fs.getFileStatus(file.getPath()).getLen(), file.getRange()); + + SummaryCollection fsc = SummaryReader + .load(conf, source, file.getFileName(), summarySelector, factory, + tableConf.getCryptoService()) + .getSummaries(Collections.singletonList(new Gatherer.RowRange(extent))); + + sc.merge(fsc, factory); + } + return sc.getSummaries(); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + + @Override + public TableId getTableId() { + return extent.tableId(); + } + + @Override + public TabletId getTabletId() { + return new TabletIdImpl(extent); + } + + @Override + public Optional<SortedKeyValueIterator<Key,Value>> getSample(CompactableFile cf, + SamplerConfiguration sc) { + + // ELASTICITY_TODO this may open files for user tables in the manager, need to avoid + // this. 
See #3526 + + try { + var file = CompactableFileImpl.toStoredTabletFile(cf); + FileSystem fs = context.getVolumeManager().getFileSystemByPath(file.getPath()); + Configuration conf = context.getHadoopConf(); + var tableConf = context.getTableConfiguration(extent.tableId()); + var iter = FileOperations.getInstance().newReaderBuilder() + .forFile(file, fs, conf, tableConf.getCryptoService()) + .withTableConfiguration(tableConf).seekToBeginning().build(); + var sampleIter = iter.getSample(new SamplerConfigurationImpl(sc)); + if (sampleIter == null) { + iter.close(); + return Optional.empty(); + } + + return Optional.of(sampleIter); + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); + } + } + }); + + return selection.getFilesToCompact().stream().map(CompactableFileImpl::toStoredTabletFile) + .collect(Collectors.toSet()); + } + + public static Map<String,String> computeOverrides(Optional<CompactionConfig> compactionConfig, - ServerContext context, KeyExtent extent, Set<CompactableFile> files) { ++ ServerContext context, KeyExtent extent, Set<CompactableFile> inputFiles, ++ Set<CompactableFile> selectedFiles) { + + if (compactionConfig.isPresent() + && !UserCompactionUtils.isDefault(compactionConfig.orElseThrow().getConfigurer())) { - return CompactionPluginUtils.computeOverrides(context, extent, files, ++ return CompactionPluginUtils.computeOverrides(context, extent, inputFiles, selectedFiles, + compactionConfig.orElseThrow().getConfigurer()); + } + + var tableConf = context.getTableConfiguration(extent.tableId()); + + var configurorClass = tableConf.get(Property.TABLE_COMPACTION_CONFIGURER); + if (configurorClass == null || configurorClass.isBlank()) { + return Map.of(); + } + + var opts = + tableConf.getAllPropertiesWithPrefixStripped(Property.TABLE_COMPACTION_CONFIGURER_OPTS); + - return CompactionPluginUtils.computeOverrides(context, extent, files, ++ return CompactionPluginUtils.computeOverrides(context, extent, inputFiles, selectedFiles, + new PluginConfig(configurorClass, opts)); + } + + public static Map<String,String> computeOverrides(ServerContext context, KeyExtent extent, - Set<CompactableFile> files, PluginConfig cfg) { ++ Set<CompactableFile> inputFiles, Set<CompactableFile> selectedFiles, PluginConfig cfg) { + + CompactionConfigurer configurer = newInstance(context.getTableConfiguration(extent.tableId()), + cfg.getClassName(), CompactionConfigurer.class); + + final ServiceEnvironment senv = new ServiceEnvironmentImpl(context); + + configurer.init(new CompactionConfigurer.InitParameters() { + @Override + public Map<String,String> getOptions() { + return cfg.getOptions(); + } + + @Override + public PluginEnvironment getEnvironment() { + return senv; + } + + @Override + public TableId getTableId() { + return extent.tableId(); + } + }); + + var overrides = configurer.override(new CompactionConfigurer.InputParameters() { + @Override + public Collection<CompactableFile> getInputFiles() { - return files; ++ return inputFiles; ++ } ++ ++ @Override ++ public Set<CompactableFile> getSelectedFiles() { ++ return selectedFiles; + } + + @Override + public PluginEnvironment getEnvironment() { + return senv; + } + + @Override + public TableId getTableId() { + return extent.tableId(); + } + + @Override + public TabletId getTabletId() { + return new TabletIdImpl(extent); + } + }); + + if (overrides.getOverrides().isEmpty()) { + return null; + } + + return overrides.getOverrides(); + } + + static CompactionDispatcher createDispatcher(ServiceEnvironment env, TableId tableId) { + + 
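To make the configurer plumbing above more concrete, here is a minimal, hypothetical CompactionConfigurer that uses the getSelectedFiles() method added in this change alongside getInputFiles(). The class name and the compression policy are illustrative only, and it assumes Overrides can be constructed from a Map of property overrides, matching the getOverrides() accessor used above.

import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;

public class ExampleSelectedFilesConfigurer implements CompactionConfigurer {

  @Override
  public void init(InitParameters iparams) {
    // this sketch takes no options
  }

  @Override
  public Overrides override(InputParameters params) {
    // getSelectedFiles() is the full set chosen for the user compaction;
    // getInputFiles() is the subset this particular job will actually compact
    boolean compactingWholeSelection = !params.getSelectedFiles().isEmpty()
        && params.getSelectedFiles().equals(Set.copyOf(params.getInputFiles()));
    if (compactingWholeSelection) {
      // hypothetical policy: only change the output compression when the whole selection compacts
      return new Overrides(Map.of("table.file.compress.type", "gz"));
    }
    return new Overrides(Map.of());
  }
}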
var conf = env.getConfiguration(tableId); + + var className = conf.get(Property.TABLE_COMPACTION_DISPATCHER.getKey()); + + Map<String,String> opts = new HashMap<>(); + + conf.getWithPrefix(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey()).forEach((k, v) -> { + opts.put(k.substring(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey().length()), v); + }); + + var finalOpts = Collections.unmodifiableMap(opts); + + CompactionDispatcher.InitParameters initParameters = new CompactionDispatcher.InitParameters() { + @Override + public Map<String,String> getOptions() { + return finalOpts; + } + + @Override + public TableId getTableId() { + return tableId; + } + + @Override + public ServiceEnvironment getServiceEnv() { + return env; + } + }; + + CompactionDispatcher dispatcher = null; + try { + dispatcher = env.instantiate(tableId, className, CompactionDispatcher.class); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + + dispatcher.init(initParameters); + + return dispatcher; + } +} diff --cc server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 6d2a51c46f,0000000000..c03cb3241f mode 100644,000000..100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@@ -1,1388 -1,0 +1,1399 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.manager.compaction.coordinator; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.COMPACTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SCANS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableDeletedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.CompactionConfig; ++import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; +import org.apache.accumulo.core.clientImpl.thrift.TableOperation; +import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; +import org.apache.accumulo.core.compaction.thrift.TCompactionState; +import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; +import org.apache.accumulo.core.fate.FateTxId; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.metadata.AbstractTabletFile; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import 
org.apache.accumulo.core.metadata.ReferencedTabletFile; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.Refreshes.RefreshEntry; +import org.apache.accumulo.core.metadata.schema.Ample.RejectionHandler; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.securityImpl.thrift.TCredentials; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.InputFile; +import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind; +import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats; +import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; +import org.apache.accumulo.core.tabletserver.thrift.TabletServerClientService; +import org.apache.accumulo.core.util.Retry; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.CompactionExecutorIdImpl; +import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; +import org.apache.accumulo.core.util.compaction.RunningCompaction; +import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.manager.EventCoordinator; +import org.apache.accumulo.manager.compaction.queue.CompactionJobQueues; +import org.apache.accumulo.manager.tableOps.bulkVer2.TabletRefresher; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.compaction.CompactionConfigStorage; +import org.apache.accumulo.server.compaction.CompactionPluginUtils; +import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.accumulo.server.security.SecurityOperation; +import org.apache.accumulo.server.tablets.TabletNameGenerator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.thrift.TException; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.CacheLoader; +import com.github.benmanes.caffeine.cache.LoadingCache; +import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.MoreExecutors; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; + +public class CompactionCoordinator + implements CompactionCoordinatorService.Iface, Runnable, MetricsProducer { + + private static final Logger LOG = 
LoggerFactory.getLogger(CompactionCoordinator.class); + private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15); + + /* + * Map of compactionId to RunningCompactions. This is an informational cache of what external + * compactions may be running. Its possible it may contain external compactions that are not + * actually running. It may not contain compactions that are actually running. The metadata table + * is the most authoritative source of what external compactions are currently running, but it + * does not have the stats that this map has. + */ + protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE = + new ConcurrentHashMap<>(); + + /* + * When the manager starts up any refreshes that were in progress when the last manager process + * died must be completed before new refresh entries are written. This map of countdown latches + * helps achieve that goal. + */ + private final Map<Ample.DataLevel,CountDownLatch> refreshLatches; + + /* Map of group name to last time compactor called to get a compaction job */ + // ELASTICITY_TODO need to clean out groups that are no longer configured.. + private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new ConcurrentHashMap<>(); + + private final ServerContext ctx; + private final LiveTServerSet tserverSet; + private final SecurityOperation security; + private final CompactionJobQueues jobQueues; + private final EventCoordinator eventCoordinator; + // Exposed for tests + protected volatile Boolean shutdown = false; + + private final ScheduledThreadPoolExecutor schedExecutor; + + private final Cache<ExternalCompactionId,RunningCompaction> completed; + private LoadingCache<Long,CompactionConfig> compactionConfigCache; + private final Cache<Path,Integer> checked_tablet_dir_cache; + private final DeadCompactionDetector deadCompactionDetector; + + private final QueueMetrics queueMetrics; + + public CompactionCoordinator(ServerContext ctx, LiveTServerSet tservers, + SecurityOperation security, EventCoordinator eventCoordinator) { + this.ctx = ctx; + this.tserverSet = tservers; + this.schedExecutor = this.ctx.getScheduledExecutor(); + this.security = security; + this.eventCoordinator = eventCoordinator; + + this.jobQueues = new CompactionJobQueues( + ctx.getConfiguration().getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE)); + + this.queueMetrics = new QueueMetrics(jobQueues); + + var refreshLatches = new EnumMap<Ample.DataLevel,CountDownLatch>(Ample.DataLevel.class); + refreshLatches.put(Ample.DataLevel.ROOT, new CountDownLatch(1)); + refreshLatches.put(Ample.DataLevel.METADATA, new CountDownLatch(1)); + refreshLatches.put(Ample.DataLevel.USER, new CountDownLatch(1)); + this.refreshLatches = Collections.unmodifiableMap(refreshLatches); + + completed = ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true) + .maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build(); + + CacheLoader<Long,CompactionConfig> loader = + txid -> CompactionConfigStorage.getConfig(ctx, txid); + + // Keep a small short lived cache of compaction config. Compaction config never changes, however + // when a compaction is canceled it is deleted which is why there is a time limit. It does not + // hurt to let a job that was canceled start, it will be canceled later. Caching this immutable + // config will help avoid reading the same data over and over. 
+ compactionConfigCache = ctx.getCaches().createNewBuilder(CacheName.COMPACTION_CONFIGS, true) + .expireAfterWrite(30, SECONDS).maximumSize(100).build(loader); + + Weigher<Path,Integer> weigher = (path, count) -> { + return path.toUri().toString().length(); + }; + + checked_tablet_dir_cache = + ctx.getCaches().createNewBuilder(CacheName.COMPACTION_DIR_CACHE, true) + .maximumWeight(10485760L).weigher(weigher).build(); + + deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor); + // At this point the manager does not have its lock so no actions should be taken yet + } + + private volatile Thread serviceThread = null; + + public void start() { + serviceThread = Threads.createThread("CompactionCoordinator Thread", this); + serviceThread.start(); + } + + public void shutdown() { + shutdown = true; + var localThread = serviceThread; + if (localThread != null) { + try { + localThread.join(); + } catch (InterruptedException e) { + LOG.error("Exception stopping compaction coordinator thread", e); + } + } + } + + protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpCompactors, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + + private void processRefreshes(Ample.DataLevel dataLevel) { + try (var refreshStream = ctx.getAmple().refreshes(dataLevel).stream()) { + // process batches of refresh entries to avoid reading all into memory at once + Iterators.partition(refreshStream.iterator(), 10000).forEachRemaining(refreshEntries -> { + LOG.info("Processing {} tablet refreshes for {}", refreshEntries.size(), dataLevel); + + var extents = + refreshEntries.stream().map(RefreshEntry::getExtent).collect(Collectors.toList()); + var tabletsMeta = new HashMap<KeyExtent,TabletMetadata>(); + try (var tablets = ctx.getAmple().readTablets().forTablets(extents, Optional.empty()) + .fetch(PREV_ROW, LOCATION, SCANS).build()) { + tablets.stream().forEach(tm -> tabletsMeta.put(tm.getExtent(), tm)); + } + + var tserverRefreshes = new HashMap<TabletMetadata.Location,List<TKeyExtent>>(); + + refreshEntries.forEach(refreshEntry -> { + var tm = tabletsMeta.get(refreshEntry.getExtent()); + + // only need to refresh if the tablet is still on the same tserver instance + if (tm != null && tm.getLocation() != null + && tm.getLocation().getServerInstance().equals(refreshEntry.getTserver())) { + KeyExtent extent = tm.getExtent(); + Collection<StoredTabletFile> scanfiles = tm.getScans(); + var ttr = extent.toThrift(); + tserverRefreshes.computeIfAbsent(tm.getLocation(), k -> new ArrayList<>()).add(ttr); + } + }); + + String logId = "Coordinator:" + dataLevel; + ThreadPoolExecutor threadPool = + ctx.threadPools().createFixedThreadPool(10, "Tablet refresh " + logId, false); + try { + TabletRefresher.refreshTablets(threadPool, logId, ctx, tserverSet::getCurrentServers, + tserverRefreshes); + } finally { + threadPool.shutdownNow(); + } + + ctx.getAmple().refreshes(dataLevel).delete(refreshEntries); + }); + } + // allow new refreshes to be written now that all preexisting ones are processed + refreshLatches.get(dataLevel).countDown(); + } + + @Override + public void run() { + + 
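The config cache built above follows a common Caffeine pattern: a small maximum size plus expireAfterWrite, so a deleted (canceled) config ages out quickly while repeated lookups of the same immutable config avoid rereads. A standalone sketch of that pattern, with a hypothetical loader standing in for CompactionConfigStorage:

import java.time.Duration;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public class ShortLivedConfigCacheSketch {

  // hypothetical loader standing in for CompactionConfigStorage.getConfig(ctx, txid)
  static String loadConfig(Long txid) {
    return "config-for-" + txid;
  }

  public static void main(String[] args) {
    LoadingCache<Long,String> cache = Caffeine.newBuilder()
        .expireAfterWrite(Duration.ofSeconds(30)) // canceled configs are not served for long
        .maximumSize(100)                         // only a small hot-spot cache is needed
        .build(ShortLivedConfigCacheSketch::loadConfig);

    // first call loads, the second call within 30 seconds is served from the cache
    System.out.println(cache.get(42L));
    System.out.println(cache.get(42L));
  }
}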
processRefreshes(Ample.DataLevel.ROOT); + processRefreshes(Ample.DataLevel.METADATA); + processRefreshes(Ample.DataLevel.USER); + + startCompactionCleaner(schedExecutor); + startRunningCleaner(schedExecutor); + + // On a re-start of the coordinator it's possible that external compactions are in-progress. + // Attempt to get the running compactions on the compactors and then resolve which tserver + // the external compaction came from to re-populate the RUNNING collection. + LOG.info("Checking for running external compactions"); + // On re-start contact the running Compactors to try and seed the list of running compactions + List<RunningCompaction> running = getCompactionsRunningOnCompactors(); + if (running.isEmpty()) { + LOG.info("No running external compactions found"); + } else { + LOG.info("Found {} running external compactions", running.size()); + running.forEach(rc -> { + TCompactionStatusUpdate update = new TCompactionStatusUpdate(); + update.setState(TCompactionState.IN_PROGRESS); + update.setMessage("Coordinator restarted, compaction found in progress"); + rc.addUpdate(System.currentTimeMillis(), update); + RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc); + }); + } + + startDeadCompactionDetector(); + + // ELASTICITY_TODO the main function of the following loop was getting group summaries from + // tservers. Its no longer doing that. May be best to remove the loop and make the remaining + // task a scheduled one. + + LOG.info("Starting loop to check tservers for compaction summaries"); + while (!shutdown) { + long start = System.currentTimeMillis(); + + long now = System.currentTimeMillis(); + TIME_COMPACTOR_LAST_CHECKED.forEach((k, v) -> { + if ((now - v) > getMissingCompactorWarningTime()) { + // ELASTICITY_TODO may want to consider of the group has any jobs queued OR if the group + // still exist in configuration + LOG.warn("No compactors have checked in with coordinator for group {} in {}ms", k, + getMissingCompactorWarningTime()); + } + }); + + long checkInterval = getTServerCheckInterval(); + long duration = (System.currentTimeMillis() - start); + if (checkInterval - duration > 0) { + LOG.debug("Waiting {}ms for next group check", (checkInterval - duration)); + UtilWaitThread.sleep(checkInterval - duration); + } + } + + LOG.info("Shutting down"); + } + + protected void startDeadCompactionDetector() { + deadCompactionDetector.start(); + } + + protected long getMissingCompactorWarningTime() { + return FIFTEEN_MINUTES; + } + + protected long getTServerCheckInterval() { + return this.ctx.getConfiguration() + .getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL); + } + + public long getNumRunningCompactions() { + return RUNNING_CACHE.size(); + } + + /** + * Return the next compaction job from the queue to a Compactor + * + * @param groupName group + * @param compactorAddress compactor address + * @throws ThriftSecurityException when permission error + * @return compaction job + */ + @Override + public TExternalCompactionJob getCompactionJob(TInfo tinfo, TCredentials credentials, + String groupName, String compactorAddress, String externalCompactionId) + throws ThriftSecurityException { + + // do not expect users to call this directly, expect compactors to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + final String group = groupName.intern(); + 
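The check loop above sleeps only for whatever remains of the configured interval after the work done in an iteration, so a slow iteration does not push the next check further out. A minimal illustration of that timing pattern (the interval and the simulated work are placeholders):

public class FixedIntervalLoopSketch {
  public static void main(String[] args) throws InterruptedException {
    long checkIntervalMs = 1000; // placeholder for the configured check interval
    for (int i = 0; i < 3; i++) {
      long start = System.currentTimeMillis();
      Thread.sleep(250); // placeholder for the real per-iteration work
      long duration = System.currentTimeMillis() - start;
      if (checkIntervalMs - duration > 0) {
        Thread.sleep(checkIntervalMs - duration); // wait out the remainder of the interval
      }
    }
  }
}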
LOG.trace("getCompactionJob called for group {} by compactor {}", group, compactorAddress); + TIME_COMPACTOR_LAST_CHECKED.put(group, System.currentTimeMillis()); + + TExternalCompactionJob result = null; + + CompactionJobQueues.MetaJob metaJob = + jobQueues.poll(CompactionExecutorIdImpl.externalId(groupName)); + + while (metaJob != null) { + + Optional<CompactionConfig> compactionConfig = getCompactionConfig(metaJob); + + // this method may reread the metadata, do not use the metadata in metaJob for anything after + // this method + ExternalCompactionMetadata ecm = null; + + var kind = metaJob.getJob().getKind(); + + // Only reserve user compactions when the config is present. When compactions are canceled the + // config is deleted. + if (kind == CompactionKind.SYSTEM + || (kind == CompactionKind.USER && compactionConfig.isPresent())) { + ecm = reserveCompaction(metaJob, compactorAddress, + ExternalCompactionId.from(externalCompactionId)); + } + + if (ecm != null) { + result = createThriftJob(externalCompactionId, ecm, metaJob, compactionConfig); + // It is possible that by the time this added that the the compactor that made this request + // is dead. In this cases the compaction is not actually running. + RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), + new RunningCompaction(result, compactorAddress, group)); + LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId, + compactorAddress, ecm.getJobFiles().size()); + break; + } else { + LOG.debug("Unable to reserve compaction job for {}, pulling another off the queue ", + metaJob.getTabletMetadata().getExtent()); + metaJob = jobQueues.poll(CompactionExecutorIdImpl.externalId(groupName)); + } + } + + if (metaJob == null) { + LOG.debug("No jobs found in group {} ", group); + } + + if (result == null) { + LOG.trace("No jobs found for group {}, returning empty job to compactor {}", group, + compactorAddress); + result = new TExternalCompactionJob(); + } + + return result; + + } + + // ELASTICITY_TODO unit test this code + private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job, + Set<StoredTabletFile> jobFiles) { + + if (tablet == null) { + // the tablet no longer exist + return false; + } + + if (tablet.getOperationId() != null) { + return false; + } + + if (!tablet.getFiles().containsAll(jobFiles)) { + return false; + } + + var currentlyCompactingFiles = tablet.getExternalCompactions().values().stream() + .flatMap(ecm -> ecm.getJobFiles().stream()).collect(Collectors.toSet()); + + if (!Collections.disjoint(jobFiles, currentlyCompactingFiles)) { + return false; + } + + switch (job.getKind()) { + case SYSTEM: + if (tablet.getSelectedFiles() != null + && !Collections.disjoint(jobFiles, tablet.getSelectedFiles().getFiles())) { + return false; + } + break; + case USER: + case SELECTOR: + if (tablet.getSelectedFiles() == null + || !tablet.getSelectedFiles().getFiles().containsAll(jobFiles)) { + return false; + } + break; + default: + throw new UnsupportedOperationException("Not currently handling " + job.getKind()); + } + + return true; + } + + private void checkTabletDir(KeyExtent extent, Path path) { + try { + if (checked_tablet_dir_cache.getIfPresent(path) == null) { + FileStatus[] files = null; + try { + files = ctx.getVolumeManager().listStatus(path); + } catch (FileNotFoundException ex) { + // ignored + } + + if (files == null) { + LOG.debug("Tablet {} had no dir, creating {}", extent, path); + + ctx.getVolumeManager().mkdirs(path); + } + 
checked_tablet_dir_cache.put(path, 1); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private ExternalCompactionMetadata createExternalCompactionMetadata(CompactionJob job, + Set<StoredTabletFile> jobFiles, TabletMetadata tablet, String compactorAddress, + ExternalCompactionId externalCompactionId) { + boolean propDels; + + Long fateTxId = null; + + switch (job.getKind()) { + case SYSTEM: { + boolean compactingAll = tablet.getFiles().equals(jobFiles); + propDels = !compactingAll; + } + break; + case SELECTOR: + case USER: { + boolean compactingAll = tablet.getSelectedFiles().initiallySelectedAll() + && tablet.getSelectedFiles().getFiles().equals(jobFiles); + propDels = !compactingAll; + fateTxId = tablet.getSelectedFiles().getFateTxId(); + } + break; + default: + throw new IllegalArgumentException(); + } + + Consumer<String> directoryCreator = dir -> checkTabletDir(tablet.getExtent(), new Path(dir)); + ReferencedTabletFile newFile = TabletNameGenerator.getNextDataFilenameForMajc(propDels, ctx, + tablet, directoryCreator, externalCompactionId); + + return new ExternalCompactionMetadata(jobFiles, newFile, compactorAddress, job.getKind(), + job.getPriority(), job.getExecutor(), propDels, fateTxId); + + } + + private ExternalCompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob, + String compactorAddress, ExternalCompactionId externalCompactionId) { + + Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM + || metaJob.getJob().getKind() == CompactionKind.USER); + + var tabletMetadata = metaJob.getTabletMetadata(); + + var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile) + .collect(Collectors.toSet()); + + Retry retry = + Retry.builder().maxRetries(5).retryAfter(100, MILLISECONDS).incrementBy(100, MILLISECONDS) + .maxWait(10, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry(); + + while (retry.canRetry()) { + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var extent = metaJob.getTabletMetadata().getExtent(); + + if (!canReserveCompaction(tabletMetadata, metaJob.getJob(), jobFiles)) { + return null; + } + + var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata, + compactorAddress, externalCompactionId); + + // any data that is read from the tablet to make a decision about if it can compact or not + // must be included in the requireSame call + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireSame(tabletMetadata, FILES, SELECTED, ECOMP); + + tabletMutator.putExternalCompaction(externalCompactionId, ecm); + tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId)); + + var result = tabletsMutator.process().get(extent); + + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + return ecm; + } else { + tabletMetadata = result.readMetadata(); + } + } + + retry.useRetry(); + try { + retry.waitForNextAttempt(LOG, + "Reserved compaction for " + metaJob.getTabletMetadata().getExtent()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + return null; + } + + TExternalCompactionJob createThriftJob(String externalCompactionId, + ExternalCompactionMetadata ecm, CompactionJobQueues.MetaJob metaJob, + Optional<CompactionConfig> compactionConfig) { + ++ Set<CompactableFile> selectedFiles; ++ if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { ++ selectedFiles = Set.of(); ++ } else { ++ 
selectedFiles = metaJob.getTabletMetadata().getSelectedFiles().getFiles().stream() ++ .map(file -> new CompactableFileImpl(file, ++ metaJob.getTabletMetadata().getFilesMap().get(file))) ++ .collect(Collectors.toUnmodifiableSet()); ++ } ++ + Map<String,String> overrides = CompactionPluginUtils.computeOverrides(compactionConfig, ctx, - metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles()); ++ metaJob.getTabletMetadata().getExtent(), metaJob.getJob().getFiles(), selectedFiles); + + IteratorConfig iteratorSettings = SystemIteratorUtil + .toIteratorConfig(compactionConfig.map(CompactionConfig::getIterators).orElse(List.of())); + + var files = ecm.getJobFiles().stream().map(storedTabletFile -> { + var dfv = metaJob.getTabletMetadata().getFilesMap().get(storedTabletFile); + return new InputFile(storedTabletFile.getMetadata(), dfv.getSize(), dfv.getNumEntries(), + dfv.getTime()); + }).collect(Collectors.toList()); + + long fateTxid = 0; + if (metaJob.getJob().getKind() == CompactionKind.USER) { + fateTxid = metaJob.getTabletMetadata().getSelectedFiles().getFateTxId(); + } + + return new TExternalCompactionJob(externalCompactionId, + metaJob.getTabletMetadata().getExtent().toThrift(), files, iteratorSettings, + ecm.getCompactTmpName().getNormalizedPathStr(), ecm.getPropagateDeletes(), + TCompactionKind.valueOf(ecm.getKind().name()), fateTxid, overrides); + } + + @Override + public void registerMetrics(MeterRegistry registry) { + Gauge.builder(METRICS_MAJC_QUEUED, jobQueues, CompactionJobQueues::getQueuedJobCount) + .description("Number of queued major compactions").register(registry); + Gauge.builder(METRICS_MAJC_RUNNING, this, CompactionCoordinator::getNumRunningCompactions) + .description("Number of running major compactions").register(registry); + + queueMetrics.registerMetrics(registry); + } + + public void addJobs(TabletMetadata tabletMetadata, Collection<CompactionJob> jobs) { + jobQueues.add(tabletMetadata, jobs); + } + + public CompactionCoordinatorService.Iface getThriftService() { + return this; + } + + class RefreshWriter { + + private final ExternalCompactionId ecid; + private final KeyExtent extent; + + private RefreshEntry writtenEntry; + + RefreshWriter(ExternalCompactionId ecid, KeyExtent extent) { + this.ecid = ecid; + this.extent = extent; + + var dataLevel = Ample.DataLevel.of(extent.tableId()); + try { + // Wait for any refresh entries from the previous manager process to be processed before + // writing new ones. 
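The refresh latches used above act as a simple gate: code that wants to write new refresh entries blocks in await() until the startup pass over preexisting entries calls countDown() for that data level. A stripped-down sketch of the same gating, with a single latch instead of one per Ample.DataLevel:

import java.util.concurrent.CountDownLatch;

public class RefreshGateSketch {
  public static void main(String[] args) throws InterruptedException {
    CountDownLatch startupProcessed = new CountDownLatch(1);

    Thread writer = new Thread(() -> {
      try {
        startupProcessed.await(); // block until preexisting entries are handled
        System.out.println("writing new refresh entry");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    writer.start();

    System.out.println("processing refresh entries left by the previous process");
    startupProcessed.countDown(); // open the gate for new writes

    writer.join();
  }
}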
+ refreshLatches.get(dataLevel).await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public void writeRefresh(TabletMetadata.Location location) { + Objects.requireNonNull(location); + + if (writtenEntry != null) { + if (location.getServerInstance().equals(writtenEntry.getTserver())) { + // the location was already written so nothing to do + return; + } else { + deleteRefresh(); + } + } + + var entry = new RefreshEntry(ecid, extent, location.getServerInstance()); + + ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId())).add(List.of(entry)); + + LOG.debug("wrote refresh entry for {}", ecid); + + writtenEntry = entry; + } + + public void deleteRefresh() { + if (writtenEntry != null) { + ctx.getAmple().refreshes(Ample.DataLevel.of(extent.tableId())) + .delete(List.of(writtenEntry)); + LOG.debug("deleted refresh entry for {}", ecid); + writtenEntry = null; + } + } + } + + private Optional<CompactionConfig> getCompactionConfig(CompactionJobQueues.MetaJob metaJob) { + if (metaJob.getJob().getKind() == CompactionKind.USER + && metaJob.getTabletMetadata().getSelectedFiles() != null) { + var cconf = + compactionConfigCache.get(metaJob.getTabletMetadata().getSelectedFiles().getFateTxId()); + return Optional.ofNullable(cconf); + } + return Optional.empty(); + } + + /** + * Compactors calls this method when they have finished a compaction. This method does the + * following. + * + * <ol> + * <li>Reads the tablets metadata and determines if the compaction can commit. Its possible that + * things changed while the compaction was running and it can no longer commit.</li> + * <li>If the compaction can commit then a ~refresh entry may be written to the metadata table. + * This is done before attempting to commit to cover the case of process failure after commit. If + * the manager dies after commit then when it restarts it will see the ~refresh entry and refresh + * that tablet. The ~refresh entry is only written when its a system compaction on a tablet with a + * location.</li> + * <li>Commit the compaction using a conditional mutation. If the tablets files or location + * changed since reading the tablets metadata, then conditional mutation will fail. When this + * happens it will reread the metadata and go back to step 1 conceptually. When committing a + * compaction the compacted files are removed and scan entries are added to the tablet in case the + * files are in use, this prevents GC from deleting the files between updating tablet metadata and + * refreshing the tablet. The scan entries are only added when a tablet has a location.</li> + * <li>After successful commit a refresh request is sent to the tablet if it has a location. This + * will cause the tablet to start using the newly compacted files for future scans. Also the + * tablet can delete the scan entries if there are no active scans using them.</li> + * <li>If a ~refresh entry was written, delete it since the refresh was successful.</li> + * </ol> + * + * <p> + * User compactions will be refreshed as part of the fate operation. The user compaction fate + * operation will see the compaction was committed after this code updates the tablet metadata, + * however if it were to rely on this code to do the refresh it would not be able to know when the + * refresh was actually done. Therefore, user compactions will refresh as part of the fate + * operation so that it's known to be done before the fate operation returns. 
Since the fate + * operation will do it, there is no need to do it here for user compactions. + * </p> + * + * <p> + * The ~refresh entries serve a similar purpose to FATE operations, it ensures that code executes + * even when a process dies. FATE was intentionally not used for compaction commit because FATE + * stores its data in zookeeper. The refresh entry is stored in the metadata table, which is much + * more scalable than zookeeper. The number of system compactions of small files could be large + * and this would be a large number of writes to zookeeper. Zookeeper scales somewhat with reads, + * but not with writes. + * </p> + * + * <p> + * Issue #3559 was opened to explore the possibility of making compaction commit a fate operation + * which would remove the need for the ~refresh section. + * </p> + * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param textent tablet extent + * @param stats compaction stats + * @throws ThriftSecurityException when permission error + */ + @Override + public void compactionCompleted(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TKeyExtent textent, TCompactionStats stats) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + var extent = KeyExtent.fromThrift(textent); + LOG.info("Compaction completed, id: {}, stats: {}, extent: {}", externalCompactionId, stats, + extent); + final var ecid = ExternalCompactionId.of(externalCompactionId); + + var tabletMeta = + ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID); + + if (!canCommitCompaction(ecid, tabletMeta)) { + return; + } + + ExternalCompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid); + + // ELASTICITY_TODO this code does not handle race conditions or faults. Need to ensure refresh + // happens in the case of manager process death between commit and refresh. + ReferencedTabletFile newDatafile = + TabletNameGenerator.computeCompactionFileDest(ecm.getCompactTmpName()); + + Optional<ReferencedTabletFile> optionalNewFile; + try { + optionalNewFile = renameOrDeleteFile(stats, ecm, newDatafile); + } catch (IOException e) { + LOG.warn("Can not commit complete compaction {} because unable to delete or rename {} ", ecid, + ecm.getCompactTmpName(), e); + compactionFailed(Map.of(ecid, extent)); + return; + } + + RefreshWriter refreshWriter = new RefreshWriter(ecid, extent); + + try { + tabletMeta = commitCompaction(stats, ecid, tabletMeta, optionalNewFile, refreshWriter); + } catch (RuntimeException e) { + LOG.warn("Failed to commit complete compaction {} {}", ecid, extent, e); + compactionFailed(Map.of(ecid, extent)); + } + + if (ecm.getKind() != CompactionKind.USER) { + refreshTablet(tabletMeta); + } + + // if a refresh entry was written, it can be removed after the tablet was refreshed + refreshWriter.deleteRefresh(); + + // It's possible that RUNNING might not have an entry for this ecid in the case + // of a coordinator restart when the Coordinator can't find the TServer for the + // corresponding external compaction. + recordCompletion(ecid); + + // This will causes the tablet to be reexamined to see if it needs any more compactions. 
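The ordering described in the Javadoc above (write the ~refresh entry, commit, refresh, then delete the entry) is a durable-intent pattern: because the marker is written before the commit, the follow-up refresh still happens if the process dies between commit and refresh. A toy sketch of that ordering, with an in-memory set standing in for the metadata entries and printed messages standing in for the real actions:

import java.util.LinkedHashSet;
import java.util.Set;

public class RefreshMarkerSketch {
  // stands in for the ~refresh section of the metadata table
  static final Set<String> markers = new LinkedHashSet<>();

  static void commitAndRefresh(String tablet) {
    markers.add(tablet);                      // 1. record the intent to refresh
    System.out.println("commit " + tablet);   // 2. commit the compaction
    System.out.println("refresh " + tablet);  // 3. refresh the tablet
    markers.remove(tablet);                   // 4. intent fulfilled, drop the marker
  }

  // on startup, finish any refreshes a dead process left behind
  static void recover() {
    for (String tablet : Set.copyOf(markers)) {
      System.out.println("refresh " + tablet);
      markers.remove(tablet);
    }
  }

  public static void main(String[] args) {
    commitAndRefresh("tablet-1");
    recover(); // nothing left to do when the happy path completed
  }
}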
+ eventCoordinator.event(extent, "Compaction completed %s", extent); + } + + private Optional<ReferencedTabletFile> renameOrDeleteFile(TCompactionStats stats, + ExternalCompactionMetadata ecm, ReferencedTabletFile newDatafile) throws IOException { + if (stats.getEntriesWritten() == 0) { + // the compaction produced no output so do not need to rename or add a file to the metadata + // table, only delete the input files. + if (!ctx.getVolumeManager().delete(ecm.getCompactTmpName().getPath())) { + throw new IOException("delete returned false"); + } + + return Optional.empty(); + } else { + if (!ctx.getVolumeManager().rename(ecm.getCompactTmpName().getPath(), + newDatafile.getPath())) { + throw new IOException("rename returned false"); + } + + return Optional.of(newDatafile); + } + } + + private void refreshTablet(TabletMetadata metadata) { + var location = metadata.getLocation(); + if (location != null) { + KeyExtent extent = metadata.getExtent(); + + // there is a single tserver and single tablet, do not need a thread pool. The direct executor + // will run everything in the current thread + ExecutorService executorService = MoreExecutors.newDirectExecutorService(); + try { + TabletRefresher.refreshTablets(executorService, + "compaction:" + metadata.getExtent().toString(), ctx, tserverSet::getCurrentServers, + Map.of(metadata.getLocation(), List.of(extent.toThrift()))); + } finally { + executorService.shutdownNow(); + } + } + } + + // ELASTICITY_TODO unit test this method + private boolean canCommitCompaction(ExternalCompactionId ecid, TabletMetadata tabletMetadata) { + + if (tabletMetadata == null) { + LOG.debug("Received completion notification for nonexistent tablet {}", ecid); + return false; + } + + var extent = tabletMetadata.getExtent(); + + if (tabletMetadata.getOperationId() != null) { + // split, merge, and delete tablet should delete the compaction entry in the tablet + LOG.debug("Received completion notification for tablet with active operation {} {} {}", ecid, + extent, tabletMetadata.getOperationId()); + return false; + } + + ExternalCompactionMetadata ecm = tabletMetadata.getExternalCompactions().get(ecid); + + if (ecm == null) { + LOG.debug("Received completion notification for unknown compaction {} {}", ecid, extent); + return false; + } + + if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { + if (tabletMetadata.getSelectedFiles() == null) { + // when the compaction is canceled, selected files are deleted + LOG.debug( + "Received completion notification for user compaction and tablet has no selected files {} {}", + ecid, extent); + return false; + } + + if (ecm.getFateTxId() != tabletMetadata.getSelectedFiles().getFateTxId()) { + // maybe the compaction was cancled and another user compaction was started on the tablet. 
+ LOG.debug( + "Received completion notification for user compaction where its fate txid did not match the tablets {} {} {} {}", + ecid, extent, FateTxId.formatTid(ecm.getFateTxId()), + FateTxId.formatTid(tabletMetadata.getSelectedFiles().getFateTxId())); + } + + if (!tabletMetadata.getSelectedFiles().getFiles().containsAll(ecm.getJobFiles())) { + // this is not expected to happen + LOG.error("User compaction contained files not in the selected set {} {} {} {} {}", + tabletMetadata.getExtent(), ecid, ecm.getKind(), + Optional.ofNullable(tabletMetadata.getSelectedFiles()).map(SelectedFiles::getFiles), + ecm.getJobFiles()); + return false; + } + } + + if (!tabletMetadata.getFiles().containsAll(ecm.getJobFiles())) { + // this is not expected to happen + LOG.error("Compaction contained files not in the tablet files set {} {} {} {}", + tabletMetadata.getExtent(), ecid, tabletMetadata.getFiles(), ecm.getJobFiles()); + return false; + } + + return true; + } + + private TabletMetadata commitCompaction(TCompactionStats stats, ExternalCompactionId ecid, + TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, + RefreshWriter refreshWriter) { + + KeyExtent extent = tablet.getExtent(); + + Retry retry = Retry.builder().infiniteRetries().retryAfter(100, MILLISECONDS) + .incrementBy(100, MILLISECONDS).maxWait(10, SECONDS).backOffFactor(1.5) + .logInterval(3, MINUTES).createRetry(); + + while (canCommitCompaction(ecid, tablet)) { + ExternalCompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); + + // the compacted files should not exists in the tablet already + var tablet2 = tablet; + newDatafile.ifPresent( + newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()), + "File already exists in tablet %s %s", newFile, tablet2.getFiles())); + + if (tablet.getLocation() != null + && tablet.getExternalCompactions().get(ecid).getKind() != CompactionKind.USER) { + // Write the refresh entry before attempting to update tablet metadata, this ensures that + // refresh will happen even if this process dies. In the case where this process does not + // die refresh will happen after commit. User compactions will make refresh calls in their + // fate operation, so it does not need to be done here. 
+ refreshWriter.writeRefresh(tablet.getLocation()); + } + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireCompaction(ecid).requireSame(tablet, FILES, LOCATION); + + if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) { + tabletMutator.requireSame(tablet, SELECTED, COMPACTED); + } + + // make the needed updates to the tablet + updateTabletForCompaction(stats, ecid, tablet, newDatafile, extent, ecm, tabletMutator); + + tabletMutator + .submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid)); + + // TODO expensive logging + LOG.debug("Compaction completed {} added {} removed {}", tablet.getExtent(), newDatafile, + ecm.getJobFiles().stream().map(AbstractTabletFile::getFileName) + .collect(Collectors.toList())); + + // ELASTICITY_TODO check return value and retry, could fail because of race conditions + var result = tabletsMutator.process().get(extent); + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + // compaction was committed, mark the compaction input files for deletion + // + // ELASTICITIY_TODO in the case of process death the GC candidates would never be added + // like #3811. If compaction commit were moved to FATE per #3559 then this would not + // be an issue. If compaction commit is never moved to FATE, then this addition could + // moved to the compaction refresh process. The compaction refresh process will go away + // if compaction commit is moved to FATE, so should only do this if not moving to FATE. + ctx.getAmple().putGcCandidates(extent.tableId(), ecm.getJobFiles()); + break; + } else { + // compaction failed to commit, maybe something changed on the tablet so lets reread the + // metadata and try again + tablet = result.readMetadata(); + } + + retry.waitForNextAttempt(LOG, "Failed to commit " + ecid + " for tablet " + extent); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + return tablet; + } + + private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, + TabletMetadata tablet, Optional<ReferencedTabletFile> newDatafile, KeyExtent extent, + ExternalCompactionMetadata ecm, Ample.ConditionalTabletMutator tabletMutator) { + // ELASTICITY_TODO improve logging adapt to use existing tablet files logging + if (ecm.getKind() == CompactionKind.USER) { + if (tablet.getSelectedFiles().getFiles().equals(ecm.getJobFiles())) { + // all files selected for the user compactions are finished, so the tablet is finish and + // its compaction id needs to be updated. 
+ + long fateTxId = tablet.getSelectedFiles().getFateTxId(); + + Preconditions.checkArgument(!tablet.getCompacted().contains(fateTxId), + "Tablet %s unexpected has selected files and compacted columns for %s", + tablet.getExtent(), fateTxId); + + // TODO set to trace + LOG.debug("All selected files compcated for {} setting compacted for {}", + tablet.getExtent(), FateTxId.formatTid(tablet.getSelectedFiles().getFateTxId())); + + tabletMutator.deleteSelectedFiles(); + tabletMutator.putCompacted(fateTxId); + + } else { + // not all of the selected files were finished, so need to add the new file to the + // selected set + + Set<StoredTabletFile> newSelectedFileSet = + new HashSet<>(tablet.getSelectedFiles().getFiles()); + newSelectedFileSet.removeAll(ecm.getJobFiles()); + + if (newDatafile.isPresent()) { + // TODO set to trace + LOG.debug( + "Not all selected files for {} are done, adding new selected file {} from compaction", + tablet.getExtent(), newDatafile.orElseThrow().getPath().getName()); + newSelectedFileSet.add(newDatafile.orElseThrow().insert()); + } else { + // TODO set to trace + LOG.debug( + "Not all selected files for {} are done, compaction produced no output so not adding to selected set.", + tablet.getExtent()); + } + + tabletMutator.putSelectedFiles( + new SelectedFiles(newSelectedFileSet, tablet.getSelectedFiles().initiallySelectedAll(), + tablet.getSelectedFiles().getFateTxId())); + } + } + + if (tablet.getLocation() != null) { + // add scan entries to prevent GC in case the hosted tablet is currently using the files for + // scan + ecm.getJobFiles().forEach(tabletMutator::putScan); + } + ecm.getJobFiles().forEach(tabletMutator::deleteFile); + tabletMutator.deleteExternalCompaction(ecid); + + if (newDatafile.isPresent()) { + tabletMutator.putFile(newDatafile.orElseThrow(), + new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); + } + } + + @Override + public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, + TKeyExtent extent) throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + LOG.info("Compaction failed, id: {}", externalCompactionId); + final var ecid = ExternalCompactionId.of(externalCompactionId); + compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent))); + } + + void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) { + + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + compactions.forEach((ecid, extent) -> { + try { + ctx.requireNotDeleted(extent.tableId()); + tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid) + .deleteExternalCompaction(ecid).submit(new RejectionHandler() { + + @Override + public boolean callWhenTabletDoesNotExists() { + return true; + } + + @Override + public boolean test(TabletMetadata tabletMetadata) { + return tabletMetadata == null + || !tabletMetadata.getExternalCompactions().containsKey(ecid); + } + + }); + } catch (TableDeletedException e) { + LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.", + extent.tableId()); + } + }); + + final List<ExternalCompactionId> ecidsForTablet = new ArrayList<>(); + tabletsMutator.process().forEach((extent, result) -> { + if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { 
+ + // this should be retried later when the dead compaction detector runs, let's log it in case + // it's a persistent problem + if (LOG.isDebugEnabled()) { + var ecid = + compactions.entrySet().stream().filter(entry -> entry.getValue().equals(extent)) + .findFirst().map(Map.Entry::getKey).orElse(null); + LOG.debug("Unable to remove failed compaction {} {}", extent, ecid); + } + } else { + // compactionFailed is called from the Compactor when either a compaction fails or + // is cancelled, and it's also called from the DeadCompactionDetector. This block is + // entered when the conditional mutator above successfully deletes an ecid from + // the tablet metadata. Remove compaction tmp files from the tablet directory + // that have a corresponding ecid in the name. + + ecidsForTablet.clear(); + compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0) + .map(Entry::getKey).forEach(ecidsForTablet::add); + + if (!ecidsForTablet.isEmpty()) { + final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR); + if (tm != null) { + final Collection<Volume> vols = ctx.getVolumeManager().getVolumes(); + for (Volume vol : vols) { + try { + final String volPath = + vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName(); + final FileSystem fs = vol.getFileSystem(); + for (ExternalCompactionId ecid : ecidsForTablet) { + final String fileSuffix = "_tmp_" + ecid.canonical(); + FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> { + return path.getName().endsWith(fileSuffix); + }); + if (files.length > 0) { + for (FileStatus file : files) { + if (!fs.delete(file.getPath(), false)) { + LOG.warn("Unable to delete ecid tmp file: {}", file.getPath()); + } else { + LOG.debug("Deleted ecid tmp file: {}", file.getPath()); + } + } + } + } + } catch (IOException e) { + LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e); + } + } + } else { + // TabletMetadata does not exist for the extent. This could be due to a merge or + // split operation.
Use the utility to find tmp files at the table level + deadCompactionDetector.addTableId(extent.tableId()); + } + } + } + }); + } + + compactions.forEach((k, v) -> recordCompletion(k)); + } + + /** + * Called by a Compactor to update the status of its assigned compaction + * + * @param tinfo trace info + * @param credentials tcredentials object + * @param externalCompactionId compaction id + * @param update compaction status update + * @param timestamp timestamp of the message + * @throws ThriftSecurityException when permission error + */ + @Override + public void updateCompactionStatus(TInfo tinfo, TCredentials credentials, + String externalCompactionId, TCompactionStatusUpdate update, long timestamp) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId, + timestamp, update); + final RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); + if (null != rc) { + rc.addUpdate(timestamp, update); + } + } + + private void recordCompletion(ExternalCompactionId ecid) { + var rc = RUNNING_CACHE.remove(ecid); + if (rc != null) { + completed.put(ecid, rc); + } + } + + protected Set<ExternalCompactionId> readExternalCompactionIds() { + return this.ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(ECOMP).build() + .stream().flatMap(tm -> tm.getExternalCompactions().keySet().stream()) + .collect(Collectors.toSet()); + } + + /** + * The RUNNING_CACHE set may contain external compactions that are not actually running. This + * method periodically cleans those up. + */ + protected void cleanUpRunning() { + + // grab a snapshot of the ids in the set before reading the metadata table. This is done to + // avoid removing things that are added while reading the metadata. + Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet()); + + // grab the ids that are listed as running in the metadata table. It is important that this is + // done after getting the snapshot.
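+ // any compaction added to RUNNING_CACHE after the snapshot above is absent from idsSnapshot, + // so the set difference below can never remove it, even if it is not yet in the metadata table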
+ Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds(); + + var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata); + + // remove ids that are in the running set but not in the metadata table + idsToRemove.forEach(this::recordCompletion); + + if (idsToRemove.size() > 0) { + LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove); + } + } + + /** + * Return information about running compactions + * + * @param tinfo trace info + * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects + * @throws ThriftSecurityException permission error + */ + @Override + public TExternalCompactionList getRunningCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + + final TExternalCompactionList result = new TExternalCompactionList(); + RUNNING_CACHE.forEach((ecid, rc) -> { + TExternalCompaction trc = new TExternalCompaction(); + trc.setGroupName(rc.getGroupName()); + trc.setCompactor(rc.getCompactorAddress()); + trc.setUpdates(rc.getUpdates()); + trc.setJob(rc.getJob()); + result.putToCompactions(ecid.canonical(), trc); + }); + return result; + } + + /** + * Return information about recently completed compactions + * + * @param tinfo trace info + * @param credentials tcredentials object + * @return map of ECID to TExternalCompaction objects + * @throws ThriftSecurityException permission error + */ + @Override + public TExternalCompactionList getCompletedCompactions(TInfo tinfo, TCredentials credentials) + throws ThriftSecurityException { + // do not expect users to call this directly, expect other tservers to call this method + if (!security.canPerformSystemActions(credentials)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + final TExternalCompactionList result = new TExternalCompactionList(); + completed.asMap().forEach((ecid, rc) -> { + TExternalCompaction trc = new TExternalCompaction(); + trc.setGroupName(rc.getGroupName()); + trc.setCompactor(rc.getCompactorAddress()); + trc.setJob(rc.getJob()); + trc.setUpdates(rc.getUpdates()); + result.putToCompactions(ecid.canonical(), trc); + }); + return result; + } + + @Override + public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) + throws TException { + var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); + var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent()); + try { + NamespaceId nsId = this.ctx.getNamespaceId(extent.tableId()); + if (!security.canCompact(credentials, extent.tableId(), nsId)) { + throw new AccumuloSecurityException(credentials.getPrincipal(), + SecurityErrorCode.PERMISSION_DENIED).asThriftException(); + } + } catch (TableNotFoundException e) { + throw new ThriftTableOperationException(extent.tableId().canonical(), null, + TableOperation.COMPACT_CANCEL, TableOperationExceptionType.NOTFOUND, e.getMessage()); + } + + cancelCompactionOnCompactor(runningCompaction.getCompactorAddress(), externalCompactionId); + } + + /* Method exists to be overridden in test to hide static method */ + protected String getTServerAddressString(HostAndPort tserverAddress) { + return 
ExternalCompactionUtil.getHostPortString(tserverAddress); + } + + /* Method exists to be overridden in test to hide static method */ + protected List<RunningCompaction> getCompactionsRunningOnCompactors() { + return ExternalCompactionUtil.getCompactionsRunningOnCompactors(this.ctx); + } + + /* Method exists to be overridden in test to hide static method */ + protected void cancelCompactionOnCompactor(String address, String externalCompactionId) { + HostAndPort hostPort = HostAndPort.fromString(address); + ExternalCompactionUtil.cancelCompaction(this.ctx, hostPort, externalCompactionId); + } + + /* Method exists to be overridden in test to hide static method */ + protected void returnTServerClient(TabletServerClientService.Client client) { + ThriftUtil.returnClient(client, this.ctx); + } + + private void deleteEmpty(ZooReaderWriter zoorw, String path) + throws KeeperException, InterruptedException { + try { + LOG.debug("Deleting empty ZK node {}", path); + zoorw.delete(path); + } catch (KeeperException.NotEmptyException e) { + LOG.debug("Failed to delete {}, it's not empty, likely an expected race condition.", path); + } + } + + private void cleanUpCompactors() { + final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS; + + var zoorw = this.ctx.getZooReaderWriter(); + + try { + var groups = zoorw.getChildren(compactorQueuesPath); + + for (String group : groups) { + String qpath = compactorQueuesPath + "/" + group; + + var compactors = zoorw.getChildren(qpath); + + if (compactors.isEmpty()) { + deleteEmpty(zoorw, qpath); + } + + for (String compactor : compactors) { + String cpath = compactorQueuesPath + "/" + group + "/" + compactor; + var lockNodes = zoorw.getChildren(cpath); + if (lockNodes.isEmpty()) { + deleteEmpty(zoorw, cpath); + } + } + } + + } catch (KeeperException | RuntimeException e) { + LOG.warn("Failed to clean up compactors", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + +} diff --cc test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index 7928b511d5,c6b194e8c7..76e9487708 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@@ -19,15 -19,26 +19,24 @@@ package org.apache.accumulo.test.functional; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; + import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; + import java.io.UncheckedIOException; + import java.nio.file.FileVisitResult; + import java.nio.file.Files; + import java.nio.file.Paths; + import java.nio.file.SimpleFileVisitor; + import java.nio.file.attribute.BasicFileAttributes; import java.time.Duration; -import java.util.ArrayList; + import java.util.Arrays; import java.util.EnumSet; + import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@@ -49,14 -62,19 +58,16 @@@ import org.apache.accumulo.core.client.
import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.PluginConfig; -import org.apache.accumulo.core.client.admin.compaction.CompactableFile; + import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer; import org.apache.accumulo.core.client.admin.compaction.CompactionSelector; -import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.clientImpl.TableOperationsImpl; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.file.rfile.bcfile.PrintBCInfo; -import org.apache.accumulo.core.iterators.DevNull; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;