This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 5d12597b4d Add LogSorter to Compactor and ScanServer (#4239)
5d12597b4d is described below
commit 5d12597b4dedb60524775d0411405bafc3ee86ce
Author: Dave Marion <[email protected]>
AuthorDate: Wed Feb 14 08:06:49 2024 -0500
Add LogSorter to Compactor and ScanServer (#4239)
Adds LogSorter to ScanServer and Compactor so that
all available processes participate in sorting write-ahead
logs (walogs) when a failure leaves a tablet with walogs
that must be recovered. Modified LogSorter and
DistributedWorkQueue so that the Compactor can call the
LogSorter and have its tasks execute serially in the
Compactor thread.
Fixes #4232
---
.../org/apache/accumulo/core/conf/Property.java | 3 +
.../MiniAccumuloClusterControl.java | 4 +
.../server/zookeeper/DistributedWorkQueue.java | 97 ++++++---
server/compactor/pom.xml | 4 +
.../org/apache/accumulo/compactor/Compactor.java | 11 +
.../org/apache/accumulo/tserver/ScanServer.java | 17 ++
.../org/apache/accumulo/tserver/TabletServer.java | 20 +-
.../org/apache/accumulo/tserver/log/LogSorter.java | 25 ++-
.../java/org/apache/accumulo/test/RecoveryIT.java | 231 +++++++++++++++++++++
9 files changed, 371 insertions(+), 41 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 12f69d2ff9..22c2606c85 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -520,6 +520,9 @@ public enum Property {
@Experimental
SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s",
PropertyType.TIMEDURATION,
"The time between adjustments of the thrift server thread pool.",
"2.1.0"),
+ @Experimental
+ SSERV_WAL_SORT_MAX_CONCURRENT("sserver.wal.sort.concurrent.max", "2",
PropertyType.COUNT,
+ "The maximum number of threads to use to sort logs during recovery.",
"4.0.0"),
// properties that are specific to tablet server behavior
TSERV_PREFIX("tserver.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the tablet
servers.", "1.3.5"),
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
index b3977dab00..21e831c3e0 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java
@@ -519,4 +519,8 @@ public class MiniAccumuloClusterControl implements
ClusterControl {
stop(server, hostname);
}
+ public List<Process> getTabletServers(String resourceGroup) { // tserver processes tracked for the group
+ return tabletServerProcesses.get(resourceGroup); // not copied: callers (e.g. RecoveryIT) clear() it; NOTE(review): presumably null for an unknown group, confirm
+ }
+
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index cd667909d0..2263e02af4 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -57,7 +58,6 @@ public class DistributedWorkQueue {
private static final Logger log =
LoggerFactory.getLogger(DistributedWorkQueue.class);
- private ThreadPoolExecutor threadPool;
private ZooReaderWriter zoo;
private String path;
private ServerContext context;
@@ -65,12 +65,20 @@ public class DistributedWorkQueue {
private AtomicInteger numTask = new AtomicInteger(0);
- private void lookForWork(final Processor processor, List<String> children) {
+ /**
+ * Finds a child in {@code children} that is not currently being processed
and adds a Runnable to
+ * the {@code executor} that invokes the {@code processor}. The Runnable
will recursively call
+ * {@code lookForWork} after it invokes the {@code processor} such that it
will continue to look
+ * for children that need work until that condition is exhausted. This
method will return early if
+ * the number of currently running tasks is larger than {@code maxThreads}.
+ */
+ private void lookForWork(final Processor processor, final List<String>
children,
+ final ExecutorService executor, final int maxThreads) {
if (children.isEmpty()) {
return;
}
- if (numTask.get() >= threadPool.getCorePoolSize()) {
+ if (numTask.get() >= maxThreads) {
return;
}
@@ -102,7 +110,7 @@ public class DistributedWorkQueue {
}
// Great... we got the lock, but maybe we're too busy
- if (numTask.get() >= threadPool.getCorePoolSize()) {
+ if (numTask.get() >= maxThreads) {
zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
break;
}
@@ -143,7 +151,7 @@ public class DistributedWorkQueue {
try {
// its important that this is called after numTask is decremented
- lookForWork(processor, zoo.getChildren(path));
+ lookForWork(processor, zoo.getChildren(path), executor,
maxThreads);
} catch (KeeperException e) {
log.error("Failed to look for work", e);
} catch (InterruptedException e) {
@@ -153,7 +161,7 @@ public class DistributedWorkQueue {
};
numTask.incrementAndGet();
- threadPool.execute(task);
+ executor.execute(task);
}
} catch (Exception t) {
@@ -186,40 +194,62 @@ public class DistributedWorkQueue {
return context;
}
- public void startProcessing(final Processor processor, ThreadPoolExecutor
executorService)
- throws KeeperException, InterruptedException {
+ public long getCheckInterval() { // exposes timerPeriod so callers can schedule their own checks
+ return this.timerPeriod; // LogSorter.sortLogsIfNeeded adds this to System.currentTimeMillis(), i.e. treats it as millis
+ }
- threadPool = executorService;
+ /**
+ * Finds the children at the path passed in the constructor and calls {@code lookForWork} on
+ * them, which attempts to process all of the currently available work.
+ *
+ * Before reading the children, the queue path and its locks child node are created with
+ * {@code zoo.mkdirs}.
+ *
+ * @param processor invoked for each work item that this process manages to claim
+ * @param executor runs the tasks that invoke {@code processor}; with a direct executor the work
+ *        is done on the calling thread
+ * @param maxThreads {@code lookForWork} stops claiming new work once this many tasks are running
+ * @param setWatch if true, a ZooKeeper watcher is registered on the path; on a
+ *        NodeChildrenChanged event for the path it re-reads the children (re-registering itself)
+ *        and calls {@code lookForWork} again. If false, the children are read once without a
+ *        watch.
+ * @throws KeeperException if creating the path or reading its children fails
+ * @throws InterruptedException if interrupted while talking to ZooKeeper
+ */
+ public void processExistingWork(final Processor processor, ExecutorService
executor,
+ final int maxThreads, boolean setWatch) throws KeeperException,
InterruptedException {
zoo.mkdirs(path);
zoo.mkdirs(path + "/" + LOCKS_NODE);
- List<String> children = zoo.getChildren(path, new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- switch (event.getType()) {
- case NodeChildrenChanged:
- if (event.getPath().equals(path)) {
- try {
- lookForWork(processor, zoo.getChildren(path, this));
- } catch (KeeperException e) {
- log.error("Failed to look for work at path {}; {}", path,
event, e);
- } catch (InterruptedException e) {
- log.info("Interrupted looking for work at path {}; {}", path,
event, e);
+ List<String> children = null;
+ if (setWatch) {
+ children = zoo.getChildren(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ switch (event.getType()) {
+ case NodeChildrenChanged:
+ if (event.getPath().equals(path)) {
+ try {
+ lookForWork(processor, zoo.getChildren(path, this),
executor, maxThreads);
+ } catch (KeeperException e) {
+ log.error("Failed to look for work at path {}; {}", path,
event, e);
+ } catch (InterruptedException e) {
+ log.info("Interrupted looking for work at path {}; {}",
path, event, e);
+ }
+ } else {
+ log.info("Unexpected path for NodeChildrenChanged event
watching path {}; {}", path,
+ event);
}
- } else {
- log.info("Unexpected path for NodeChildrenChanged event watching
path {}; {}", path,
- event);
- }
- break;
- default:
- log.info("Unexpected event watching path {}; {}", path, event);
- break;
+ break;
+ default:
+ log.info("Unexpected event watching path {}; {}", path, event);
+ break;
+ }
}
- }
- });
+ });
+ } else {
+ children = zoo.getChildren(path);
+ }
+
+ lookForWork(processor, children, executor, maxThreads);
+
+ }
+
+ /**
+ * Calls {@code runOne} to attempt to process all currently available work,
then adds a background
+ * thread that looks for work in the future.
+ */
+ public void processExistingAndFuture(final Processor processor,
+ ThreadPoolExecutor executorService) throws KeeperException,
InterruptedException {
- lookForWork(processor, children);
+ processExistingWork(processor, executorService,
executorService.getCorePoolSize(), true);
// Add a little jitter to avoid all the tservers slamming zookeeper at once
ThreadPools.watchCriticalScheduledTask(
@@ -228,7 +258,8 @@ public class DistributedWorkQueue {
public void run() {
log.debug("Looking for work in {}", path);
try {
- lookForWork(processor, zoo.getChildren(path));
+ lookForWork(processor, zoo.getChildren(path), executorService,
+ executorService.getCorePoolSize());
} catch (KeeperException e) {
log.error("Failed to look for work", e);
} catch (InterruptedException e) {
diff --git a/server/compactor/pom.xml b/server/compactor/pom.xml
index eaa30280f8..0f021dd497 100644
--- a/server/compactor/pom.xml
+++ b/server/compactor/pom.xml
@@ -55,6 +55,10 @@
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-start</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-tserver</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 2d93e42068..6a234824db 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -109,6 +109,7 @@ import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.tserver.log.LogSorter;
import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
@@ -635,6 +636,8 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
final AtomicReference<Throwable> err = new AtomicReference<>();
final AtomicLong timeSinceLastCompletion = new AtomicLong(0L);
+ final LogSorter logSorter = new LogSorter(getContext(),
getConfiguration());
+ long nextSortLogsCheckTime = System.currentTimeMillis();
while (!shutdown) {
@@ -649,6 +652,14 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
err.set(null);
JOB_HOLDER.reset();
+ if (System.currentTimeMillis() > nextSortLogsCheckTime) {
+ // Attempt to process all existing log sorting work serially in this
thread.
+ // When no work remains, this call will return so that we can look
for compaction
+ // work.
+ LOG.debug("Checking to see if any recovery logs need sorting");
+ nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
+ }
+
TExternalCompactionJob job;
try {
job = getNextJob(getNextId());
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 2efd38b709..a5d74b3c07 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -103,6 +103,7 @@ import org.apache.accumulo.server.rpc.TServerUtils;
import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.accumulo.server.security.SecurityUtil;
import
org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.log.LogSorter;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
import org.apache.accumulo.tserver.session.MultiScanSession;
import org.apache.accumulo.tserver.session.ScanSession;
@@ -390,6 +391,22 @@ public class ScanServer extends AbstractServer
ServiceLock lock = announceExistence();
+ int threadPoolSize =
getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
+ if (threadPoolSize > 0) {
+ final LogSorter logSorter = new LogSorter(context, getConfiguration());
+ try {
+ // Attempt to process all existing log sorting work and start a
background
+ // thread to look for log sorting work in the future
+ logSorter.startWatchingForRecoveryLogs(threadPoolSize);
+ } catch (Exception ex) {
+ log.error("Error starting LogSorter");
+ throw new RuntimeException(ex);
+ }
+ } else {
+ log.info(
+ "Log sorting for tablet recovery is disabled,
SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
+ }
+
try {
while (!serverStopRequested) {
UtilWaitThread.sleep(1000);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 90244c3ba1..0de617455c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -602,12 +602,22 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
throw new RuntimeException(e);
}
- try {
- logSorter.startWatchingForRecoveryLogs();
- } catch (Exception ex) {
- log.error("Error setting watches for recoveries");
- throw new RuntimeException(ex);
+ int threadPoolSize =
+
getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
+ if (threadPoolSize > 0) {
+ try {
+ // Attempt to process all existing log sorting work and start a
background
+ // thread to look for log sorting work in the future
+ logSorter.startWatchingForRecoveryLogs(threadPoolSize);
+ } catch (Exception ex) {
+ log.error("Error starting LogSorter");
+ throw new RuntimeException(ex);
+ }
+ } else {
+ log.info(
+ "Log sorting for tablet recovery is disabled,
TSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
}
+
final AccumuloConfiguration aconf = getConfiguration();
final long onDemandUnloaderInterval =
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index 33ce1989e4..dda829d0a8 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -62,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
public class LogSorter {
@@ -290,12 +291,30 @@ public class LogSorter {
}
}
- public void startWatchingForRecoveryLogs() throws KeeperException,
InterruptedException {
- int threadPoolSize =
this.conf.getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
+ /**
+ * Sorts any logs that currently need sorting, serially, in the calling thread.
+ *
+ * A new {@link DistributedWorkQueue} is created for the recovery path. Its existing work is
+ * processed with a direct executor, a limit of one running task, and no ZooKeeper watch. The
+ * method therefore returns once the work it was able to claim has been processed. It does not
+ * register for future work, so callers (e.g. the Compactor main loop) call it again later.
+ *
+ * NOTE(review): with a direct executor, each processed item re-enters {@code lookForWork} from
+ * inside the previous task on the same thread. Stack depth therefore grows with the number of
+ * items handled in a single call.
+ *
+ * @return the time, in milliseconds since the epoch, at which the next check can be done (the
+ *         current time plus the work queue's check interval)
+ * @throws KeeperException if a ZooKeeper operation on the recovery path fails
+ * @throws InterruptedException if interrupted while talking to ZooKeeper
+ */
+ public long sortLogsIfNeeded() throws KeeperException, InterruptedException {
+ DistributedWorkQueue dwq = new DistributedWorkQueue(
+ context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
context);
+ dwq.processExistingWork(new LogProcessor(),
MoreExecutors.newDirectExecutorService(), 1, false);
+ return System.currentTimeMillis() + dwq.getCheckInterval();
+ }
+
+ /**
+ * Sorts any logs that currently need sorting in a fixed-size pool of {@code threadPoolSize}
+ * threads. It also sets a ZooKeeper watch on the recovery path and starts a background task
+ * that keeps looking for log sorting work. All of that work runs in the same pool.
+ *
+ * The pool size is chosen by the caller. The TabletServer passes
+ * {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT} and the ScanServer passes
+ * {@link Property#SSERV_WAL_SORT_MAX_CONCURRENT}. Both call this method only when that value is
+ * greater than zero.
+ *
+ * NOTE(review): the pool is neither stored nor returned, so nothing in this method shuts it down.
+ *
+ * @param threadPoolSize number of threads used to sort logs
+ * @throws KeeperException if setting up the work queue in ZooKeeper fails
+ * @throws InterruptedException if interrupted while talking to ZooKeeper
+ */
+ public void startWatchingForRecoveryLogs(int threadPoolSize)
+ throws KeeperException, InterruptedException {
ThreadPoolExecutor threadPool = ThreadPools.getServerThreadPools()
.createFixedThreadPool(threadPoolSize, this.getClass().getName(),
true);
new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY,
sortedLogConf,
- context).startProcessing(new LogProcessor(), threadPool);
+ context).processExistingAndFuture(new LogProcessor(), threadPool);
}
public List<RecoveryStatus> getLogSorts() {
diff --git a/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
new file mode 100644
index 0000000000..c8fb440eea
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/RecoveryIT.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.IntStream;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterControl;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.server.manager.recovery.RecoveryPath;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+/**
+ * Checks that write-ahead logs left behind by killed tablet servers can be sorted for recovery by
+ * a TabletServer, a ScanServer, or a Compactor. It also checks that the data written before the
+ * failure is readable once the tablets are hosted again.
+ *
+ * The cluster is started inside each test run instead of in {@link #setupCluster()}. This lets
+ * tablet server log sorting be disabled (via {@link Property#TSERV_WAL_SORT_MAX_CONCURRENT}) when
+ * another server type is under test.
+ */
+@Tag(MINI_CLUSTER_ONLY)
+public class RecoveryIT extends AccumuloClusterHarness {
+
+ private static final String RESOURCE_GROUP = "RG1";
+
+ private volatile boolean disableTabletServerLogSorting = false; // set in test(), read by configureMiniCluster()
+
+ @Override
+ protected Duration defaultTimeout() {
+ return Duration.ofMinutes(3);
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
+ cfg.setProperty(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL, "5s");
+
cfg.getClusterServerConfiguration().addTabletServerResourceGroup(RESOURCE_GROUP,
3);
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ if (disableTabletServerLogSorting) {
+ cfg.setProperty(Property.TSERV_WAL_SORT_MAX_CONCURRENT, "0");
+ }
+ // file system supports recovery
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Override
+ public void setupCluster() throws Exception {
+ // Do *NOT* startup the cluster here. We are doing it in the test
+ // method so that we can modify the properties for each test run
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"TSERVER", "SSERVER", "COMPACTOR"})
+ public void test(String serverForSorting) throws Exception {
+
+ // Determine whether or not we need to disable log sorting
+ // in the TabletServer. We want to do this when the serverForSorting
+ // parameter is SSERVER or COMPACTOR
+ switch (serverForSorting) {
+ case "TSERVER":
+ disableTabletServerLogSorting = false;
+ break;
+ case "SSERVER":
+ case "COMPACTOR":
+ default:
+ disableTabletServerLogSorting = true;
+ }
+
+ // Start the cluster
+ super.setupCluster();
+
+ // create a table
+ String tableName = getUniqueNames(1)[0];
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ /*
+ * NOTE(review): "" + i converts each int to its decimal string, so the split points are "97"
+ * through "121" rather than the letters 'a' through 'y'; confirm which was intended.
+ */
+ SortedSet<Text> splits = new TreeSet<>();
+ IntStream.range(97, 122).forEach(i -> splits.add(new Text(new String(""
+ i))));
+
+ NewTableConfiguration ntc = new NewTableConfiguration();
+ Map<String,String> tableProps = new HashMap<>();
+ tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "100");
+ tableProps.put(Property.TABLE_FILE_MAX.getKey(), "3");
+ tableProps.put(TableLoadBalancer.TABLE_ASSIGNMENT_GROUP_PROPERTY,
RESOURCE_GROUP);
+ ntc.setProperties(tableProps);
+ ntc.withSplits(splits);
+ ntc.withInitialTabletAvailability(TabletAvailability.ONDEMAND);
+ c.tableOperations().create(tableName, ntc);
+
+ c.instanceOperations().waitForBalance();
+
+ TableId tid =
TableId.of(c.tableOperations().tableIdMap().get(tableName));
+
+ ReadWriteIT.ingest(c, 1000, 1, 1, 0, "", tableName, 100);
+
+ // Confirm that there are no walog entries for this table
+ assertEquals(0, countWaLogEntries(c, tid));
+
+ MiniAccumuloClusterImpl mini = (MiniAccumuloClusterImpl) cluster;
+ MiniAccumuloClusterControl control = (MiniAccumuloClusterControl)
cluster.getClusterControl();
+
+ // Stop any running Compactors and ScanServers
+ control.stopAllServers(ServerType.COMPACTOR);
+ control.stopAllServers(ServerType.SCAN_SERVER);
+
+ // Kill all of the TabletServers in the resource group that is hosting the table
+ List<Process> procs = control.getTabletServers(RESOURCE_GROUP);
+ assertEquals(3, procs.size());
+ for (int i = 0; i < 3; i++) {
+ procs.get(i).destroyForcibly().waitFor();
+ }
+ control.getTabletServers(RESOURCE_GROUP).clear(); // drop the killed processes from the tracked list
+
+ // The TabletGroupWatcher in the Manager will notice that the
TabletServer is dead
+ // and will assign the TabletServer's walog to all of the tablets that
were assigned
+ // to that server. Confirm that walog entries exist for this tablet
+ if (!serverForSorting.equals("TSERVER")) {
+ Wait.waitFor(() -> countWaLogEntries(c, tid) > 0, 60_000);
+ }
+
+ // Start the process that will do the log sorting
+ switch (serverForSorting) {
+ case "TSERVER":
+ // We don't need to re-start the TabletServers here, there is
+ // already a TabletServer running in the default group that
+ // is hosting the root and metadata tables. It should perform
+ // the log sorting.
+ break;
+ case "SSERVER":
+
mini.getConfig().getClusterServerConfiguration().setNumDefaultScanServers(1);
+ getClusterControl().startAllServers(ServerType.SCAN_SERVER);
+ break;
+ case "COMPACTOR":
+
mini.getConfig().getClusterServerConfiguration().setNumDefaultCompactors(1);
+ getClusterControl().startAllServers(ServerType.COMPACTOR);
+ break;
+ case "ALL": // not one of the @ValueSource values; falls through to fail() below
+ default:
+ fail("Unhandled server type: " + serverForSorting);
+ }
+
+ // Confirm sorting completed for every walog referenced by the table's tablets
+ Wait.waitFor(() -> logSortingCompleted(c, tid) == true, 60_000);
+
+ // Start the tablet servers so that the Manager
+ // can assign the table and so that recovery can be completed.
+ getClusterControl().startAllServers(ServerType.TABLET_SERVER);
+
+ c.instanceOperations().waitForBalance();
+
+ // When the tablet is hosted, the sorted walogs will be applied
+ // Confirm that no tablet of this table still has walog entries
+ assertEquals(0, countWaLogEntries(c, tid));
+
+ // Scan the data and make sure it's there.
+ ReadWriteIT.verify(c, 1000, 1, 1, 0, "", tableName);
+
+ }
+ }
+ /**
+ * Counts the tablets of {@code tableId} whose {@code getLogs()} result is non-null.
+ *
+ * NOTE(review): logSortingCompleted iterates {@code tm.getLogs()} without a null check, which
+ * suggests getLogs() is never null once LOGS is fetched. If it returns an empty list for tablets
+ * without walogs, this counts every tablet. Confirm this, and consider filtering on
+ * {@code !getLogs().isEmpty()} instead.
+ */
+ private long countWaLogEntries(AccumuloClient c, TableId tableId) throws
Exception {
+ try (TabletsMetadata tabletsMetadata = ((ClientContext)
c).getAmple().readTablets()
+ .forTable(tableId).fetch(TabletMetadata.ColumnType.LOGS).build()) {
+ return tabletsMetadata.stream().filter(tabletMetadata ->
tabletMetadata.getLogs() != null)
+ .count();
+ }
+ }
+ /**
+ * Returns true when every walog referenced by a tablet of {@code tableId} looks sorted. That
+ * means two things hold:
+ * <ul>
+ * <li>ZooCache holds no recovery node under {@code Constants.ZRECOVERY} for the walog's unique
+ * id.</li>
+ * <li>The finished marker exists under the walog's recovery path.</li>
+ * </ul>
+ * Returns false as soon as one walog fails either check.
+ */
+ private boolean logSortingCompleted(AccumuloClient c, TableId tableId)
throws Exception {
+ try (TabletsMetadata tabletsMetadata = ((ClientContext)
c).getAmple().readTablets()
+ .forTable(tableId).fetch(TabletMetadata.ColumnType.LOGS).build()) {
+ ServerContext ctx = getCluster().getServerContext();
+ for (TabletMetadata tm : tabletsMetadata) {
+ for (LogEntry walog : tm.getLogs()) {
+ String sortId = walog.getUniqueID().toString();
+ String filename = walog.getPath();
+ String dest = RecoveryPath.getRecoveryPath(new
Path(filename)).toString();
+
+ if (ctx.getZooCache().get(ctx.getZooKeeperRoot() +
Constants.ZRECOVERY + "/" + sortId)
+ != null
+ ||
!ctx.getVolumeManager().exists(SortedLogState.getFinishedMarkerPath(dest))) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+}