This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new fba07b60a0 Remove meta fate specialization (#6227)
fba07b60a0 is described below
commit fba07b60a0ff7ac77866008187b43a7745f287d3
Author: Keith Turner <[email protected]>
AuthorDate: Thu Apr 2 15:09:49 2026 -0700
Remove meta fate specialization (#6227)
After this change, meta fate and user fate are treated mostly the same
in the managers. One difference is in assignment: the entire meta fate
range is assigned to a single manager, while user fate is spread across
all managers. Both are now assigned by the primary manager using the
same RPCs. Previously, the primary manager directly started a meta
fate instance.
This change also removes the implementation of FateEnv from the Manager
class, which caused a ripple of test changes. As a result, there are no
longer two different implementations of FateEnv.
---
.../apache/accumulo/core/fate/FatePartition.java | 4 +
.../java/org/apache/accumulo/manager/Manager.java | 86 +++------------------
.../apache/accumulo/manager/fate/FateManager.java | 90 +++++++++++++++-------
.../apache/accumulo/manager/fate/FateWorker.java | 83 +++++++++++---------
.../tableOps/compact/CompactionDriverTest.java | 14 ++--
.../manager/tableOps/merge/MergeTabletsTest.java | 12 +--
.../manager/tableOps/split/UpdateTabletsTest.java | 26 +++----
.../accumulo/test/MultipleManagerFateIT.java | 5 +-
.../apache/accumulo/test/ample/TestAmpleUtil.java | 24 +++---
.../test/fate/ManagerRepoIT_SimpleSuite.java | 66 ++++++++--------
10 files changed, 198 insertions(+), 212 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java
b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java
index 33cbbc9724..44907cb788 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FatePartition.java
@@ -60,4 +60,8 @@ public record FatePartition(FateId start, FateId end) {
}
}
+
+ public FateInstanceType getType() {
+ return start.getType();
+ }
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index b1edc70857..204de1d79d 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -50,7 +50,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -77,7 +76,6 @@ import org.apache.accumulo.core.fate.FateCleaner;
import org.apache.accumulo.core.fate.FateClient;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.FatePartition;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.TraceRepo;
import org.apache.accumulo.core.fate.user.UserFateStore;
@@ -122,11 +120,9 @@ import
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
import org.apache.accumulo.manager.fate.FateManager;
import org.apache.accumulo.manager.fate.FateWorker;
import org.apache.accumulo.manager.merge.FindMergeableRangeTask;
-import org.apache.accumulo.manager.metrics.fate.FateExecutorMetricsProducer;
import org.apache.accumulo.manager.metrics.fate.meta.MetaFateMetrics;
import org.apache.accumulo.manager.metrics.fate.user.UserFateMetrics;
import org.apache.accumulo.manager.recovery.RecoveryManager;
-import org.apache.accumulo.manager.split.FileRangeCache;
import org.apache.accumulo.manager.split.Splitter;
import org.apache.accumulo.manager.state.TableCounts;
import org.apache.accumulo.manager.tableOps.FateEnv;
@@ -178,10 +174,8 @@ import io.opentelemetry.context.Scope;
* <p>
* The manager will also coordinate log recoveries and reports general status.
*/
-// TODO create standalone PrimaryFateEnv class and pull everything into there
relatated to
-// FateEnv... this will make it much more clear the env is for metadata ops
only
public class Manager extends AbstractServer
- implements LiveTServerSet.Listener, FateEnv, PrimaryManagerThriftService {
+ implements LiveTServerSet.Listener, PrimaryManagerThriftService {
static final Logger log = LoggerFactory.getLogger(Manager.class);
@@ -227,8 +221,6 @@ public class Manager extends AbstractServer
private final CountDownLatch fateReadyLatch = new CountDownLatch(1);
private final AtomicReference<Map<FateInstanceType,FateClient<FateEnv>>>
fateClients =
new AtomicReference<>();
- private final AtomicReference<Map<FateInstanceType,Fate<FateEnv>>> fateRefs =
- new AtomicReference<>();
private volatile FateManager fateManager;
static class TServerStatus {
@@ -286,7 +278,6 @@ public class Manager extends AbstractServer
private final long timeToCacheRecoveryWalExistence;
private ExecutorService tableInformationStatusPool = null;
- private ThreadPoolExecutor tabletRefreshThreadPool;
private final TabletStateStore rootTabletStore;
private final TabletStateStore metadataTabletStore;
@@ -340,21 +331,15 @@ public class Manager extends AbstractServer
}
/**
- * Retrieve the Fate object, blocking until it is ready. This could cause
problems if Fate
+ * Retrieve the FateClient object, blocking until it is ready. This could
cause problems if Fate
* operations are attempted to be used prior to the Manager being ready for
them. If these
* operations are triggered by a client side request from a tserver or
client, it should be safe
* to wait to handle those until Fate is ready, but if it occurs during an
upgrade, or some other
* time in the Manager before Fate is started, that may result in a deadlock
and will need to be
* fixed.
*
- * @return the Fate object, only after the fate components are running and
ready
+ * @return the FateClient object, only after the fate components are running
and ready
*/
- public Fate<FateEnv> fate(FateInstanceType type) {
- waitForFate();
- var fate = requireNonNull(fateRefs.get(), "fateRefs is not set
yet").get(type);
- return requireNonNull(fate, () -> "fate type " + type + " is not present");
- }
-
public FateClient<FateEnv> fateClient(FateInstanceType type) {
waitForFate();
var client = requireNonNull(fateClients.get(), "fateClients is not set
yet").get(type);
@@ -497,16 +482,10 @@ public class Manager extends AbstractServer
return result;
}
- @Override
public TableManager getTableManager() {
return getContext().getTableManager();
}
- @Override
- public ThreadPoolExecutor getTabletRefreshThreadPool() {
- return tabletRefreshThreadPool;
- }
-
public static void main(String[] args) throws Exception {
AbstractServer.startServer(new Manager(new ServerOpts(),
ServerContext::new, args), log);
}
@@ -585,17 +564,11 @@ public class Manager extends AbstractServer
}
private Splitter splitter;
- private FileRangeCache fileRangeCache;
public Splitter getSplitter() {
return splitter;
}
- @Override
- public FileRangeCache getFileRangeCache() {
- return fileRangeCache;
- }
-
public UpgradeCoordinator.UpgradeStatus getUpgradeStatus() {
return upgradeCoordinator.getStatus();
}
@@ -717,14 +690,14 @@ public class Manager extends AbstractServer
case CLEAN_STOP:
switch (getManagerState()) {
case NORMAL:
- fateManager.stop(Duration.ofMinutes(1));
+ fateManager.stop(FateInstanceType.USER,
Duration.ofMinutes(1));
setManagerState(ManagerState.SAFE_MODE);
break;
case SAFE_MODE: {
// META fate stores its data in Zookeeper and its operations
interact with
// metadata and root tablets, need to completely shut it
down before unloading
// metadata and root tablets
- fate(FateInstanceType.META).shutdown(1, MINUTES);
+ fateManager.stop(FateInstanceType.META,
Duration.ofMinutes(1));
int count = nonMetaDataTabletsAssignedOrHosted();
log.debug(
String.format("There are %d non-metadata tablets
assigned or hosted", count));
@@ -953,9 +926,6 @@ public class Manager extends AbstractServer
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(new UserFateMetrics(getContext(),
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
- metricsInfo.addMetricsProducers(new
FateExecutorMetricsProducer(getContext(),
- fate(FateInstanceType.META).getFateExecutors(),
-
getConfiguration().getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)));
metricsInfo.addMetricsProducers(this);
}
@@ -1045,11 +1015,6 @@ public class Manager extends AbstractServer
tableInformationStatusPool = ThreadPools.getServerThreadPools()
.createExecutorService(getConfiguration(),
Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
- tabletRefreshThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
-
.numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
-
.numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
- .build();
-
Thread statusThread = Threads.createCriticalThread("Status Thread", new
StatusThread());
statusThread.start();
@@ -1195,7 +1160,6 @@ public class Manager extends AbstractServer
this.splitter = new Splitter(this);
this.splitter.start();
- this.fileRangeCache = new FileRangeCache(context);
setupFate(context);
@@ -1283,8 +1247,8 @@ public class Manager extends AbstractServer
}
log.debug("Shutting down fate.");
- fate(FateInstanceType.META).close();
- fateManager.stop(Duration.ZERO);
+ fateManager.stop(FateInstanceType.USER, Duration.ZERO);
+ fateManager.stop(FateInstanceType.META, Duration.ZERO);
splitter.stop();
@@ -1296,7 +1260,6 @@ public class Manager extends AbstractServer
}
tableInformationStatusPool.shutdownNow();
- tabletRefreshThreadPool.shutdownNow();
compactionCoordinator.shutdown();
@@ -1344,12 +1307,10 @@ public class Manager extends AbstractServer
lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
var metaStore = new MetaFateStore<FateEnv>(context.getZooSession(),
primaryManagerLock.getLockID(), isLockHeld);
- var metaInstance = createFateInstance(this, metaStore, context);
- // configure this instance to process all data
-
metaInstance.setPartitions(Set.of(FatePartition.all(FateInstanceType.META)));
+ var metaFateClient = new FateClient<>(metaStore, TraceRepo::toLogString);
var userStore = new UserFateStore<FateEnv>(context,
SystemTables.FATE.tableName(),
managerLock.getLockID(), isLockHeld);
- var userFateClient = new FateClient<FateEnv>(userStore,
TraceRepo::toLogString);
+ var userFateClient = new FateClient<>(userStore, TraceRepo::toLogString);
var metaCleaner = new FateCleaner<>(metaStore, Duration.ofHours(8),
this::getSteadyTime);
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
@@ -1359,14 +1320,10 @@ public class Manager extends AbstractServer
.scheduleWithFixedDelay(userCleaner::ageOff, 10, 4 * 60, MINUTES));
if (!fateClients.compareAndSet(null,
- Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER,
userFateClient))) {
+ Map.of(FateInstanceType.META, metaFateClient, FateInstanceType.USER,
userFateClient))) {
throw new IllegalStateException(
"Unexpected previous fateClient reference map already
initialized");
}
- if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META,
metaInstance))) {
- throw new IllegalStateException(
- "Unexpected previous fate reference map already initialized");
- }
fateReadyLatch.countDown();
} catch (KeeperException | InterruptedException e) {
@@ -1522,11 +1479,6 @@ public class Manager extends AbstractServer
}
}
- @Override
- public ServiceLock getServiceLock() {
- return primaryManagerLock;
- }
-
private ServiceLockData getPrimaryManagerLock(final ServiceLockPath
zManagerLoc)
throws KeeperException, InterruptedException {
var zooKeeper = getContext().getZooSession();
@@ -1647,7 +1599,6 @@ public class Manager extends AbstractServer
return result;
}
- @Override
public Set<TServerInstance> onlineTabletServers() {
return tserverSet.getSnapshot().getTservers();
}
@@ -1660,12 +1611,6 @@ public class Manager extends AbstractServer
return nextEvent;
}
- @Override
- public EventPublisher getEventPublisher() {
- return nextEvent;
- }
-
- @Override
public VolumeManager getVolumeManager() {
return getContext().getVolumeManager();
}
@@ -1732,7 +1677,6 @@ public class Manager extends AbstractServer
* monotonic clock, which will be approximately consistent between different
managers or different
* runs of the same manager. SteadyTime supports both nanoseconds and
milliseconds.
*/
- @Override
public SteadyTime getSteadyTime() {
return timeKeeper.getTime();
}
@@ -1760,14 +1704,4 @@ public class Manager extends AbstractServer
public ServiceLock getLock() {
return managerLock;
}
-
- /**
- * Get Threads Pool instance which is used by blocked I/O
- *
- * @return {@link ExecutorService}
- */
- @Override
- public ExecutorService getRenamePool() {
- return this.renamePool;
- }
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
index f5724a0f9d..a791da3229 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateManager.java
@@ -32,7 +32,9 @@ import java.util.stream.Collectors;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FatePartition;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.manager.thrift.FateWorkerService;
import org.apache.accumulo.core.metadata.SystemTables;
@@ -44,6 +46,7 @@ import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +106,8 @@ public class FateManager {
}
Map<HostAndPort,Set<FatePartition>> currentAssignments = new
HashMap<>();
currentPartitions.forEach((k, v) -> currentAssignments.put(k,
v.partitions()));
- Set<FatePartition> desiredParititions =
getDesiredPartitions(currentAssignments.size());
+ Map<FateInstanceType,Set<FatePartition>> desiredParititions =
+ getDesiredPartitions(currentAssignments.size());
Map<HostAndPort,Set<FatePartition>> desired =
computeDesiredAssignments(currentAssignments, desiredParititions);
@@ -188,7 +192,7 @@ public class FateManager {
@SuppressFBWarnings(value = "SWL_SLEEP_WITH_LOCK_HELD",
justification = "Sleep is okay. Can hold the lock as long as needed, as
we are shutting down."
+ " Don't need or want other operations to run.")
- public synchronized void stop(Duration timeout) {
+ public synchronized void stop(FateInstanceType fateType, Duration timeout) {
if (!stop.compareAndSet(false, true)) {
return;
}
@@ -219,7 +223,9 @@ public class FateManager {
var currentPartitions = entry.getValue();
if (!currentPartitions.partitions.isEmpty()) {
try {
- setPartitions(hostPort, currentPartitions.updateId(), Set.of());
+ var copy = new HashSet<>(currentPartitions.partitions);
+ copy.removeIf(fp -> fp.getType() == fateType);
+ setPartitions(hostPort, currentPartitions.updateId(), copy);
} catch (TException e) {
log.warn("Failed to unassign fate partitions {}", hostPort, e);
}
@@ -229,8 +235,16 @@ public class FateManager {
stableAssignments.set(TreeRangeMap.create());
if (!timer.isExpired()) {
- var store = new UserFateStore<FateEnv>(context,
SystemTables.FATE.tableName(), null, null);
-
+ FateStore<FateEnv> store = switch (fateType) {
+ case USER -> new UserFateStore<FateEnv>(context,
SystemTables.FATE.tableName(), null, null);
+ case META -> {
+ try {
+ yield new MetaFateStore<>(context.getZooSession(), null, null);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ };
var reserved =
store.getActiveReservations(Set.of(FatePartition.all(FateInstanceType.USER)));
while (!reserved.isEmpty() && !timer.isExpired()) {
if (log.isTraceEnabled()) {
@@ -333,28 +347,41 @@ public class FateManager {
*/
private Map<HostAndPort,Set<FatePartition>> computeDesiredAssignments(
Map<HostAndPort,Set<FatePartition>> currentAssignments,
- Set<FatePartition> desiredParititions) {
+ Map<FateInstanceType,Set<FatePartition>> desiredParititions) {
- Preconditions.checkArgument(currentAssignments.size() ==
desiredParititions.size());
Map<HostAndPort,Set<FatePartition>> desiredAssignments = new HashMap<>();
- var copy = new HashSet<>(desiredParititions);
+ currentAssignments.keySet().forEach(hp -> {
+ desiredAssignments.put(hp, new HashSet<>());
+ });
+
+ desiredParititions.forEach((fateType, desiredForType) -> {
+ // This code can not handle more than one partition per host
+ Preconditions.checkState(desiredForType.size() <=
currentAssignments.size());
+
+ var added = new HashSet<FatePartition>();
+
+ currentAssignments.forEach((hp, partitions) -> {
+ var hostAssignments = desiredAssignments.get(hp);
+ partitions.forEach(partition -> {
+ if (desiredForType.contains(partition)
+ && hostAssignments.stream().noneMatch(fp -> fp.getType() ==
fateType)
+ && !added.contains(partition)) {
+ hostAssignments.add(partition);
+ Preconditions.checkState(added.add(partition));
+ }
+ });
+ });
- currentAssignments.forEach((hp, partitions) -> {
- if (!partitions.isEmpty()) {
- var firstPart = partitions.iterator().next();
- if (copy.contains(firstPart)) {
- desiredAssignments.put(hp, Set.of(firstPart));
- copy.remove(firstPart);
+ var iter = Sets.difference(desiredForType, added).iterator();
+ currentAssignments.forEach((hp, partitions) -> {
+ var hostAssignments = desiredAssignments.get(hp);
+ if (iter.hasNext() && hostAssignments.stream().noneMatch(fp ->
fp.getType() == fateType)) {
+ hostAssignments.add(iter.next());
}
- }
- });
+ });
- var iter = copy.iterator();
- currentAssignments.forEach((hp, partitions) -> {
- if (!desiredAssignments.containsKey(hp)) {
- desiredAssignments.put(hp, Set.of(iter.next()));
- }
+ Preconditions.checkState(!iter.hasNext());
});
if (log.isTraceEnabled()) {
@@ -363,7 +390,6 @@ public class FateManager {
log.trace(" desired {} {} {}", hp, parts.size(), parts);
});
}
-
return desiredAssignments;
}
@@ -371,15 +397,23 @@ public class FateManager {
* Computes a single partition for each worker such that the partition cover
all possible UUIDs
* and evenly divide the UUIDs.
*/
- private Set<FatePartition> getDesiredPartitions(int numWorkers) {
+ private Map<FateInstanceType,Set<FatePartition>> getDesiredPartitions(int
numWorkers) {
Preconditions.checkArgument(numWorkers >= 0);
if (numWorkers == 0) {
- return Set.of();
+ return Map.of(FateInstanceType.META, Set.of(), FateInstanceType.USER,
Set.of());
}
// create a single partition per worker that equally divides the space
- HashSet<FatePartition> desired = new HashSet<>();
+ Map<FateInstanceType,Set<FatePartition>> desired = new HashMap<>();
+
+ // meta fate will never see much activity, so give it a single partition.
+ desired.put(FateInstanceType.META,
+ Set.of(new FatePartition(FateId.from(FateInstanceType.META, new
UUID(0, 0)),
+ FateId.from(FateInstanceType.META, new UUID(-1, -1)))));
+
+ Set<FatePartition> desiredUser = new HashSet<>();
+
// All the shifting is because java does not have unsigned integers. Want
to evenly partition
// [0,2^64) into numWorker ranges, but can not directly do that. Work w/
60 bit unsigned
// integers to partition the space and then shift over by 4. Used 60 bits
instead of 63 so it
@@ -392,7 +426,7 @@ public class FateManager {
UUID startUuid = new UUID(start, 0);
UUID endUuid = new UUID(end, 0);
- desired.add(new FatePartition(FateId.from(FateInstanceType.USER,
startUuid),
+ desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER,
startUuid),
FateId.from(FateInstanceType.USER, endUuid)));
}
@@ -400,9 +434,11 @@ public class FateManager {
UUID startUuid = new UUID(start, 0);
// last partition has a special end uuid that is all f nibbles.
UUID endUuid = new UUID(-1, -1);
- desired.add(new FatePartition(FateId.from(FateInstanceType.USER,
startUuid),
+ desiredUser.add(new FatePartition(FateId.from(FateInstanceType.USER,
startUuid),
FateId.from(FateInstanceType.USER, endUuid)));
+ desired.put(FateInstanceType.USER, desiredUser);
+
return desired;
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java
index a1409af98e..f60fc453c0 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/fate/FateWorker.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.manager.fate;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -31,9 +34,11 @@ import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FatePartition;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.manager.thrift.FateWorkerService;
@@ -49,6 +54,7 @@ import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +67,7 @@ public class FateWorker implements FateWorkerService.Iface {
private final AuditedSecurityOperation security;
private final LiveTServerSet liveTserverSet;
private final FateFactory fateFactory;
- private Fate<FateEnv> fate;
- private FateWorkerEnv fateWorkerEnv;
+ private final Map<FateInstanceType,Fate<FateEnv>> fates = new
ConcurrentHashMap<>();
public interface FateFactory {
Fate<FateEnv> create(FateEnv env, FateStore<FateEnv> store, ServerContext
context);
@@ -71,17 +76,23 @@ public class FateWorker implements FateWorkerService.Iface {
public FateWorker(ServerContext ctx, LiveTServerSet liveTServerSet,
FateFactory fateFactory) {
this.context = ctx;
this.security = ctx.getSecurityOperation();
- this.fate = null;
this.liveTserverSet = liveTServerSet;
this.fateFactory = fateFactory;
}
public synchronized void setLock(ServiceLock lock) {
- fateWorkerEnv = new FateWorkerEnv(context, lock, liveTserverSet);
+ FateWorkerEnv fateWorkerEnv = new FateWorkerEnv(context, lock,
liveTserverSet);
Predicate<ZooUtil.LockID> isLockHeld = l ->
ServiceLock.isLockHeld(context.getZooCache(), l);
+ try {
+ MetaFateStore<FateEnv> metaStore =
+ new MetaFateStore<>(context.getZooSession(), lock.getLockID(),
isLockHeld);
+ this.fates.put(FateInstanceType.META, fateFactory.create(fateWorkerEnv,
metaStore, context));
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
UserFateStore<FateEnv> store =
new UserFateStore<>(context, SystemTables.FATE.tableName(),
lock.getLockID(), isLockHeld);
- this.fate = fateFactory.create(fateWorkerEnv, store, context);
+ this.fates.put(FateInstanceType.USER, fateFactory.create(fateWorkerEnv,
store, context));
}
private Long expectedUpdateId = null;
@@ -105,12 +116,8 @@ public class FateWorker implements FateWorkerService.Iface
{
// id
expectedUpdateId = updateId;
- if (fate == null) {
- return new TFatePartitions(updateId, List.of());
- } else {
- return new TFatePartitions(updateId,
-
fate.getPartitions().stream().map(FatePartition::toThrift).toList());
- }
+ return new TFatePartitions(updateId, fates.values().stream()
+ .flatMap(fate ->
fate.getPartitions().stream()).map(FatePartition::toThrift).toList());
}
}
@@ -137,16 +144,22 @@ public class FateWorker implements
FateWorkerService.Iface {
synchronized (this) {
// The primary manager should not assign any fate partitions until after
upgrade is complete.
Preconditions.checkState(isUpgradeComplete());
- if (fate != null && expectedUpdateId != null && updateId ==
expectedUpdateId) {
+
+ if (expectedUpdateId != null && updateId == expectedUpdateId) {
// Set to null which makes it so that an update id can only be used
once.
expectedUpdateId = null;
- var desiredSet =
desired.stream().map(FatePartition::from).collect(Collectors.toSet());
- var oldPartitions = fate.setPartitions(desiredSet);
- log.info("Changed partitions from {} to {}", oldPartitions,
desiredSet);
+ for (var fateType : FateInstanceType.values()) {
+ var fate = fates.get(fateType);
+ var desiredSet = desired.stream().map(FatePartition::from)
+ .filter(fp -> fp.getType() ==
fateType).collect(Collectors.toSet());
+ var oldPartitions = fate.setPartitions(desiredSet);
+ log.info("Changed partitions for {} from {} to {}", fateType,
oldPartitions, desiredSet);
+ }
+
return true;
} else {
- log.debug("Did not change partitions to {} expectedUpdateId:{}
updateId:{} fate==null:{}",
- desired, expectedUpdateId, updateId, fate == null);
+ log.debug("Did not change partitions to {} expectedUpdateId:{}
updateId:{} fates:{}",
+ desired, expectedUpdateId, updateId, fates.keySet());
return false;
}
}
@@ -161,28 +174,24 @@ public class FateWorker implements
FateWorkerService.Iface {
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
- Fate<FateEnv> localFate;
- synchronized (this) {
- localFate = fate;
- }
+ Map<FateInstanceType,Set<FatePartition>> partitions =
+ tpartitions.stream().map(FatePartition::from)
+ .collect(Collectors.groupingBy(FatePartition::getType,
Collectors.toSet()));
- if (localFate != null) {
-
localFate.seeded(tpartitions.stream().map(FatePartition::from).collect(Collectors.toSet()));
- }
- }
-
- public synchronized void stop() {
- fate.shutdown(1, TimeUnit.MINUTES);
- fate.close();
- fateWorkerEnv.stop();
- fate = null;
- fateWorkerEnv = null;
+ partitions.forEach((fateType, typePartitions) -> {
+ var fate = fates.get(fateType);
+ if (fate != null) {
+ fate.seeded(typePartitions);
+ }
+ });
}
public synchronized MetricsProducer[] getMetricsProducers() {
- Preconditions.checkState(fate != null, "Not started yet");
- return new MetricsProducer[] {
- new FateExecutorMetricsProducer(context, fate.getFateExecutors(),
context.getConfiguration()
-
.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL))};
+ Preconditions.checkState(!fates.isEmpty(), "Not started yet");
+ return Arrays.stream(FateInstanceType.values()).map(fates::get)
+ .map(fate -> new FateExecutorMetricsProducer(context,
fate.getFateExecutors(),
+ context.getConfiguration()
+
.getTimeInMillis(Property.MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL)))
+ .toArray(MetricsProducer[]::new);
}
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
index c95dcd9ad6..8d407d763d 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriverTest.java
@@ -38,7 +38,7 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.zookeeper.ZooSession;
-import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.manager.tableOps.delete.PreDeleteTable;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.data.Stat;
@@ -74,24 +74,24 @@ public class CompactionDriverTest {
}
}
- private Manager manager;
+ private FateEnv fateEnv;
private ServerContext ctx;
private ZooSession zk;
@BeforeEach
public void setup() {
- manager = createMock(Manager.class);
+ fateEnv = createMock(FateEnv.class);
ctx = createMock(ServerContext.class);
zk = createMock(ZooSession.class);
expect(ctx.getInstanceID()).andReturn(instance).anyTimes();
expect(ctx.getZooSession()).andReturn(zk).anyTimes();
expect(zk.asReaderWriter()).andReturn(new ZooReaderWriter(zk)).anyTimes();
- expect(manager.getContext()).andReturn(ctx).anyTimes();
+ expect(fateEnv.getContext()).andReturn(ctx).anyTimes();
}
@AfterEach
public void teardown() {
- verify(manager, ctx, zk);
+ verify(fateEnv, ctx, zk);
}
@Test
@@ -107,9 +107,9 @@ public class CompactionDriverTest {
}
private void runDriver(CompactionDriver driver, String expectedMessage) {
- replay(manager, ctx, zk);
+ replay(fateEnv, ctx, zk);
var e = assertThrows(AcceptableThriftTableOperationException.class,
- () -> driver.isReady(FateId.from(FateInstanceType.USER,
UUID.randomUUID()), manager));
+ () -> driver.isReady(FateId.from(FateInstanceType.USER,
UUID.randomUUID()), fateEnv));
assertEquals(e.getTableId(), tableId.toString());
assertEquals(e.getOp(), TableOperation.COMPACT);
assertEquals(e.getType(), TableOperationExceptionType.OTHER);
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
index 9e3037dbe0..7bd6824f2f 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
@@ -84,7 +84,7 @@ import
org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.time.SteadyTime;
-import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.gc.AllVolumesDirectory;
import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
@@ -425,7 +425,7 @@ public class MergeTabletsTest {
end == null ? null : end.getBytes(UTF_8),
MergeInfo.Operation.MERGE);
MergeTablets mergeTablets = new MergeTablets(mergeInfo);
- Manager manager = EasyMock.mock(Manager.class);
+ FateEnv fateEnv = EasyMock.mock(FateEnv.class);
ServerContext context = EasyMock.mock(ServerContext.class);
Ample ample = EasyMock.mock(Ample.class);
TabletsMetadata.Builder tabletBuilder =
EasyMock.mock(TabletsMetadata.Builder.class);
@@ -438,7 +438,7 @@ public class MergeTabletsTest {
EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes();
// setup reading the tablets
- EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
+ EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce();
EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
EasyMock.expect(ample.readTablets()).andReturn(tabletBuilder).once();
EasyMock.expect(tabletBuilder.forTable(tableId)).andReturn(tabletBuilder).once();
@@ -478,12 +478,12 @@ public class MergeTabletsTest {
ample.putGcFileAndDirCandidates(tableId, dirs);
EasyMock.expectLastCall().once();
- EasyMock.replay(manager, context, ample, tabletBuilder, tabletsMetadata,
tabletsMutator,
+ EasyMock.replay(fateEnv, context, ample, tabletBuilder, tabletsMetadata,
tabletsMutator,
tabletMutator, cr, managerLock);
- mergeTablets.call(fateId, manager);
+ mergeTablets.call(fateId, fateEnv);
- EasyMock.verify(manager, context, ample, tabletBuilder, tabletsMetadata,
tabletsMutator,
+ EasyMock.verify(fateEnv, context, ample, tabletBuilder, tabletsMetadata,
tabletsMutator,
tabletMutator, cr, managerLock);
}
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
index 6ceac3d681..dad40a0cb1 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
@@ -66,8 +66,8 @@ import
org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.core.metadata.schema.UnSplittableMetadata;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.time.SteadyTime;
-import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.split.FileRangeCache;
+import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.ConditionalTabletMutatorImpl;
import org.apache.hadoop.fs.Path;
@@ -233,9 +233,9 @@ public class UpdateTabletsTest {
String dir1 = "dir1";
String dir2 = "dir2";
- Manager manager = EasyMock.mock(Manager.class);
+ FateEnv fateEnv = EasyMock.mock(FateEnv.class);
ServerContext context = EasyMock.mock(ServerContext.class);
- EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
+ EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce();
Ample ample = EasyMock.mock(Ample.class);
EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
FileRangeCache fileRangeCache = EasyMock.mock(FileRangeCache.class);
@@ -247,8 +247,8 @@ public class UpdateTabletsTest {
.andReturn(newFileInfo("d", "f"));
EasyMock.expect(fileRangeCache.getCachedFileInfo(tableId, file4))
.andReturn(newFileInfo("d", "j"));
-
EasyMock.expect(manager.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce();
-
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(100_000,
TimeUnit.SECONDS))
+
EasyMock.expect(fateEnv.getFileRangeCache()).andReturn(fileRangeCache).atLeastOnce();
+
EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(100_000,
TimeUnit.SECONDS))
.atLeastOnce();
ServiceLock managerLock = EasyMock.mock(ServiceLock.class);
@@ -394,7 +394,7 @@ public class UpdateTabletsTest {
tabletsMutator.close();
EasyMock.expectLastCall().anyTimes();
- EasyMock.replay(manager, context, ample, tabletMeta, fileRangeCache,
tabletsMutator,
+ EasyMock.replay(fateEnv, context, ample, tabletMeta, fileRangeCache,
tabletsMutator,
tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions);
// Now we can actually test the split code that writes the new tablets
with a bunch columns in
// the original tablet
@@ -404,9 +404,9 @@ public class UpdateTabletsTest {
dirNames.add(dir2);
UpdateTablets updateTablets = new UpdateTablets(
new SplitInfo(origExtent,
TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames);
- updateTablets.call(fateId, manager);
+ updateTablets.call(fateId, fateEnv);
- EasyMock.verify(manager, context, ample, tabletMeta, fileRangeCache,
tabletsMutator,
+ EasyMock.verify(fateEnv, context, ample, tabletMeta, fileRangeCache,
tabletsMutator,
tablet1Mutator, tablet2Mutator, tablet3Mutator, cr, compactions);
}
@@ -469,15 +469,15 @@ public class UpdateTabletsTest {
private static void testError(KeyExtent origExtent, TabletMetadata tm1,
FateId fateId)
throws Exception {
- Manager manager = EasyMock.mock(Manager.class);
+ FateEnv fateEnv = EasyMock.mock(FateEnv.class);
ServerContext context = EasyMock.mock(ServerContext.class);
- EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
+ EasyMock.expect(fateEnv.getContext()).andReturn(context).atLeastOnce();
Ample ample = EasyMock.mock(Ample.class);
EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
EasyMock.expect(ample.readTablet(origExtent)).andReturn(tm1);
- EasyMock.replay(manager, context, ample);
+ EasyMock.replay(fateEnv, context, ample);
// Now we can actually test the split code that writes the new tablets
with a bunch columns in
// the original tablet
SortedSet<Text> splits = new TreeSet<>(List.of(new Text("c")));
@@ -485,8 +485,8 @@ public class UpdateTabletsTest {
dirNames.add("d1");
var updateTablets = new UpdateTablets(
new SplitInfo(origExtent,
TabletMergeabilityUtil.systemDefaultSplits(splits)), dirNames);
- updateTablets.call(fateId, manager);
+ updateTablets.call(fateId, fateEnv);
- EasyMock.verify(manager, context, ample);
+ EasyMock.verify(fateEnv, context, ample);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
index 5e825efbac..dce79c9953 100644
--- a/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MultipleManagerFateIT.java
@@ -84,7 +84,6 @@ import com.google.common.net.HostAndPort;
*
*/
public class MultipleManagerFateIT extends ConfigurableMacBase {
-
// A manager that will quickly clean up fate reservations held by dead
managers
public static class FastFateCleanupManager extends Manager {
protected FastFateCleanupManager(ServerOpts opts, String[] args) throws
IOException {
@@ -111,6 +110,10 @@ public class MultipleManagerFateIT extends
ConfigurableMacBase {
cfg.getClusterServerConfiguration().setNumDefaultCompactors(8);
// Set this lower so that locks timeout faster
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+ // This test could kill a manager after it has written a compaction to the
metadata table, but
+ // before it returns it to the compactor via RPC which creates a dead
compaction. Need to speed
+ // up the dead compaction detection to handle this or else the test will
hang.
+
cfg.setProperty(Property.COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL,
"5s");
cfg.setServerClass(ServerType.MANAGER, r -> FastFateCleanupManager.class);
super.configure(cfg, hadoopCoreSite);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
index 8b4283f31d..834e9a10bd 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/TestAmpleUtil.java
@@ -23,29 +23,29 @@ import static
org.apache.accumulo.test.ample.metadata.TestAmple.testAmpleServerC
import java.time.Duration;
import org.apache.accumulo.core.util.time.SteadyTime;
-import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl;
import org.easymock.EasyMock;
public class TestAmpleUtil {
- public static Manager mockWithAmple(ServerContext context,
TestServerAmpleImpl ample) {
- Manager manager = EasyMock.mock(Manager.class);
-
EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context,
ample))
+ public static FateEnv mockWithAmple(ServerContext context,
TestServerAmpleImpl ample) {
+ FateEnv fateEnv = EasyMock.mock(FateEnv.class);
+
EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context,
ample))
.atLeastOnce();
- EasyMock.replay(manager);
- return manager;
+ EasyMock.replay(fateEnv);
+ return fateEnv;
}
- public static Manager mockWithAmple(ServerContext context,
TestServerAmpleImpl ample,
+ public static FateEnv mockWithAmple(ServerContext context,
TestServerAmpleImpl ample,
Duration currentTime) {
- Manager manager = EasyMock.mock(Manager.class);
-
EasyMock.expect(manager.getContext()).andReturn(testAmpleServerContext(context,
ample))
+ FateEnv fateEnv = EasyMock.mock(FateEnv.class);
+
EasyMock.expect(fateEnv.getContext()).andReturn(testAmpleServerContext(context,
ample))
.atLeastOnce();
-
EasyMock.expect(manager.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes();
- EasyMock.replay(manager);
- return manager;
+
EasyMock.expect(fateEnv.getSteadyTime()).andReturn(SteadyTime.from(currentTime)).anyTimes();
+ EasyMock.replay(fateEnv);
+ return fateEnv;
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
index 1178f2d8ad..a36f870446 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT_SimpleSuite.java
@@ -75,9 +75,9 @@ import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.harness.SharedMiniClusterBase;
-import org.apache.accumulo.manager.Manager;
import
org.apache.accumulo.manager.merge.FindMergeableRangeTask.UnmergeableReason;
import org.apache.accumulo.manager.tableOps.AbstractFateOperation;
+import org.apache.accumulo.manager.tableOps.FateEnv;
import org.apache.accumulo.manager.tableOps.compact.CompactionDriver;
import org.apache.accumulo.manager.tableOps.merge.DeleteRows;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
@@ -133,7 +133,7 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
.create(getCluster().getServerContext(), Map.of(DataLevel.USER,
metadataTable));
testAmple.createMetadataFromExisting(client, tableId);
- Manager manager = mockWithAmple(getCluster().getServerContext(),
testAmple);
+ FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(),
testAmple);
// Create a test operation and fate id for testing merge and delete rows
// and add operation to test metadata for the tablet
@@ -148,15 +148,15 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
// Build either MergeTablets or DeleteRows repo for testing no WALs,
both should check this
// condition
final MergeInfo mergeInfo = new MergeInfo(tableId,
- manager.getContext().getNamespaceId(tableId), null, null, operation);
+ fateEnv.getContext().getNamespaceId(tableId), null, null, operation);
final AbstractFateOperation repo =
operation == Operation.MERGE ? new MergeTablets(mergeInfo) : new
DeleteRows(mergeInfo);
// Also test ReserveTablets isReady()
final AbstractFateOperation reserve = new ReserveTablets(mergeInfo);
// First, check no errors with the default case
- assertEquals(0, reserve.isReady(fateId, manager));
- assertNotNull(repo.call(fateId, manager));
+ assertEquals(0, reserve.isReady(fateId, fateEnv));
+ assertNotNull(repo.call(fateId, fateEnv));
// Write a WAL to the test metadata and then re-run the repo to check
for an error
try (TabletsMutator tm = testAmple.mutateTablets()) {
@@ -165,10 +165,10 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
}
// Should not be ready due to the presence of a WAL
- assertTrue(reserve.isReady(fateId, manager) > 0);
+ assertTrue(reserve.isReady(fateId, fateEnv) > 0);
// Repo should throw an exception due to the WAL existence
- var thrown = assertThrows(IllegalStateException.class, () ->
repo.call(fateId, manager));
+ var thrown = assertThrows(IllegalStateException.class, () ->
repo.call(fateId, fateEnv));
assertTrue(thrown.getMessage().contains("has unexpected walogs"));
}
}
@@ -200,57 +200,57 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
.create(getCluster().getServerContext(), Map.of(DataLevel.USER,
metadataTable));
testAmple.createMetadataFromExisting(client, tableId);
- Manager manager =
+ FateEnv fateEnv =
mockWithAmple(getCluster().getServerContext(), testAmple,
Duration.ofDays(1));
// Create a test fate id
var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
// Tablet c is set to never merge
- MergeInfo mergeInfo = new MergeInfo(tableId,
manager.getContext().getNamespaceId(tableId),
+ MergeInfo mergeInfo = new MergeInfo(tableId,
fateEnv.getContext().getNamespaceId(tableId),
null, new Text("c").getBytes(), Operation.SYSTEM_MERGE);
- var repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+ var repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
assertInstanceOf(UnreserveSystemMerge.class, repo);
assertEquals(UnmergeableReason.TABLET_MERGEABILITY,
((UnreserveSystemMerge) repo).getReason());
// Tablets a and b are always merge
- mergeInfo = new MergeInfo(tableId,
manager.getContext().getNamespaceId(tableId), null,
+ mergeInfo = new MergeInfo(tableId,
fateEnv.getContext().getNamespaceId(tableId), null,
new Text("b").getBytes(), Operation.SYSTEM_MERGE);
- assertInstanceOf(MergeTablets.class, new
VerifyMergeability(mergeInfo).call(fateId, manager));
+ assertInstanceOf(MergeTablets.class, new
VerifyMergeability(mergeInfo).call(fateId, fateEnv));
- var context = manager.getContext();
+ var context = fateEnv.getContext();
// split threshold is 10k so default max merge size is 2500 bytes.
// this adds 6 files of 450 each which puts the tablets over the 2500
threshold
addFileMetadata(context, tableId, null, new Text("c"), 3, 450);
// Data written to the first two tablets totals 2700 bytes and is too
large
- repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+ repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
assertInstanceOf(UnreserveSystemMerge.class, repo);
assertEquals(UnmergeableReason.MAX_TOTAL_SIZE, ((UnreserveSystemMerge)
repo).getReason());
// Not enough time has passed for Tablet, should be able to merge d and e
- mergeInfo = new MergeInfo(tableId,
manager.getContext().getNamespaceId(tableId),
+ mergeInfo = new MergeInfo(tableId,
fateEnv.getContext().getNamespaceId(tableId),
new Text("c").getBytes(), new Text("e").getBytes(),
Operation.SYSTEM_MERGE);
- repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+ repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
assertInstanceOf(UnreserveSystemMerge.class, repo);
assertEquals(UnmergeableReason.TABLET_MERGEABILITY,
((UnreserveSystemMerge) repo).getReason());
// update time to 3 days so enough time has passed
- manager = mockWithAmple(getCluster().getServerContext(), testAmple,
Duration.ofDays(3));
- assertInstanceOf(MergeTablets.class, new
VerifyMergeability(mergeInfo).call(fateId, manager));
+ fateEnv = mockWithAmple(getCluster().getServerContext(), testAmple,
Duration.ofDays(3));
+ assertInstanceOf(MergeTablets.class, new
VerifyMergeability(mergeInfo).call(fateId, fateEnv));
// last 3 tablets should total 9 files which is < max of 10
- mergeInfo = new MergeInfo(tableId,
manager.getContext().getNamespaceId(tableId),
+ mergeInfo = new MergeInfo(tableId,
fateEnv.getContext().getNamespaceId(tableId),
new Text("c").getBytes(), null, Operation.SYSTEM_MERGE);
addFileMetadata(context, tableId, new Text("c"), null, 3, 10);
- assertInstanceOf(MergeTablets.class, new
VerifyMergeability(mergeInfo).call(fateId, manager));
+ assertInstanceOf(MergeTablets.class, new
VerifyMergeability(mergeInfo).call(fateId, fateEnv));
// last 3 tablets should total 12 files which is > max of 10
addFileMetadata(context, tableId, new Text("c"), null, 4, 10);
- repo = new VerifyMergeability(mergeInfo).call(fateId, manager);
+ repo = new VerifyMergeability(mergeInfo).call(fateId, fateEnv);
assertInstanceOf(UnreserveSystemMerge.class, repo);
assertEquals(UnmergeableReason.MAX_FILE_COUNT, ((UnreserveSystemMerge)
repo).getReason());
}
@@ -306,7 +306,7 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
testAmple.mutateTablet(extent)
.putOperation(TabletOperationId.from(TabletOperationType.SPLITTING,
fateId)).mutate();
- Manager manager = mockWithAmple(getCluster().getServerContext(),
testAmple);
+ FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(),
testAmple);
assertEquals(opid, testAmple.readTablet(extent).getOperationId());
@@ -314,7 +314,7 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
TabletMergeabilityUtil.systemDefaultSplits(new TreeSet<>(List.of(new
Text("sp1"))))));
// The repo should delete the opid and throw an exception
- assertThrows(ThriftTableOperationException.class, () ->
eoRepo.call(fateId, manager));
+ assertThrows(ThriftTableOperationException.class, () ->
eoRepo.call(fateId, fateEnv));
// the operation id should have been cleaned up before the exception was
thrown
assertNull(testAmple.readTablet(extent).getOperationId());
@@ -347,11 +347,11 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
not(SplitColumnFamily.UNSPLITTABLE_COLUMN));
KeyExtent extent = new KeyExtent(tableId, null, null);
- Manager manager = mockWithAmple(getCluster().getServerContext(),
testAmple);
+ FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(),
testAmple);
FindSplits findSplits = new FindSplits(extent);
PreSplit preSplit = (PreSplit) findSplits
- .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()),
manager);
+ .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()),
fateEnv);
// The table should not need splitting
assertNull(preSplit);
@@ -366,7 +366,7 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
findSplits = new FindSplits(extent);
preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER,
UUID.randomUUID()),
- manager);
+ fateEnv);
// The table SHOULD now need splitting
assertNotNull(preSplit);
@@ -408,11 +408,11 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
not(SplitColumnFamily.UNSPLITTABLE_COLUMN));
KeyExtent extent = new KeyExtent(tableId, null, null);
- Manager manager = mockWithAmple(getCluster().getServerContext(),
testAmple);
+ FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(),
testAmple);
FindSplits findSplits = new FindSplits(extent);
PreSplit preSplit = (PreSplit) findSplits
- .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()),
manager);
+ .call(FateId.from(FateInstanceType.USER, UUID.randomUUID()),
fateEnv);
// The table should not need splitting
assertNull(preSplit);
@@ -428,7 +428,7 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
findSplits = new FindSplits(extent);
preSplit = (PreSplit) findSplits.call(FateId.from(FateInstanceType.USER,
UUID.randomUUID()),
- manager);
+ fateEnv);
// The table SHOULD not need splitting
assertNull(preSplit);
@@ -462,8 +462,8 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
.create(getCluster().getServerContext(), Map.of(DataLevel.USER,
metadataTable));
testAmple.createMetadataFromExisting(client, tableId);
- Manager manager = mockWithAmple(getCluster().getServerContext(),
testAmple);
- var ctx = manager.getContext();
+ FateEnv fateEnv = mockWithAmple(getCluster().getServerContext(),
testAmple);
+ var ctx = fateEnv.getContext();
// Create the CompactionDriver to test with the given range passed into
the method
final AbstractFateOperation repo = new
CompactionDriver(ctx.getNamespaceId(tableId), tableId,
@@ -489,7 +489,7 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
assertEquals(4, extents.size());
// First call undo using the second fateId and verify there's still
metadata for the first one
- repo.undo(fateId2, manager);
+ repo.undo(fateId2, fateEnv);
try (TabletsMetadata tabletsMetadata =
testAmple.readTablets().forTable(tableId).build()) {
tabletsMetadata.forEach(tm -> {
assertHasCompactionMetadata(fateId1, tm);
@@ -499,7 +499,7 @@ public class ManagerRepoIT_SimpleSuite extends
SharedMiniClusterBase {
// Now call undo on the first fateId which would clean up all the
metadata for all the
// tablets that overlap with the given range that was provided to the
CompactionDriver
// during the creation of the repo
- repo.undo(fateId1, manager);
+ repo.undo(fateId1, fateEnv);
// First, iterate over only the overlapping tablets and verify that
those tablets
// were cleaned up and remove any visited tablets from the extents set