This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 40c5f8edbf6a10366d9ce9728fd0a4c7211503c4
Merge: 8a9c10724c 187733f634
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Mon Feb 10 20:49:43 2025 -0500

    Merge branch '3.1'

 .../apache/accumulo/core/clientImpl/TableOperationsImpl.java  |  1 -
 .../src/main/java/org/apache/accumulo/core/data/LoadPlan.java |  2 ++
 .../java/org/apache/accumulo/core/fate/user/FateMutator.java  | 11 ++++++-----
 .../org/apache/accumulo/core/fate/user/UserFateStore.java     |  5 +++--
 .../java/org/apache/accumulo/core/metadata/schema/Ample.java  |  9 +++++----
 .../accumulo/core/util/compaction/CompactionPlanImpl.java     |  5 +----
 .../java/org/apache/accumulo/core/zookeeper/ZooCache.java     | 11 +----------
 .../core/spi/compaction/RatioBasedCompactionPlannerTest.java  |  4 ++--
 .../accumulo/server/compaction/CompactionJobGenerator.java    |  2 +-
 .../org/apache/accumulo/server/util/ServiceStatusCmd.java     |  1 -
 .../apache/accumulo/server/util/checkCommand/CheckRunner.java |  3 ++-
 .../apache/accumulo/test/functional/GracefulShutdownIT.java   |  1 +
 .../apache/accumulo/test/functional/MemoryStarvedScanIT.java  |  3 ++-
 13 files changed, 26 insertions(+), 32 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index ff73e72017,0baf9b7e9e..6acf4758d2
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@@ -105,10 -86,6 +105,9 @@@ import org.apache.accumulo.core.client.
  import org.apache.accumulo.core.client.admin.NewTableConfiguration;
  import org.apache.accumulo.core.client.admin.SummaryRetriever;
  import org.apache.accumulo.core.client.admin.TableOperations;
- import org.apache.accumulo.core.client.admin.TableOperations.ImportDestinationArguments;
 +import org.apache.accumulo.core.client.admin.TabletAvailability;
 +import org.apache.accumulo.core.client.admin.TabletInformation;
 +import org.apache.accumulo.core.client.admin.TabletMergeability;
  import org.apache.accumulo.core.client.admin.TimeType;
  import org.apache.accumulo.core.client.admin.compaction.CompactionConfigurer;
  import org.apache.accumulo.core.client.admin.compaction.CompactionSelector;

diff --cc core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index ac675d7fb9,0000000000..0e8b4344ed
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@@ -1,103 -1,0 +1,104 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.user;
+
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateKey;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.Repo;
- import org.apache.accumulo.core.fate.user.schema.FateSchema;
+
+public interface FateMutator<T> {
+
+  FateMutator<T> putStatus(TStatus status);
+
+  FateMutator<T> putKey(FateKey fateKey);
+
+  FateMutator<T> putCreateTime(long ctime);
+
+  /**
+   * Requires that nothing exists for this fate mutation.
+   */
+  FateMutator<T> requireAbsent();
+
+  /**
+   * Require that the transaction status is one of the given statuses. If no statuses are provided,
+   * require that the status column is absent.
+   *
+   * @param statuses The statuses to check against.
+   */
+  FateMutator<T> requireStatus(TStatus... statuses);
+
+  /**
+   * Require the transaction has no reservation.
+   */
+  FateMutator<T> requireUnreserved();
+
+  /**
+   * Require the transaction has no fate key set.
+   */
+  FateMutator<T> requireAbsentKey();
+
+  /**
-   * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
-   * put the reservation if there is not already a reservation present
++   * Add a conditional mutation to
++   * {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
++   * that will put the reservation if there is not already a reservation present
+   *
+   * @param reservation the reservation to attempt to put
+   * @return the FateMutator with this added mutation
+   */
+  FateMutator<T> putReservedTx(FateStore.FateReservation reservation);
+
+  /**
-   * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
-   * delete the column if the column value matches the given reservation
++   * Add a conditional mutation to
++   * {@link org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily#RESERVATION_COLUMN}
++   * that will delete the column if the column value matches the given reservation
+   *
+   * @param reservation the reservation to attempt to remove
+   * @return the FateMutator with this added mutation
+   */
+  FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation);
+
+  FateMutator<T> putName(byte[] data);
+
+  FateMutator<T> putAutoClean(byte[] data);
+
+  FateMutator<T> putException(byte[] data);
+
+  FateMutator<T> putReturnValue(byte[] data);
+
+  FateMutator<T> putAgeOff(byte[] data);
+
+  FateMutator<T> putTxInfo(Fate.TxInfo txInfo, byte[] data);
+
+  FateMutator<T> putRepo(int position, Repo<T> repo);
+
+  FateMutator<T> deleteRepo(int position);
+
+  void mutate();
+
+  // This exists to represent the subset of statuses from ConditionalWriter.Status that are expected
+  // and need to be handled.
+  enum Status {
+    ACCEPTED, REJECTED, UNKNOWN
+  }
+
+  Status tryMutate();
+
+}

diff --cc core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index 977adb2440,0000000000..8268163dad
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@@ -1,555 -1,0 +1,556 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.user; + +import java.io.IOException; +import java.io.Serializable; +import java.util.EnumSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.SortedMap; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +public class UserFateStore<T> extends AbstractFateStore<T> { + + private static final Logger log = LoggerFactory.getLogger(UserFateStore.class); + + private final ClientContext context; + private final String tableName; + + private static final FateInstanceType fateInstanceType = FateInstanceType.USER; + private static final com.google.common.collect.Range<Integer> REPO_RANGE = + com.google.common.collect.Range.closed(1, MAX_REPOS); + + /** + * Constructs a UserFateStore + * + * @param context the {@link ClientContext} + * @param tableName the name of the table which will store the Fate data - * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be - * null if this store will be used as read-only (will not be used to reserve transactions) ++ * @param lockID the {@link org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID} held by the ++ * process creating this store. 
Should be null if this store will be used as read-only ++ * (will not be used to reserve transactions) + * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the + * time of invocation. If the store is used for a {@link Fate} which runs a dead + * reservation cleaner, this should be non-null, otherwise null is fine + */ + public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, + Predicate<ZooUtil.LockID> isLockHeld) { + this(context, tableName, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + } + + @VisibleForTesting + public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, + Predicate<ZooUtil.LockID> isLockHeld, int maxDeferred, FateIdGenerator fateIdGenerator) { + super(lockID, isLockHeld, maxDeferred, fateIdGenerator); + this.context = Objects.requireNonNull(context); + this.tableName = Objects.requireNonNull(tableName); + } + + @Override + public FateId create() { + + int attempt = 0; + while (true) { + + FateId fateId = getFateId(); + + if (attempt >= 1) { + log.debug("Failed to create new id: {}, trying again", fateId); + UtilWaitThread.sleep(100); + } + + var status = newMutator(fateId).requireAbsent().putStatus(TStatus.NEW) + .putCreateTime(System.currentTimeMillis()).tryMutate(); + + switch (status) { + case ACCEPTED: + return fateId; + case UNKNOWN: + case REJECTED: + attempt++; + continue; + default: + throw new IllegalStateException("Unknown status " + status); + } + } + } + + public FateId getFateId() { + return fateIdGenerator.newRandomId(type()); + } + + @Override + public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo, + boolean autoCleanUp) { + final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent() + .putKey(fateKey).putCreateTime(System.currentTimeMillis()); + if (seedTransaction(mutatorFactory, fateKey + " " + fateId, txName, repo, autoCleanUp)) { + return Optional.of(fateId); + } else { + return Optional.empty(); + } + } + + @Override + public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo, + boolean autoCleanUp) { + Supplier<FateMutator<T>> mutatorFactory = + () -> newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey(); + return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo, autoCleanUp); + } + + private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId, + Fate.FateOperation txName, Repo<T> repo, boolean autoCleanUp) { + int maxAttempts = 5; + for (int attempt = 0; attempt < maxAttempts; attempt++) { + var mutator = mutatorFactory.get(); + mutator = + mutator.putName(serializeTxInfo(txName)).putRepo(1, repo).putStatus(TStatus.SUBMITTED); + if (autoCleanUp) { + mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp)); + } + var status = mutator.tryMutate(); + if (status == FateMutator.Status.ACCEPTED) { + // signal to the super class that a new fate transaction was seeded and is ready to run + seededTx(); + log.trace("Attempt to seed {} returned {}", logId, status); + return true; + } else if (status == FateMutator.Status.REJECTED) { + log.debug("Attempt to seed {} returned {}", logId, status); + return false; + } else if (status == FateMutator.Status.UNKNOWN) { + // At this point can not reliably determine if the conditional mutation was successful or + // not because no reservation was acquired. 
For example since no reservation was acquired it + // is possible that seeding was a success and something immediately picked it up and started + // operating on it and changing it. If scanning after that point can not conclude success or + // failure. Another situation is that maybe the fateId already existed in a seeded form + // prior to getting this unknown. + log.debug("Attempt to seed {} returned {} status, retrying", logId, status); + UtilWaitThread.sleep(250); + } + } + + log.warn("Repeatedly received unknown status when attempting to seed {}", logId); + return false; + } + + @Override + public Optional<FateTxStore<T>> tryReserve(FateId fateId) { + verifyLock(lockID, fateId); + // Create a unique FateReservation for this reservation attempt + FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); + + // requiring any status prevents creating an entry if the fate id doesn't exist + FateMutator.Status status = + newMutator(fateId).requireStatus(TStatus.values()).putReservedTx(reservation).tryMutate(); + if (status.equals(FateMutator.Status.ACCEPTED)) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } else if (status.equals(FateMutator.Status.UNKNOWN)) { + // If the status is UNKNOWN, this means an error occurred after the mutation was + // sent to the TabletServer, and it is unknown if the mutation was written. We + // need to check if the mutation was written and if it was written by this + // attempt at reservation. If it was written by this reservation attempt, + // we can return the FateTxStore since it was successfully reserved in this + // attempt, otherwise we return empty (was written by another reservation + // attempt or was not written at all). + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(getRow(fateId)); + scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + FateReservation persistedRes = + scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get())) + .findFirst().orElse(null); + if (persistedRes != null && persistedRes.equals(reservation)) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + return Optional.empty(); + } + + @Override + public void deleteDeadReservations() { + for (Entry<FateId,FateReservation> activeRes : getActiveReservations().entrySet()) { + FateId fateId = activeRes.getKey(); + FateReservation reservation = activeRes.getValue(); + if (!isLockHeld.test(reservation.getLockID())) { + var status = newMutator(fateId).putUnreserveTx(reservation).tryMutate(); + if (status == FateMutator.Status.ACCEPTED) { + // Technically, this should also be logged for the case where the mutation status + // is UNKNOWN, but the mutation was actually written (fate id was unreserved) + // but there is no way to tell if it was unreserved from this mutation or another + // thread simply unreserving the transaction + log.trace("Deleted the dead reservation {} for fate id {}", reservation, fateId); + } + // No need to verify the status... If it is ACCEPTED, we have successfully unreserved + // the dead transaction. If it is REJECTED, the reservation has changed (i.e., + // has been unreserved so no need to do anything, or has been unreserved and reserved + // again in which case we don't want to change it). 
If it is UNKNOWN, the mutation + // may or may not have been written. If it was written, we have successfully unreserved + // the dead transaction. If it was not written, the next cycle/call to + // deleteDeadReservations() will try again. + } + } + } + + @Override + protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) { + try { + Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(new Range()); + RowFateStatusFilter.configureScanner(scanner, statuses); + TxColumnFamily.STATUS_COLUMN.fetch(scanner); + TxColumnFamily.RESERVATION_COLUMN.fetch(scanner); + return scanner.stream().onClose(scanner::close).map(e -> { + String txUUIDStr = e.getKey().getRow().toString(); + FateId fateId = FateId.from(fateInstanceType, txUUIDStr); + SortedMap<Key,Value> rowMap; + TStatus status = TStatus.UNKNOWN; + FateReservation reservation = null; + try { + rowMap = WholeRowIterator.decodeRow(e.getKey(), e.getValue()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + // expect status and optionally reservation + Preconditions.checkState(rowMap.size() == 1 || rowMap.size() == 2, + "Invalid row seen: %s. Expected to see one entry for the status and optionally an " + + "entry for the fate reservation", + rowMap); + for (Entry<Key,Value> entry : rowMap.entrySet()) { + Text colf = entry.getKey().getColumnFamily(); + Text colq = entry.getKey().getColumnQualifier(); + Value val = entry.getValue(); + switch (colq.toString()) { + case TxColumnFamily.STATUS: + status = TStatus.valueOf(val.toString()); + break; + case TxColumnFamily.RESERVATION: + reservation = FateReservation.deserialize(val.get()); + break; + default: + throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); + } + } + final TStatus finalStatus = status; + final Optional<FateReservation> finalReservation = Optional.ofNullable(reservation); + return new FateIdStatusBase(fateId) { + @Override + public TStatus getStatus() { + return finalStatus; + } + + @Override + public Optional<FateReservation> getFateReservation() { + return finalReservation; + } + }; + }); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + public Stream<FateKey> list(FateKey.FateKeyType type) { + try { + Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(new Range()); + TxColumnFamily.TX_KEY_COLUMN.fetch(scanner); + FateKeyFilter.configureScanner(scanner, type); + return scanner.stream().onClose(scanner::close) + .map(e -> FateKey.deserialize(e.getValue().get())); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + protected TStatus _getStatus(FateId fateId) { + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + TxColumnFamily.STATUS_COLUMN.fetch(scanner); + return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst() + .orElse(TStatus.UNKNOWN); + }); + } + + @Override + protected Optional<FateKey> getKey(FateId fateId) { + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + TxColumnFamily.TX_KEY_COLUMN.fetch(scanner); + return scanner.stream().map(e -> FateKey.deserialize(e.getValue().get())).findFirst(); + }); + } + + @Override + protected FateTxStore<T> newUnreservedFateTxStore(FateId fateId) { + return new FateTxStoreImpl(fateId); + } + + static Range getRow(FateId fateId) { + return new Range(getRowId(fateId)); + } + + public static 
String getRowId(FateId fateId) { + return fateId.getTxUUIDStr(); + } + + private FateMutatorImpl<T> newMutator(FateId fateId) { + return new FateMutatorImpl<>(context, tableName, fateId); + } + + private <R> R scanTx(Function<Scanner,R> func) { + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + return func.apply(scanner); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + public FateInstanceType type() { + return fateInstanceType; + } + + private class FateTxStoreImpl extends AbstractFateTxStoreImpl { + + private FateTxStoreImpl(FateId fateId) { + super(fateId); + } + + private FateTxStoreImpl(FateId fateId, FateReservation reservation) { + super(fateId, reservation); + } + + @Override + public Repo<T> top() { + verifyReservedAndNotDeleted(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + scanner.setBatchSize(1); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return scanner.stream().map(e -> { + @SuppressWarnings("unchecked") + var repo = (Repo<T>) deserialize(e.getValue().get()); + return repo; + }).findFirst().orElse(null); + }); + } + + @Override + public List<ReadOnlyRepo<T>> getStack() { + verifyReservedAndNotDeleted(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return scanner.stream().map(e -> { + @SuppressWarnings("unchecked") + var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get()); + return repo; + }).collect(Collectors.toList()); + }); + } + + @Override + public Serializable getTransactionInfo(TxInfo txInfo) { + verifyReservedAndNotDeleted(false); + + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(getRow(fateId)); + + final ColumnFQ cq; + switch (txInfo) { + case TX_NAME: + cq = TxInfoColumnFamily.TX_NAME_COLUMN; + break; + case AUTO_CLEAN: + cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN; + break; + case EXCEPTION: + cq = TxInfoColumnFamily.EXCEPTION_COLUMN; + break; + case RETURN_VALUE: + cq = TxInfoColumnFamily.RETURN_VALUE_COLUMN; + break; + case TX_AGEOFF: + cq = TxInfoColumnFamily.TX_AGEOFF_COLUMN; + break; + default: + throw new IllegalArgumentException("Unexpected TxInfo type " + txInfo); + } + scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier()); + + return scanner.stream().map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst() + .orElse(null); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + public long timeCreated() { + verifyReservedAndNotDeleted(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner); + return scanner.stream().map(e -> Long.parseLong(e.getValue().toString())).findFirst() + .orElse(0L); + }); + } + + @Override + public void push(Repo<T> repo) throws StackOverflowException { + verifyReservedAndNotDeleted(true); + + Optional<Integer> top = findTop(); + + if (top.filter(t -> t >= MAX_REPOS).isPresent()) { + throw new StackOverflowException("Repo stack size too large"); + } + + FateMutator<T> fateMutator = + newMutator(fateId).requireStatus(REQ_PUSH_STATUS.toArray(TStatus[]::new)); + fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); + } + + @Override + public void pop() { + verifyReservedAndNotDeleted(true); + + Optional<Integer> top = findTop(); + top.ifPresent(t -> 
newMutator(fateId).requireStatus(REQ_POP_STATUS.toArray(TStatus[]::new)) + .deleteRepo(t).mutate()); + } + + @Override + public void setStatus(TStatus status) { + verifyReservedAndNotDeleted(true); + + newMutator(fateId).putStatus(status).mutate(); + observedStatus = status; + } + + @Override + public void setTransactionInfo(TxInfo txInfo, Serializable so) { + verifyReservedAndNotDeleted(true); + + final byte[] serialized = serializeTxInfo(so); + + newMutator(fateId).putTxInfo(txInfo, serialized).mutate(); + } + + @Override + public void delete() { + verifyReservedAndNotDeleted(true); + + var mutator = newMutator(fateId); + mutator.requireStatus(REQ_DELETE_STATUS.toArray(TStatus[]::new)); + mutator.delete().mutate(); + this.deleted = true; + } + + @Override + public void forceDelete() { + verifyReservedAndNotDeleted(true); + + var mutator = newMutator(fateId); + mutator.requireStatus(REQ_FORCE_DELETE_STATUS.toArray(TStatus[]::new)); + mutator.delete().mutate(); + this.deleted = true; + } + + private Optional<Integer> findTop() { + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + scanner.setBatchSize(1); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst(); + }); + } + + @Override + protected void unreserve() { + if (!deleted) { + FateMutator.Status status; + do { + status = newMutator(fateId).putUnreserveTx(reservation).tryMutate(); + } while (status.equals(FateMutator.Status.UNKNOWN)); + } + reservation = null; + } + } + + static Text invertRepo(int position) { + Preconditions.checkArgument(REPO_RANGE.contains(position), + "Position %s is not in the valid range of [0,%s]", position, MAX_REPOS); + return new Text(String.format("%02d", MAX_REPOS - position)); + } + + static Integer restoreRepo(Text invertedPosition) { + int position = MAX_REPOS - Integer.parseInt(invertedPosition.toString()); + Preconditions.checkArgument(REPO_RANGE.contains(position), + "Position %s is not in the valid range of [0,%s]", position, MAX_REPOS); + return position; + } +} diff --cc core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index cf2ea3bb82,b212356a4a..c8e5ae33be --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@@ -256,59 -261,6 +256,60 @@@ public interface Ample void close(); } + interface ConditionalResult { + + /** - * This enum was created instead of using {@link ConditionalWriter.Status} because Ample has - * automated handling for most of the statuses of the conditional writer and therefore only a - * subset are expected to be passed out of Ample. This enum represents the subset that Ample - * will actually return. ++ * This enum was created instead of using ++ * {@link org.apache.accumulo.core.client.ConditionalWriter.Status} because Ample has automated ++ * handling for most of the statuses of the conditional writer and therefore only a subset are ++ * expected to be passed out of Ample. This enum represents the subset that Ample will actually ++ * return. + */ + enum Status { + ACCEPTED, REJECTED + } + + /** + * Returns the status of the conditional mutation or may return a computed status of ACCEPTED in + * some cases, see {@link ConditionalTabletMutator#submit(RejectionHandler)} for details. + */ + Status getStatus(); + + KeyExtent getExtent(); + + /** + * This can only be called when {@link #getStatus()} returns something other than + * {@link Status#ACCEPTED}. 
It reads the tablet's metadata for a failed conditional mutation.
+     * This can be used to see why it was not accepted.
+     */
+    TabletMetadata readMetadata();
+  }
+
+  interface AsyncConditionalTabletsMutator extends AutoCloseable {
+    /**
+     * @return A fluent interface for conditionally mutating a tablet. Ensure you call
+     *         {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
+     */
+    OperationRequirements mutateTablet(KeyExtent extent);
+
+    /**
+     * Closing ensures that all mutations are processed and their results are reported.
+     */
+    @Override
+    void close();
+  }
+
+  interface ConditionalTabletsMutator extends AsyncConditionalTabletsMutator {
+
+    /**
+     * After creating one or more conditional mutations using {@link #mutateTablet(KeyExtent)}, call
+     * this method to process them using a {@link ConditionalWriter}.
+     *
+     * @return The result from the {@link ConditionalWriter} of processing each tablet.
+     */
+    Map<KeyExtent,ConditionalResult> process();
+  }
+
  /**
   * Interface for changing a tablet's persistent data.
   */

diff --cc core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
index 49b8eaa53c,0f26f9492b..32b23bdb20
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionPlanImpl.java
@@@ -55,14 -56,14 +55,11 @@@ public class CompactionPlanImpl impleme
      private final CompactionKind kind;
      private final ArrayList<CompactionJob> jobs = new ArrayList<>();
--    private final Set<CompactableFile> allFiles;
      private final Set<CompactableFile> seenFiles = new HashSet<>();
      private final Set<CompactableFile> candidates;

--    public BuilderImpl(CompactionKind kind, Set<CompactableFile> allFiles,
--        Set<CompactableFile> candidates) {
++    public BuilderImpl(CompactionKind kind, Set<CompactableFile> candidates) {
        this.kind = kind;
--      this.allFiles = allFiles;
        this.candidates = candidates;
      }

diff --cc core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
index ff30277797,0000000000..791e8d85bc
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
+++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java
@@@ -1,589 -1,0 +1,580 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.accumulo.core.zookeeper; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Objects.requireNonNull; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Predicate; + +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.util.cache.Caches; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * A cache for values stored in ZooKeeper. Values are kept up to date as they change. + */ +public class ZooCache { + + public interface ZooCacheWatcher extends Consumer<WatchedEvent> {} + + private static final Logger log = LoggerFactory.getLogger(ZooCache.class); + + private final NavigableSet<String> watchedPaths; + + // visible for tests + protected final ZCacheWatcher watcher = new ZCacheWatcher(); + private final List<ZooCacheWatcher> externalWatchers = + Collections.synchronizedList(new ArrayList<>()); + + private static final AtomicLong nextCacheId = new AtomicLong(0); + private final String cacheId = "ZC" + nextCacheId.incrementAndGet(); + + public static final Duration CACHE_DURATION = Duration.ofMinutes(30); + + private final Cache<String,ZcNode> cache; + + private final ConcurrentMap<String,ZcNode> nodeCache; + + private final ZooSession zk; + + private volatile boolean closed = false; + + private final AtomicLong updateCount = new AtomicLong(); + + private final AtomicLong zkClientTracker = new AtomicLong(); + + class ZCacheWatcher implements Watcher { + @Override + public void process(WatchedEvent event) { + if (log.isTraceEnabled()) { + log.trace("{}: {}", cacheId, event); + } + + switch (event.getType()) { + case NodeChildrenChanged: + // According to documentation we should not receive this event. + // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we + // may receive this event (Fixed in 3.9.0) + break; + case ChildWatchRemoved: + case DataWatchRemoved: + // We don't need to do anything with the cache on these events. + break; + case NodeDataChanged: + log.trace("{} node data changed; clearing {}", cacheId, event.getPath()); + clear(path -> path.equals(event.getPath())); + break; + case NodeCreated: + case NodeDeleted: + // With the Watcher being set at a higher level we need to remove + // the parent of the affected node and all of its children from the cache + // so that the parent and children node can be re-cached. If we only remove the + // affected node, then the cached children in the parent could be incorrect. + int lastSlash = event.getPath().lastIndexOf('/'); + String parent = lastSlash == 0 ? 
"/" : event.getPath().substring(0, lastSlash); + log.trace("{} node created or deleted {}; clearing {}", cacheId, event.getPath(), parent); + clear((path) -> path.startsWith(parent)); + break; + case PersistentWatchRemoved: + log.warn( + "{} persistent watch removed {} which is only done in ZooSession.addPersistentRecursiveWatchers; ignoring;", + cacheId, event.getPath()); + break; + case None: + switch (event.getState()) { + case Closed: + log.trace("{} ZooKeeper connection closed, clearing cache; {}", cacheId, event); + clear(); + break; + case Disconnected: + log.trace("{} ZooKeeper connection disconnected, clearing cache; {}", cacheId, event); + clear(); + break; + case SyncConnected: + log.trace("{} ZooKeeper connection established, ignoring; {}", cacheId, event); + break; + case Expired: + log.trace("{} ZooKeeper connection expired, clearing cache; {}", cacheId, event); + clear(); + break; + default: + log.warn("{} Unhandled state {}", cacheId, event); + break; + } + break; + default: + log.warn("{} Unhandled event type {}", cacheId, event); + break; + } + + externalWatchers.forEach(ew -> { + try { + ew.accept(event); + } catch (Exception e) { + log.error( + "Exception calling external watcher. This is a bug and could impact proper operation.", + e); + } + }); + } + } + + /** + * Creates a ZooCache instance that uses the supplied ZooSession for communicating with the + * instance's ZooKeeper servers. The ZooCache will create persistent watchers at the given + * pathsToWatch, if any, to be updated when changes are made in ZooKeeper for nodes at or below in + * the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher}, then they will be + * notified when this object is notified of changes via the PersistentWatcher callback. + * + * @param zk ZooSession for this instance + * @param pathsToWatch Paths in ZooKeeper to watch + */ + public ZooCache(ZooSession zk, Set<String> pathsToWatch) { + this(zk, pathsToWatch, Ticker.systemTicker()); + } + + // visible for tests that use a Ticker + public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) { + this.zk = requireNonNull(zk); + // this initial value is meant to indicate watchers were never setup + this.zkClientTracker.set(-1); + this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) + .ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build(); + // The concurrent map returned by Caffeine will only allow one thread to run at a time for a + // given key and ZooCache relies on that. Not all concurrent map implementations have this + // behavior for their compute functions. + this.nodeCache = cache.asMap(); + this.watchedPaths = Collections.unmodifiableNavigableSet(new TreeSet<>(pathsToWatch)); + setupWatchers(); + log.trace("{} created new cache watching {}", cacheId, pathsToWatch, new Exception()); + } + + public void addZooCacheWatcher(ZooCacheWatcher watcher) { + externalWatchers.add(requireNonNull(watcher)); + } + + // visible for tests + long getZKClientObjectVersion() { + long counter = zk.getConnectionCounter(); + // -1 is used to signify ZK has not been setup in this code and this code assume ZooSession will + // always return something >= 0. 
+    Preconditions.checkState(counter >= 0);
+    return counter;
+  }
+
+  /**
+   * @return true if ZK has changed; false otherwise
+   */
+  private boolean handleZKConnectionChange() {
+    final long currentCount = getZKClientObjectVersion();
+    final long oldCount = zkClientTracker.get();
+    if (oldCount != currentCount) {
+      return setupWatchers();
+    }
+    return false;
+  }
+
+  // Called on construction and when ZooKeeper connection changes
+  synchronized boolean setupWatchers() {
+
+    final long currentCount = getZKClientObjectVersion();
+    final long oldCount = zkClientTracker.get();
+
+    if (currentCount == oldCount) {
+      return false;
+    }
+
+    for (String left : watchedPaths) {
+      for (String right : watchedPaths) {
+        if (!left.equals(right) && left.contains(right)) {
+          throw new IllegalArgumentException(
+              "Overlapping paths found in paths to watch. left: " + left + ", right: " + right);
+        }
+      }
+    }
+
+    try {
+      long zkId = zk.addPersistentRecursiveWatchers(watchedPaths, watcher);
+      zkClientTracker.set(zkId);
+      clear();
+      log.trace("{} Reinitialized persistent watchers and cleared cache {}", cacheId,
+          zkClientTracker.get());
+      return true;
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException("Error setting up persistent recursive watcher", e);
+    }
+
+  }
+
+  private boolean isWatchedPath(String path) {
+    // Check that the path is equal to, or a descendant of, a watched path
+    var floor = watchedPaths.floor(path);
+    return floor != null
+        && (floor.equals("/") || floor.equals(path) || path.startsWith(floor + "/"));
+  }
+
+  private void ensureWatched(String path) {
+    if (!isWatchedPath(path)) {
+      throw new IllegalStateException("Supplied path " + path + " is not watched by this ZooCache");
+    }
+  }
+
+  private abstract class ZooRunnable<T> {
+    /**
+     * Runs an operation against ZooKeeper. Retries are performed by the retry method when
+     * KeeperExceptions occur.
+     *
-     * Changes were made in ACCUMULO-4388 so that the run method no longer accepts Zookeeper as an
-     * argument, and instead relies on the ZooRunnable implementation to call
-     * {@link #getZooKeeper()}. Performing the call to retrieving a ZooKeeper Session after caches
-     * are checked has the benefit of limiting ZK connections and blocking as a result of obtaining
-     * these sessions.
-     *
+     * @return T the result of the runnable
+     */
+    abstract T run() throws KeeperException, InterruptedException;
+
+    /**
-     * Retry will attempt to call the run method. Run should make a call to {@link #getZooKeeper()}
-     * after checks to cached information are made. This change, per ACCUMULO-4388 ensures that we
-     * don't create a ZooKeeper session when information is cached, and access to ZooKeeper is
-     * unnecessary.
++     * Retry will attempt to call the run method.
+     *
+     * @return the result of the runnable on success (i.e., no exceptions).
+ */ + public T retry() { + + int sleepTime = 100; + + while (true) { + + try { + T result = run(); + if (handleZKConnectionChange()) { + continue; + } + return result; + } catch (KeeperException | ZcException e) { + KeeperException ke; + if (e instanceof ZcException) { + ke = ((ZcException) e).getZKException(); + } else { + ke = ((KeeperException) e); + } + final Code code = ke.code(); + if (code == Code.NONODE) { + log.error("Looked up non-existent node in cache " + ke.getPath(), e); + } else if (code == Code.CONNECTIONLOSS || code == Code.OPERATIONTIMEOUT + || code == Code.SESSIONEXPIRED) { + log.warn("Saw (possibly) transient exception communicating with ZooKeeper, will retry", + e); + } else { + log.warn("Zookeeper error, will retry", e); + } + } catch (InterruptedException | ZcInterruptedException e) { + log.info("Zookeeper error, will retry", e); + } + + try { + // do not hold lock while sleeping + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + log.debug("Wait in retry() was interrupted.", e); + } + if (sleepTime < 10_000) { + sleepTime = (int) (sleepTime + sleepTime * RANDOM.get().nextDouble()); + } + } + } + + } + + private static class ZcException extends RuntimeException { + private static final long serialVersionUID = 1; + + private ZcException(KeeperException e) { + super(e); + } + + public KeeperException getZKException() { + return (KeeperException) getCause(); + } + } + + private static class ZcInterruptedException extends RuntimeException { + private static final long serialVersionUID = 1; + + private ZcInterruptedException(InterruptedException e) { + super(e); + } + } + + /** + * Gets the children of the given node. + * + * @param zPath path of node + * @return children list, or null if node has no children or does not exist + */ + public List<String> getChildren(final String zPath) { + Preconditions.checkState(!closed); + ensureWatched(zPath); + ZooRunnable<List<String>> zr = new ZooRunnable<>() { + + @Override + public List<String> run() throws KeeperException, InterruptedException { + + var zcNode = nodeCache.get(zPath); + if (zcNode != null && zcNode.cachedChildren()) { + return zcNode.getChildren(); + } + + log.trace("{} {} was not in children cache, looking up in zookeeper", cacheId, zPath); + + zcNode = nodeCache.compute(zPath, (zp, zcn) -> { + // recheck the children now that lock is held on key + if (zcn != null && zcn.cachedChildren()) { + return zcn; + } + try { + List<String> children = zk.getChildren(zPath, null); + log.trace("{} adding {} children of {} to cache", cacheId, children.size(), zPath); + return new ZcNode(children, zcn); + } catch (KeeperException.NoNodeException nne) { + log.trace("{} getChildren saw that {} does not exists", cacheId, zPath); + return ZcNode.NON_EXISTENT; + } catch (KeeperException e) { + throw new ZcException(e); + } catch (InterruptedException e) { + throw new ZcInterruptedException(e); + } + }); + // increment this after compute call completes when the change is visible + updateCount.incrementAndGet(); + return zcNode.getChildren(); + } + }; + + return zr.retry(); + } + + /** + * Gets data at the given path. Status information is not returned. + * + * @param zPath path to get + * @return path data, or null if non-existent + */ + public byte[] get(final String zPath) { + return get(zPath, null); + } + + /** + * Gets data at the given path, filling status information into the given <code>Stat</code> + * object. 
+   *
+   * @param zPath path to get
+   * @param status status object to populate
+   * @return path data, or null if non-existent
+   */
+  public byte[] get(final String zPath, final ZcStat status) {
+    Preconditions.checkState(!closed);
+    ensureWatched(zPath);
+    ZooRunnable<byte[]> zr = new ZooRunnable<>() {
+
+      @Override
+      public byte[] run() throws KeeperException, InterruptedException {
+
+        var zcNode = nodeCache.get(zPath);
+        if (zcNode != null && zcNode.cachedData()) {
+          if (status != null) {
+            copyStats(status, zcNode.getStat());
+          }
+          return zcNode.getData();
+        }
+
+        log.trace("{} {} was not in data cache, looking up in zookeeper", cacheId, zPath);
+
+        zcNode = nodeCache.compute(zPath, (zp, zcn) -> {
+          // recheck now that the lock is held on the key, it may be present now. Could have been
+          // computed while waiting for the lock.
+          if (zcn != null && zcn.cachedData()) {
+            return zcn;
+          }
+          try {
+            byte[] data = null;
+            Stat stat = new Stat();
+            ZcStat zstat = null;
+            try {
+              data = zk.getData(zPath, null, stat);
+              zstat = new ZcStat(stat);
+            } catch (KeeperException.NoNodeException e1) {
+              log.trace("{} zookeeper did not contain {}", cacheId, zPath);
+              return ZcNode.NON_EXISTENT;
+            } catch (InterruptedException e) {
+              throw new ZcInterruptedException(e);
+            }
+            if (log.isTraceEnabled()) {
+              log.trace("{} zookeeper contained {} {}", cacheId, zPath,
+                  (data == null ? null : new String(data, UTF_8)));
+            }
+            return new ZcNode(data, zstat, zcn);
+          } catch (KeeperException ke) {
+            throw new ZcException(ke);
+          }
+        });
+
+        // update this after the compute call completes when the change is visible
+        updateCount.incrementAndGet();
+        if (status != null) {
+          copyStats(status, zcNode.getStat());
+        }
+        return zcNode.getData();
+      }
+    };
+
+    return zr.retry();
+  }
+
+  /**
+   * Helper method to copy stats from the cached stat into userStat
+   *
+   * @param userStat user Stat object
+   * @param cachedStat cached statistic, that is or will be cached
+   */
+  protected void copyStats(ZcStat userStat, ZcStat cachedStat) {
+    Preconditions.checkState(!closed);
+    if (userStat != null && cachedStat != null) {
+      userStat.set(cachedStat);
+    }
+  }
+
+  /**
+   * Clears this cache.
+   */
+  protected void clear() {
+    Preconditions.checkState(!closed);
+    nodeCache.clear();
+    updateCount.incrementAndGet();
+    log.trace("{} cleared all from cache", cacheId);
+  }
+
+  public void close() {
+    clear();
+    closed = true;
+  }
+
+  /**
+   * Returns a monotonically increasing count of the number of times the cache was updated. If the
+   * count is the same, then it means the cache did not change.
+   */
+  public long getUpdateCount() {
+    Preconditions.checkState(!closed);
+    return updateCount.get();
+  }
+
+  /**
+   * Checks if a data value (or lack of one) is cached.
+   *
+   * @param zPath path of node
+   * @return true if data value is cached
+   */
+  @VisibleForTesting
+  public boolean dataCached(String zPath) {
+    ensureWatched(zPath);
+    var zcn = nodeCache.get(zPath);
+    return zcn != null && zcn.cachedData();
+  }
+
+  /**
+   * Checks if children of a node (or lack of them) are cached.
+   *
+   * @param zPath path of node
+   * @return true if children are cached
+   */
+  @VisibleForTesting
+  public boolean childrenCached(String zPath) {
+    ensureWatched(zPath);
+    var zcn = nodeCache.get(zPath);
+    return zcn != null && zcn.cachedChildren();
+  }
+
+  /**
+   * Removes all paths in the cache that match the predicate.
+   */
+  public void clear(Predicate<String> pathPredicate) {
+    Preconditions.checkState(!closed);
+    Predicate<String> pathPredicateWrapper = path -> {
+      boolean testResult = pathPredicate.test(path);
+      if (testResult) {
+        updateCount.incrementAndGet();
+        log.trace("{} removing {} from cache", cacheId, path);
+      }
+      return testResult;
+    };
+    nodeCache.keySet().removeIf(pathPredicateWrapper);
+  }
+
+  /**
+   * Clears this cache of all information about nodes rooted at the given path.
+   *
+   * @param zPath path of top node
+   */
+  public void clear(String zPath) {
+    ensureWatched(zPath);
+    clear(path -> path.startsWith(zPath));
+  }
+
+  /**
+   * Gets the lock data from the node in the cache at the specified path.
+   */
+  public Optional<ServiceLockData> getLockData(ServiceLockPath path) {
+    ensureWatched(path.toString());
+    List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString()));
+    if (children == null || children.isEmpty()) {
+      return Optional.empty();
+    }
+    String lockNode = children.get(0);
+
+    byte[] lockData = get(path + "/" + lockNode);
+    if (lockData == null) {
+      lockData = new byte[0];
+    }
+    if (log.isTraceEnabled()) {
+      log.trace("{} Data from lockNode {} is {}", cacheId, lockNode, new String(lockData, UTF_8));
+    }
+    return ServiceLockData.parse(lockData);
+  }
+
+}

diff --cc core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java
index 90fc5a4c58,55cf3e1f97..a782b69465
--- a/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/compaction/RatioBasedCompactionPlannerTest.java
@@@ -667,9 -584,9 +667,9 @@@ public class RatioBasedCompactionPlanne

    private CompactionJob createJob(CompactionKind kind, Set<CompactableFile> all,
        Set<CompactableFile> files) {
--    return new CompactionPlanImpl.BuilderImpl(kind, all, all)
 -        .addJob((short) all.size(), CompactionExecutorIdImpl.internalId(csid, "small"), files)
 -        .build().getJobs().iterator().next();
++    return new CompactionPlanImpl.BuilderImpl(kind, all)
 +        .addJob((short) all.size(), CompactorGroupId.of("small"), files).build().getJobs()
 +        .iterator().next();
    }

    // Create a set of files whose sizes would require certain compaction ratios to compact
@@@ -817,7 -727,7 +817,7 @@@

        @Override
        public Builder createPlanBuilder() {
--        return new CompactionPlanImpl.BuilderImpl(kind, all, candidates);
++        return new CompactionPlanImpl.BuilderImpl(kind, candidates);
        }
      };
    }

diff --cc server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
index 349dedcc97,0000000000..80e72b1561
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionJobGenerator.java
@@@ -1,342 -1,0 +1,342 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.compaction; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.client.PluginEnvironment; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.compaction.CompactableFile; +import org.apache.accumulo.core.conf.ConfigurationTypeHelper; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.logging.ConditionalLogger; +import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.compaction.CompactionDispatcher; +import org.apache.accumulo.core.spi.compaction.CompactionJob; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.accumulo.core.spi.compaction.CompactionPlan; +import org.apache.accumulo.core.spi.compaction.CompactionPlanner; +import org.apache.accumulo.core.spi.compaction.CompactionServiceId; +import org.apache.accumulo.core.spi.compaction.CompactionServices; +import org.apache.accumulo.core.util.cache.Caches; +import org.apache.accumulo.core.util.cache.Caches.CacheName; +import org.apache.accumulo.core.util.compaction.CompactionJobImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlanImpl; +import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams; +import org.apache.accumulo.core.util.compaction.CompactionServicesConfig; +import org.apache.accumulo.core.util.time.SteadyTime; +import org.apache.accumulo.server.ServiceEnvironmentImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import com.github.benmanes.caffeine.cache.Cache; + +public class CompactionJobGenerator { + private static final Logger log = LoggerFactory.getLogger(CompactionJobGenerator.class); + private static final Logger UNKNOWN_SERVICE_ERROR_LOG = + new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); + private static final Logger PLANNING_INIT_ERROR_LOG = + new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); + private static final Logger PLANNING_ERROR_LOG = + new ConditionalLogger.EscalatingLogger(log, Duration.ofMinutes(5), 3000, Level.ERROR); + + private final CompactionServicesConfig servicesConfig; + private final Map<CompactionServiceId,CompactionPlanner> planners = new HashMap<>(); + private final Cache<TableId,CompactionDispatcher> dispatchers; + private final Set<CompactionServiceId> serviceIds; + private final PluginEnvironment env; + private final Map<FateId,Map<String,String>> allExecutionHints; + private final SteadyTime steadyTime; + + public 
CompactionJobGenerator(PluginEnvironment env, + Map<FateId,Map<String,String>> executionHints, SteadyTime steadyTime) { + servicesConfig = new CompactionServicesConfig(env.getConfiguration()); + serviceIds = servicesConfig.getPlanners().keySet().stream().map(CompactionServiceId::of) + .collect(Collectors.toUnmodifiableSet()); + + dispatchers = Caches.getInstance().createNewBuilder(CacheName.COMPACTION_DISPATCHERS, false) + .maximumSize(10).build(); + this.env = env; + if (executionHints.isEmpty()) { + this.allExecutionHints = executionHints; + } else { + this.allExecutionHints = new HashMap<>(); + // Make the maps that will be passed to plugins unmodifiable. Do this once, so it does not + // need to be done for each tablet. + executionHints.forEach((k, v) -> allExecutionHints.put(k, + v.isEmpty() ? Map.of() : Collections.unmodifiableMap(v))); + } + + this.steadyTime = steadyTime; + } + + public Collection<CompactionJob> generateJobs(TabletMetadata tablet, Set<CompactionKind> kinds) { + Collection<CompactionJob> systemJobs = Set.of(); + + log.trace("Planning for {} {} {}", tablet.getExtent(), kinds, this.hashCode()); + + if (kinds.contains(CompactionKind.SYSTEM)) { + CompactionServiceId serviceId = dispatch(CompactionKind.SYSTEM, tablet, Map.of()); + systemJobs = planCompactions(serviceId, CompactionKind.SYSTEM, tablet, Map.of()); + } + + Collection<CompactionJob> userJobs = Set.of(); + + if (kinds.contains(CompactionKind.USER) && tablet.getSelectedFiles() != null) { + var hints = allExecutionHints.get(tablet.getSelectedFiles().getFateId()); + if (hints != null) { + CompactionServiceId serviceId = dispatch(CompactionKind.USER, tablet, hints); + userJobs = planCompactions(serviceId, CompactionKind.USER, tablet, hints); + } + } + + if (userJobs.isEmpty()) { + return systemJobs; + } else if (systemJobs.isEmpty()) { + return userJobs; + } else { + var all = new ArrayList<CompactionJob>(systemJobs.size() + userJobs.size()); + all.addAll(systemJobs); + all.addAll(userJobs); + return all; + } + } + + private CompactionServiceId dispatch(CompactionKind kind, TabletMetadata tablet, + Map<String,String> executionHints) { + + CompactionDispatcher dispatcher = dispatchers.get(tablet.getTableId(), + tableId -> CompactionPluginUtils.createDispatcher((ServiceEnvironment) env, tableId)); + + CompactionDispatcher.DispatchParameters dispatchParams = + new CompactionDispatcher.DispatchParameters() { + @Override + public CompactionServices getCompactionServices() { + return () -> serviceIds; + } + + @Override + public ServiceEnvironment getServiceEnv() { + return (ServiceEnvironment) env; + } + + @Override + public CompactionKind getCompactionKind() { + return kind; + } + + @Override + public Map<String,String> getExecutionHints() { + return executionHints; + } + }; + + return dispatcher.dispatch(dispatchParams).getService(); + } + + private Collection<CompactionJob> planCompactions(CompactionServiceId serviceId, + CompactionKind kind, TabletMetadata tablet, Map<String,String> executionHints) { + + if (!servicesConfig.getPlanners().containsKey(serviceId.canonical())) { + UNKNOWN_SERVICE_ERROR_LOG.trace( + "Table {} returned non-existent compaction service {} for compaction type {}. Check" + + " the table compaction dispatcher configuration. No compactions will happen" + + " until the configuration is fixed. 
This log message is temporarily suppressed.",
+          tablet.getExtent().tableId(), serviceId, kind);
+      return Set.of();
+    }
+
+    CompactionPlanner planner =
+        planners.computeIfAbsent(serviceId, sid -> createPlanner(tablet.getTableId(), serviceId));
+
+    // selecting indicator
+    // selected files
+
+    String ratioStr =
+        env.getConfiguration(tablet.getTableId()).get(Property.TABLE_MAJC_RATIO.getKey());
+    if (ratioStr == null) {
+      ratioStr = Property.TABLE_MAJC_RATIO.getDefaultValue();
+    }
+
+    double ratio = Double.parseDouble(ratioStr);
+
+    Set<CompactableFile> allFiles = tablet.getFilesMap().entrySet().stream()
+        .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue()))
+        .collect(Collectors.toUnmodifiableSet());
+    Set<CompactableFile> candidates;
+
+    if (kind == CompactionKind.SYSTEM) {
+      if (tablet.getExternalCompactions().isEmpty() && tablet.getSelectedFiles() == null) {
+        candidates = allFiles;
+      } else {
+        var tmpFiles = new HashMap<>(tablet.getFilesMap());
+        // remove any files that are in active compactions
+        tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream())
+            .forEach(tmpFiles::remove);
+        // remove any files that are selected and the user compaction has completed
+        // at least 1 job, otherwise we can keep the files
+        var selectedFiles = tablet.getSelectedFiles();
+
+        if (selectedFiles != null) {
+          long selectedExpirationDuration =
+              ConfigurationTypeHelper.getTimeInMillis(env.getConfiguration(tablet.getTableId())
+                  .get(Property.TABLE_COMPACTION_SELECTION_EXPIRATION.getKey()));
+
+          // If jobs are completed, or the selection time has not expired, then remove the files
+          // from the candidate list; otherwise we can cancel the selection
+          if (selectedFiles.getCompletedJobs() > 0
+              || (steadyTime.minus(selectedFiles.getSelectedTime()).toMillis()
+                  < selectedExpirationDuration)) {
+            tmpFiles.keySet().removeAll(selectedFiles.getFiles());
+          }
+        }
+        candidates = tmpFiles.entrySet().stream()
+            .map(entry -> new CompactableFileImpl(entry.getKey(), entry.getValue()))
+            .collect(Collectors.toUnmodifiableSet());
+      }
+    } else if (kind == CompactionKind.USER) {
+      var selectedFiles = new HashSet<>(tablet.getSelectedFiles().getFiles());
+      tablet.getExternalCompactions().values().stream().flatMap(ecm -> ecm.getJobFiles().stream())
+          .forEach(selectedFiles::remove);
+      candidates = selectedFiles.stream()
+          .map(file -> new CompactableFileImpl(file, tablet.getFilesMap().get(file)))
+          .collect(Collectors.toUnmodifiableSet());
+    } else {
+      throw new UnsupportedOperationException();
+    }
+
+    if (candidates.isEmpty()) {
+      // there are no candidate files for compaction, so no reason to call the planner
+      return Set.of();
+    }
+
+    CompactionPlanner.PlanningParameters params = new CompactionPlanner.PlanningParameters() {
+
+      @Override
+      public NamespaceId getNamespaceId() throws TableNotFoundException {
+        return ((ServiceEnvironmentImpl) env).getContext().getNamespaceId(tablet.getTableId());
+      }
+
+      @Override
+      public TableId getTableId() {
+        return tablet.getTableId();
+      }
+
+      @Override
+      public ServiceEnvironment getServiceEnvironment() {
+        return (ServiceEnvironment) env;
+      }
+
+      @Override
+      public CompactionKind getKind() {
+        return kind;
+      }
+
+      @Override
+      public double getRatio() {
+        return ratio;
+      }
+
+      @Override
+      public Collection<CompactableFile> getAll() {
+        return allFiles;
+      }
+
+      @Override
+      public Collection<CompactableFile> getCandidates() {
+        return candidates;
+      }
+
+      @Override
+      public Collection<CompactionJob> getRunningCompactions() {
+        var allFiles2 =
tablet.getFilesMap(); + return tablet.getExternalCompactions().values().stream().map(ecMeta -> { + Collection<CompactableFile> files = ecMeta.getJobFiles().stream() + .map(f -> new CompactableFileImpl(f, allFiles2.get(f))).collect(Collectors.toList()); + CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), + ecMeta.getCompactionGroupId(), files, ecMeta.getKind()); + return job; + }).collect(Collectors.toUnmodifiableList()); + } + + @Override + public Map<String,String> getExecutionHints() { + return executionHints; + } + + @Override + public CompactionPlan.Builder createPlanBuilder() { - return new CompactionPlanImpl.BuilderImpl(kind, allFiles, candidates); ++ return new CompactionPlanImpl.BuilderImpl(kind, candidates); + } + }; + return planCompactions(planner, params, serviceId); + } + + private CompactionPlanner createPlanner(TableId tableId, CompactionServiceId serviceId) { + + CompactionPlanner planner; + String plannerClassName = null; + Map<String,String> options = null; + try { + plannerClassName = servicesConfig.getPlanners().get(serviceId.canonical()); + options = servicesConfig.getOptions().get(serviceId.canonical()); + planner = env.instantiate(tableId, plannerClassName, CompactionPlanner.class); + CompactionPlannerInitParams initParameters = new CompactionPlannerInitParams(serviceId, + servicesConfig.getPlannerPrefix(serviceId.canonical()), + servicesConfig.getOptions().get(serviceId.canonical()), (ServiceEnvironment) env); + planner.init(initParameters); + } catch (Exception e) { + PLANNING_INIT_ERROR_LOG.trace( + "Failed to create compaction planner for service:{} tableId:{} using class:{} options:{}. Compaction " + + "service will not start any new compactions until its configuration is fixed. This log message is " + + "temporarily suppressed.", + serviceId, tableId, plannerClassName, options, e); + planner = new ProvisionalCompactionPlanner(serviceId); + } + return planner; + } + + private Collection<CompactionJob> planCompactions(CompactionPlanner planner, + CompactionPlanner.PlanningParameters params, CompactionServiceId serviceId) { + try { + return planner.makePlan(params).getJobs(); + } catch (Exception e) { + PLANNING_ERROR_LOG.trace( + "Failed to plan compactions for service:{} kind:{} tableId:{} hints:{}. Compaction service may not start any" + + " new compactions until this issue is resolved. Duplicates of this log message are temporarily" + + " suppressed.", + serviceId, params.getKind(), params.getTableId(), params.getExecutionHints(), e); + return Set.of(); + } + } +}
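
For anyone tracing the FATE changes above, the following is a minimal sketch (not part of the commit) of how the FateMutator conditional API is driven. The helper name trySeed is hypothetical; only methods and statuses declared in the FateMutator diff above are used, and accumulo-core is assumed to be on the classpath.

    import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
    import org.apache.accumulo.core.fate.Repo;
    import org.apache.accumulo.core.fate.user.FateMutator;

    class FateMutatorExample {
      // Seed a transaction only if its status column is still NEW and no
      // reservation is present; tryMutate() reports ACCEPTED, REJECTED, or UNKNOWN.
      static <T> boolean trySeed(FateMutator<T> mutator, Repo<T> repo) {
        FateMutator.Status status = mutator
            .requireStatus(TStatus.NEW)   // condition on the current status column
            .requireUnreserved()          // and on no reservation being held
            .putRepo(1, repo)             // push the initial repo onto the stack
            .putStatus(TStatus.SUBMITTED) // mark the transaction runnable
            .tryMutate();
        return status == FateMutator.Status.ACCEPTED;
      }
    }

As UserFateStore.seedTransaction(...) above illustrates, UNKNOWN is the one status that cannot be treated as success or failure: the conditional write may or may not have been applied, so the caller must re-check the persisted state or retry.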