This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 28d152348d Breaks Fate into Fate and FateClient (#6142)
28d152348d is described below
commit 28d152348d180b22a53b555a4b2f5eb5d2b72693
Author: Keith Turner <[email protected]>
AuthorDate: Tue Feb 24 14:53:30 2026 -0800
Breaks Fate into Fate and FateClient (#6142)
---
.../java/org/apache/accumulo/core/fate/Fate.java | 146 +----------------
.../org/apache/accumulo/core/fate/FateClient.java | 179 +++++++++++++++++++++
.../accumulo/manager/FateServiceHandler.java | 50 +++---
.../java/org/apache/accumulo/manager/Manager.java | 36 +++--
.../coordinator/CompactionCoordinator.java | 24 +--
.../coordinator/DeadCompactionDetector.java | 20 +--
.../manager/merge/FindMergeableRangeTask.java | 2 +-
.../apache/accumulo/manager/split/Splitter.java | 2 +-
.../compaction/CompactionCoordinatorTest.java | 9 +-
9 files changed, 249 insertions(+), 219 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index eebe114785..58bdc5cef6 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -19,12 +19,6 @@
package org.apache.accumulo.core.fate;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
-import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
-import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
-import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
-import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.META_DEAD_RESERVATION_CLEANER_POOL;
import static
org.apache.accumulo.core.util.threads.ThreadPoolNames.USER_DEAD_RESERVATION_CLEANER_POOL;
@@ -36,7 +30,6 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
@@ -48,18 +41,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.fate.FateStore.FateTxStore;
-import org.apache.accumulo.core.fate.FateStore.Seeder;
-import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
-import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,16 +60,15 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
*/
@SuppressFBWarnings(value = "CT_CONSTRUCTOR_THROW",
justification = "Constructor validation is required for proper
initialization")
-public class Fate<T> {
+public class Fate<T> extends FateClient<T> {
- private static final Logger log = LoggerFactory.getLogger(Fate.class);
+ static final Logger log = LoggerFactory.getLogger(Fate.class);
private final FateStore<T> store;
private final ScheduledFuture<?> fatePoolsWatcherFuture;
private final AtomicInteger needMoreThreadsWarnCount = new AtomicInteger(0);
private final ExecutorService deadResCleanerExecutor;
- private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED,
SUCCESSFUL, UNKNOWN);
public static final Duration INITIAL_DELAY = Duration.ofSeconds(3);
private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofMinutes(3);
public static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(30);
@@ -262,6 +248,7 @@ public class Fate<T> {
public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf,
ScheduledThreadPoolExecutor genSchedExecutor) {
+ super(store, toLogStrFunc);
this.store = FateLogger.wrap(store, toLogStrFunc, false);
fatePoolsWatcherFuture =
@@ -382,133 +369,6 @@ public class Fate<T> {
return needMoreThreadsWarnCount;
}
- // get a transaction id back to the requester before doing any work
- public FateId startTransaction() {
- return store.create();
- }
-
- public Seeder<T> beginSeeding() {
- return store.beginSeeding();
- }
-
- public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T>
repo,
- boolean autoCleanUp) {
- try (var seeder = store.beginSeeding()) {
- @SuppressWarnings("unused")
- var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo,
autoCleanUp);
- }
- }
-
- // start work in the transaction.. it is safe to call this
- // multiple times for a transaction... but it will only seed once
- public void seedTransaction(FateOperation fateOp, FateId fateId, Repo<T>
repo,
- boolean autoCleanUp, String goalMessage) {
- log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId,
goalMessage);
- store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
- }
-
- // check on the transaction
- public TStatus waitForCompletion(FateId fateId) {
- return store.read(fateId).waitForStatusChange(FINISHED_STATES);
- }
-
- /**
- * Attempts to cancel a running Fate transaction
- *
- * @param fateId fate transaction id
- * @return true if transaction transitioned to a failed state or already in
a completed state,
- * false otherwise
- */
- public boolean cancel(FateId fateId) {
- for (int retries = 0; retries < 5; retries++) {
- Optional<FateTxStore<T>> optionalTxStore = store.tryReserve(fateId);
- if (optionalTxStore.isPresent()) {
- var txStore = optionalTxStore.orElseThrow();
- try {
- TStatus status = txStore.getStatus();
- log.info("[{}] status is: {}", store.type(), status);
- if (status == NEW || status == SUBMITTED) {
- txStore.setTransactionInfo(TxInfo.EXCEPTION, new
TApplicationException(
- TApplicationException.INTERNAL_ERROR, "Fate transaction
cancelled by user"));
- txStore.setStatus(FAILED_IN_PROGRESS);
- log.info(
- "[{}] Updated status for {} to FAILED_IN_PROGRESS because it
was cancelled by user",
- store.type(), fateId);
- return true;
- } else {
- log.info("[{}] {} cancelled by user but already in progress or
finished state",
- store.type(), fateId);
- return false;
- }
- } finally {
- txStore.unreserve(Duration.ZERO);
- }
- } else {
- // reserved, lets retry.
- UtilWaitThread.sleep(500);
- }
- }
- log.info("[{}] Unable to reserve transaction {} to cancel it",
store.type(), fateId);
- return false;
- }
-
- // resource cleanup
- public void delete(FateId fateId) {
- FateTxStore<T> txStore = store.reserve(fateId);
- try {
- switch (txStore.getStatus()) {
- case NEW:
- case SUBMITTED:
- case FAILED:
- case SUCCESSFUL:
- txStore.delete();
- break;
- case FAILED_IN_PROGRESS:
- case IN_PROGRESS:
- throw new IllegalStateException("Can not delete in progress
transaction " + fateId);
- case UNKNOWN:
- // nothing to do, it does not exist
- break;
- }
- } finally {
- txStore.unreserve(Duration.ZERO);
- }
- }
-
- public String getReturn(FateId fateId) {
- FateTxStore<T> txStore = store.reserve(fateId);
- try {
- if (txStore.getStatus() != SUCCESSFUL) {
- throw new IllegalStateException(
- "Tried to get exception when transaction " + fateId + " not in
successful state");
- }
- return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE);
- } finally {
- txStore.unreserve(Duration.ZERO);
- }
- }
-
- // get reportable failures
- public Exception getException(FateId fateId) {
- FateTxStore<T> txStore = store.reserve(fateId);
- try {
- if (txStore.getStatus() != FAILED) {
- throw new IllegalStateException(
- "Tried to get exception when transaction " + fateId + " not in
failed state");
- }
- return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION);
- } finally {
- txStore.unreserve(Duration.ZERO);
- }
- }
-
- /**
- * Lists transctions for a given fate key type.
- */
- public Stream<FateKey> list(FateKey.FateKeyType type) {
- return store.list(type);
- }
-
/**
* Initiates shutdown of background threads that run fate operations and
cleanup fate data and
* optionally waits on them. Leaves the fate object in a state where it can
still update and read
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
new file mode 100644
index 0000000000..2dc472e4bd
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
+import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
+import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
+import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
+
+import java.time.Duration;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.logging.FateLogger;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.thrift.TApplicationException;
+
+/**
+ * Supports initiating and checking status of fate operations.
+ *
+ */
+public class FateClient<T> {
+
+ private final FateStore<T> store;
+
+ private static final EnumSet<ReadOnlyFateStore.TStatus> FINISHED_STATES =
+ EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
+
+ public FateClient(FateStore<T> store, Function<Repo<T>,String> toLogStrFunc)
{
+ this.store = FateLogger.wrap(store, toLogStrFunc, false);
+ }
+
+ // get a transaction id back to the requester before doing any work
+ public FateId startTransaction() {
+ return store.create();
+ }
+
+ public FateStore.Seeder<T> beginSeeding() {
+ return store.beginSeeding();
+ }
+
+ public void seedTransaction(Fate.FateOperation fateOp, FateKey fateKey,
Repo<T> repo,
+ boolean autoCleanUp) {
+ try (var seeder = store.beginSeeding()) {
+ @SuppressWarnings("unused")
+ var unused = seeder.attemptToSeedTransaction(fateOp, fateKey, repo,
autoCleanUp);
+ }
+ }
+
+ // start work in the transaction.. it is safe to call this
+ // multiple times for a transaction... but it will only seed once
+ public void seedTransaction(Fate.FateOperation fateOp, FateId fateId,
Repo<T> repo,
+ boolean autoCleanUp, String goalMessage) {
+ Fate.log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId,
goalMessage);
+ store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
+ }
+
+ // check on the transaction
+ public ReadOnlyFateStore.TStatus waitForCompletion(FateId fateId) {
+ return store.read(fateId).waitForStatusChange(FINISHED_STATES);
+ }
+
+ /**
+ * Attempts to cancel a running Fate transaction
+ *
+ * @param fateId fate transaction id
+ * @return true if the transaction was NEW or SUBMITTED and was moved to
+ * FAILED_IN_PROGRESS, false otherwise
+ */
+ public boolean cancel(FateId fateId) {
+ for (int retries = 0; retries < 5; retries++) {
+ Optional<FateStore.FateTxStore<T>> optionalTxStore =
store.tryReserve(fateId);
+ if (optionalTxStore.isPresent()) {
+ var txStore = optionalTxStore.orElseThrow();
+ try {
+ ReadOnlyFateStore.TStatus status = txStore.getStatus();
+ Fate.log.info("[{}] status is: {}", store.type(), status);
+ if (status == NEW || status == SUBMITTED) {
+ txStore.setTransactionInfo(Fate.TxInfo.EXCEPTION, new
TApplicationException(
+ TApplicationException.INTERNAL_ERROR, "Fate transaction
cancelled by user"));
+ txStore.setStatus(FAILED_IN_PROGRESS);
+ Fate.log.info(
+ "[{}] Updated status for {} to FAILED_IN_PROGRESS because it
was cancelled by user",
+ store.type(), fateId);
+ return true;
+ } else {
+ Fate.log.info("[{}] {} cancelled by user but already in progress
or finished state",
+ store.type(), fateId);
+ return false;
+ }
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ } else {
+ // reserved, lets retry.
+ UtilWaitThread.sleep(500);
+ }
+ }
+ Fate.log.info("[{}] Unable to reserve transaction {} to cancel it",
store.type(), fateId);
+ return false;
+ }
+
+ // resource cleanup
+ public void delete(FateId fateId) {
+ FateStore.FateTxStore<T> txStore = store.reserve(fateId);
+ try {
+ switch (txStore.getStatus()) {
+ case NEW:
+ case SUBMITTED:
+ case FAILED:
+ case SUCCESSFUL:
+ txStore.delete();
+ break;
+ case FAILED_IN_PROGRESS:
+ case IN_PROGRESS:
+ throw new IllegalStateException("Can not delete in progress
transaction " + fateId);
+ case UNKNOWN:
+ // nothing to do, it does not exist
+ break;
+ }
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ }
+
+ public String getReturn(FateId fateId) {
+ FateStore.FateTxStore<T> txStore = store.reserve(fateId);
+ try {
+ if (txStore.getStatus() != SUCCESSFUL) {
+ throw new IllegalStateException(
+ "Tried to get exception when transaction " + fateId + " not in
successful state");
+ }
+ return (String) txStore.getTransactionInfo(Fate.TxInfo.RETURN_VALUE);
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ }
+
+ // get reportable failures
+ public Exception getException(FateId fateId) {
+ FateStore.FateTxStore<T> txStore = store.reserve(fateId);
+ try {
+ if (txStore.getStatus() != FAILED) {
+ throw new IllegalStateException(
+ "Tried to get exception when transaction " + fateId + " not in
failed state");
+ }
+ return (Exception) txStore.getTransactionInfo(Fate.TxInfo.EXCEPTION);
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ }
+
+ /**
+ * Lists transactions for a given fate key type.
+ */
+ public Stream<FateKey> list(FateKey.FateKeyType type) {
+ return store.list(type);
+ }
+}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
index f48ef33e19..f230e17bee 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
@@ -131,7 +131,7 @@ class FateServiceHandler implements FateService.Iface {
throws ThriftSecurityException {
authenticate(credentials);
return new TFateId(type,
-
manager.fate(FateInstanceType.fromThrift(type)).startTransaction().getTxUUIDStr());
+
manager.fateClient(FateInstanceType.fromThrift(type)).startTransaction().getTxUUIDStr());
}
@Override
@@ -156,7 +156,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Create " + namespace + " namespace.";
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new CreateNamespace(c.getPrincipal(), namespace,
options)), autoCleanup,
goalMessage);
break;
@@ -175,7 +175,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Rename " + oldName + " namespace to " + newName;
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new RenameNamespace(namespaceId, oldName,
newName)), autoCleanup,
goalMessage);
break;
@@ -193,7 +193,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Delete namespace Id: " + namespaceId;
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new DeleteNamespace(namespaceId)), autoCleanup,
goalMessage);
break;
}
@@ -251,7 +251,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Create table " + tableName + " " + initialTableState +
" with " + splitCount
+ " splits and initial tabletAvailability of " +
initialTabletAvailability;
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName,
timeType, options,
splitsPath, splitCount, splitsDirsPath, initialTableState,
// Set the default tablet to be auto-mergeable with other
tablets if it is split
@@ -287,7 +287,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Rename table " + oldTableName + "(" + tableId + ") to
" + oldTableName;
try {
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new RenameTable(namespaceId, tableId,
oldTableName, newTableName)),
autoCleanup, goalMessage);
} catch (NamespaceNotFoundException e) {
@@ -369,7 +369,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += " and keep offline.";
}
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new CloneTable(c.getPrincipal(), srcNamespaceId,
srcTableId,
namespaceId, tableName, propertiesToSet, propertiesToExclude,
keepOffline)),
autoCleanup, goalMessage);
@@ -399,7 +399,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Delete table " + tableName + "(" + tableId + ")";
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new PreDeleteTable(namespaceId, tableId)),
autoCleanup, goalMessage);
break;
}
@@ -426,7 +426,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Online table " + tableId;
final EnumSet<TableState> expectedCurrStates =
EnumSet.of(TableState.ONLINE, TableState.OFFLINE);
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(
new ChangeTableState(namespaceId, tableId, tableOp,
expectedCurrStates)),
autoCleanup, goalMessage);
@@ -455,7 +455,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Offline table " + tableId;
final EnumSet<TableState> expectedCurrStates =
EnumSet.of(TableState.ONLINE, TableState.OFFLINE);
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(
new ChangeTableState(namespaceId, tableId, tableOp,
expectedCurrStates)),
autoCleanup, goalMessage);
@@ -491,7 +491,7 @@ class FateServiceHandler implements FateService.Iface {
startRowStr, endRowStr);
goalMessage += "Merge table " + tableName + "(" + tableId + ") splits
from " + startRowStr
+ " to " + endRowStr;
- manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(
+ manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(
new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId,
startRow, endRow)),
autoCleanup, goalMessage);
break;
@@ -523,7 +523,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage +=
"Delete table " + tableName + "(" + tableId + ") range " +
startRow + " to " + endRow;
- manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(
+ manager.fateClient(type).seedTransaction(op, fateId, new TraceRepo<>(
new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId,
startRow, endRow)),
autoCleanup, goalMessage);
break;
@@ -549,7 +549,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Compact table (" + tableId + ") with config " +
compactionConfig;
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new CompactRange(namespaceId, tableId,
compactionConfig)), autoCleanup,
goalMessage);
break;
@@ -573,7 +573,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Cancel compaction of table (" + tableId + ")";
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new CancelCompactions(namespaceId, tableId)),
autoCleanup, goalMessage);
break;
}
@@ -608,7 +608,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Import table with new name: " + tableName + " from " +
exportDirs;
- manager.fate(type)
+ manager.fateClient(type)
.seedTransaction(op, fateId, new TraceRepo<>(new
ImportTable(c.getPrincipal(),
tableName, exportDirs, namespaceId, keepMappings,
keepOffline)), autoCleanup,
goalMessage);
@@ -638,7 +638,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Export table " + tableName + "(" + tableId + ") to " +
exportDir;
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new ExportTable(namespaceId, tableName, tableId,
exportDir)),
autoCleanup, goalMessage);
break;
@@ -673,7 +673,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Bulk import (v2) " + dir + " to " + tableName + "(" +
tableId + ")";
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new ComputeBulkRange(tableId, dir, setTime)),
autoCleanup, goalMessage);
break;
}
@@ -717,7 +717,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Set availability for table: " + tableName + "(" +
tableId + ") range: "
+ tRange + " to: " + tabletAvailability.name();
- manager.fate(type).seedTransaction(op, fateId,
+ manager.fateClient(type).seedTransaction(op, fateId,
new TraceRepo<>(new LockTable(tableId, namespaceId, tRange,
tabletAvailability)),
autoCleanup, goalMessage);
break;
@@ -791,8 +791,8 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage = "Splitting " + extent + " for user into " +
(splits.size() + 1) + " tablets";
- manager.fate(type).seedTransaction(op, fateId, new PreSplit(extent,
splits), autoCleanup,
- goalMessage);
+ manager.fateClient(type).seedTransaction(op, fateId, new
PreSplit(extent, splits),
+ autoCleanup, goalMessage);
break;
}
default:
@@ -844,9 +844,9 @@ class FateServiceHandler implements FateService.Iface {
FateId fateId = FateId.fromThrift(opid);
FateInstanceType type = fateId.getType();
- TStatus status = manager.fate(type).waitForCompletion(fateId);
+ TStatus status = manager.fateClient(type).waitForCompletion(fateId);
if (status == TStatus.FAILED) {
- Exception e = manager.fate(type).getException(fateId);
+ Exception e = manager.fateClient(type).getException(fateId);
if (e instanceof ThriftTableOperationException) {
throw (ThriftTableOperationException) e;
} else if (e instanceof ThriftSecurityException) {
@@ -858,7 +858,7 @@ class FateServiceHandler implements FateService.Iface {
}
}
- String ret = manager.fate(type).getReturn(fateId);
+ String ret = manager.fateClient(type).getReturn(fateId);
if (ret == null) {
ret = ""; // thrift does not like returning null
}
@@ -870,7 +870,7 @@ class FateServiceHandler implements FateService.Iface {
throws ThriftSecurityException {
authenticate(credentials);
FateId fateId = FateId.fromThrift(opid);
- manager.fate(fateId.getType()).delete(fateId);
+ manager.fateClient(fateId.getType()).delete(fateId);
}
protected void authenticate(TCredentials credentials) throws
ThriftSecurityException {
@@ -984,6 +984,6 @@ class FateServiceHandler implements FateService.Iface {
SecurityErrorCode.PERMISSION_DENIED);
}
- return manager.fate(fateId.getType()).cancel(fateId);
+ return manager.fateClient(fateId.getType()).cancel(fateId);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 06f227967e..1a3f9c5443 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateCleaner;
+import org.apache.accumulo.core.fate.FateClient;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
@@ -290,17 +291,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
return getManagerState() != ManagerState.STOP;
}
- /**
- * Retrieve the Fate object, blocking until it is ready. This could cause
problems if Fate
- * operations are attempted to be used prior to the Manager being ready for
them. If these
- * operations are triggered by a client side request from a tserver or
client, it should be safe
- * to wait to handle those until Fate is ready, but if it occurs during an
upgrade, or some other
- * time in the Manager before Fate is started, that may result in a deadlock
and will need to be
- * fixed.
- *
- * @return the Fate object, only after the fate components are running and
ready
- */
- public Fate<FateEnv> fate(FateInstanceType type) {
+ private void waitForFate() {
try {
// block up to 30 seconds until it's ready; if it's still not ready,
introduce some logging
if (!fateReadyLatch.await(30, SECONDS)) {
@@ -321,7 +312,26 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
Thread.currentThread().interrupt();
throw new IllegalStateException("Thread was interrupted; cannot
proceed");
}
- return getFateRefs().get(type);
+ }
+
+ /**
+ * Retrieve the Fate object, blocking until it is ready. This could cause
problems if Fate
+ * operations are attempted to be used prior to the Manager being ready for
them. If these
+ * operations are triggered by a client side request from a tserver or
client, it should be safe
+ * to wait to handle those until Fate is ready, but if it occurs during an
upgrade, or some other
+ * time in the Manager before Fate is started, that may result in a deadlock
and will need to be
+ * fixed.
+ *
+ * @return the Fate object, only after the fate components are running and
ready
+ */
+ public Fate<FateEnv> fate(FateInstanceType type) {
+ waitForFate();
+ var fate = Objects.requireNonNull(fateRefs.get(), "fateRefs is not set
yet").get(type);
+ return Objects.requireNonNull(fate, () -> "fate type " + type + " is not
present");
+ }
+
+ public FateClient<FateEnv> fateClient(FateInstanceType type) {
+ return fate(type);
}
static final boolean X = true;
@@ -924,7 +934,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
// Start the Manager's Fate Service
fateServiceHandler = new FateServiceHandler(this);
managerClientHandler = new ManagerClientServiceHandler(this);
- compactionCoordinator = new CompactionCoordinator(this, fateRefs);
+ compactionCoordinator = new CompactionCoordinator(this, this::fateClient);
var processor = ThriftProcessorTypes.getManagerTProcessor(this,
fateServiceHandler,
compactionCoordinator.getThriftService(), managerClientHandler,
getContext());
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index e6c0c86f2e..d6915b5155 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -56,8 +56,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -90,6 +90,7 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateClient;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
@@ -121,7 +122,6 @@ import
org.apache.accumulo.core.tabletserver.thrift.IteratorConfig;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionKind;
import org.apache.accumulo.core.tabletserver.thrift.TCompactionStats;
import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
-import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.cache.Caches.CacheName;
import org.apache.accumulo.core.util.compaction.CompactionPlannerInitParams;
import org.apache.accumulo.core.util.compaction.CompactionServicesConfig;
@@ -271,7 +271,7 @@ public class CompactionCoordinator
private final ServerContext ctx;
private final AuditedSecurityOperation security;
private final CompactionJobQueues jobQueues;
- private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>>
fateInstances;
+ private final Function<FateInstanceType,FateClient<FateEnv>> fateClients;
// Exposed for tests
protected final CountDownLatch shutdown = new CountDownLatch(1);
@@ -291,7 +291,7 @@ public class CompactionCoordinator
private final Set<String> activeCompactorReservationRequest =
ConcurrentHashMap.newKeySet();
public CompactionCoordinator(Manager manager,
- AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances) {
+ Function<FateInstanceType,FateClient<FateEnv>> fateClients) {
this.ctx = manager.getContext();
this.security = ctx.getSecurityOperation();
this.manager = Objects.requireNonNull(manager);
@@ -303,7 +303,7 @@ public class CompactionCoordinator
this.queueMetrics = new QueueMetrics(jobQueues);
- this.fateInstances = fateInstances;
+ this.fateClients = fateClients;
completed =
ctx.getCaches().createNewBuilder(CacheName.COMPACTIONS_COMPLETED, true)
.maximumSize(200).expireAfterWrite(10, TimeUnit.MINUTES).build();
@@ -326,7 +326,7 @@ public class CompactionCoordinator
.maximumWeight(10485760L).weigher(weigher).build();
deadCompactionDetector =
- new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(),
fateInstances);
+ new DeadCompactionDetector(this.ctx, this, ctx.getScheduledExecutor(),
fateClients);
var rootReservationPool =
ThreadPools.getServerThreadPools().createExecutorService(
ctx.getConfiguration(),
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true);
@@ -789,17 +789,9 @@ public class CompactionCoordinator
}
// maybe fate has not started yet
- var localFates = fateInstances.get();
- while (localFates == null) {
- UtilWaitThread.sleep(100);
- if (shutdown.getCount() == 0) {
- return;
- }
- localFates = fateInstances.get();
- }
-
var extent = KeyExtent.fromThrift(textent);
- var localFate = localFates.get(FateInstanceType.fromTableId(extent.tableId()));
+ var fateType = FateInstanceType.fromTableId(extent.tableId());
+ var localFate = fateClients.apply(fateType);
LOG.info("Compaction completed, id: {}, stats: {}, extent: {}",
externalCompactionId, stats,
extent);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
index da852f1bb1..ce04296a61 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/DeadCompactionDetector.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.manager.compaction.coordinator;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -27,14 +28,14 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateClient;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -62,16 +63,16 @@ public class DeadCompactionDetector {
private final ScheduledThreadPoolExecutor schedExecutor;
private final ConcurrentHashMap<ExternalCompactionId,Long> deadCompactions;
private final Set<TableId> tablesWithUnreferencedTmpFiles = new HashSet<>();
- private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances;
+ private final Function<FateInstanceType,FateClient<FateEnv>> fateClients;
public DeadCompactionDetector(ServerContext context, CompactionCoordinator
coordinator,
ScheduledThreadPoolExecutor stpe,
- AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances) {
+ Function<FateInstanceType,FateClient<FateEnv>> fateClients) {
this.context = context;
this.coordinator = coordinator;
this.schedExecutor = stpe;
this.deadCompactions = new ConcurrentHashMap<>();
- this.fateInstances = fateInstances;
+ this.fateClients = fateClients;
}
public void addTableId(TableId tableWithUnreferencedTmpFiles) {
@@ -196,13 +197,8 @@ public class DeadCompactionDetector {
if (!tabletCompactions.isEmpty()) {
// look for any compactions committing in fate and remove those
- var fateMap = fateInstances.get();
- if (fateMap == null) {
- log.warn("Fate is not present, can not look for dead compactions");
- return;
- }
- try (Stream<FateKey> keyStream = fateMap.values().stream()
- .flatMap(fate -> fate.list(FateKey.FateKeyType.COMPACTION_COMMIT))) {
+ try (Stream<FateKey> keyStream = Arrays.stream(FateInstanceType.values()).map(fateClients)
+ .flatMap(fateClient -> fateClient.list(FateKey.FateKeyType.COMPACTION_COMMIT))) {
keyStream.map(fateKey ->
fateKey.getCompactionId().orElseThrow()).forEach(ecid -> {
if (tabletCompactions.remove(ecid) != null) {
log.debug("Ignoring compaction {} that is committing in a fate",
ecid);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
index a59bee95ef..11c5b69211 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/merge/FindMergeableRangeTask.java
@@ -158,7 +158,7 @@ public class FindMergeableRangeTask implements Runnable {
tableId, startRowStr, endRowStr);
var fateKey = FateKey.forMerge(new KeyExtent(tableId, range.endRow,
range.startRow));
- manager.fate(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey,
+ manager.fateClient(type).seedTransaction(FateOperation.SYSTEM_MERGE, fateKey,
new TraceRepo<>(
new TableRangeOp(Operation.SYSTEM_MERGE, namespaceId, tableId,
startRow, endRow)),
true);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
index 1f21fde170..93d4c1cf03 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
@@ -89,7 +89,7 @@ public class Splitter {
private void seedSplits(FateInstanceType instanceType, Map<Text,KeyExtent>
splits) {
if (!splits.isEmpty()) {
- try (var seeder = manager.fate(instanceType).beginSeeding()) {
+ try (var seeder = manager.fateClient(instanceType).beginSeeding()) {
for (KeyExtent extent : splits.values()) {
@SuppressWarnings("unused")
var unused =
seeder.attemptToSeedTransaction(Fate.FateOperation.SYSTEM_SPLIT,
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
index b121faf2b3..0800c42d2a 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java
@@ -39,7 +39,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.CompactionConfig;
@@ -55,7 +54,6 @@ import org.apache.accumulo.core.data.ResourceGroupId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
-import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
@@ -79,7 +77,6 @@ import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
import org.apache.accumulo.manager.compaction.queue.CompactionJobPriorityQueue;
import org.apache.accumulo.manager.compaction.queue.ResolvedCompactionJob;
-import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -94,10 +91,6 @@ import com.google.common.net.HostAndPort;
public class CompactionCoordinatorTest {
- // Need a non-null fateInstances reference for CompactionCoordinator.compactionCompleted
- private static final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateInstances =
- new AtomicReference<>(Map.of());
-
private static final ResourceGroupId GROUP_ID = ResourceGroupId.of("R2DQ");
private final HostAndPort tserverAddr = HostAndPort.fromParts("192.168.1.1",
9090);
@@ -118,7 +111,7 @@ public class CompactionCoordinatorTest {
private Set<ExternalCompactionId> metadataCompactionIds = null;
public TestCoordinator(Manager manager, List<RunningCompaction>
runningCompactions) {
- super(manager, fateInstances);
+ super(manager, t -> null);
this.runningCompactions = runningCompactions;
}