This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 501efad92b Fixes for process exit, added ExitCodesIT (#5811)
501efad92b is described below
commit 501efad92bee10aad2910b0984752879d5e42322
Author: Dave Marion <[email protected]>
AuthorDate: Wed Aug 27 13:53:29 2025 -0400
Fixes for process exit, added ExitCodesIT (#5811)
Various changes to clean up the exit handling of the server processes.
Added ExitCodesIT to confirm exit codes under different conditions.
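For reviewers: the change standardizes server exit codes on the usual convention, 0 for a clean requested shutdown and 1 for abnormal termination (replacing the -1 previously passed to Halt.halt). The stand-alone Java sketch below is illustrative only and is not part of this commit; the class name ExitCodeSketch and its child/parent arguments are invented here to show the fork-and-assert pattern that ExitCodesIT applies to MiniAccumuloCluster processes.

import java.util.concurrent.TimeUnit;

public class ExitCodeSketch {

  public static void main(String[] args) throws Exception {
    if (args.length > 0 && args[0].equals("child")) {
      // Child mode: exit with the convention this commit adopts,
      // 0 when shutdown completed cleanly, 1 otherwise.
      boolean shutdownComplete = Boolean.parseBoolean(args[1]);
      Runtime.getRuntime().halt(shutdownComplete ? 0 : 1);
    }
    // Parent mode: fork this class as a child JVM and inspect its exit code,
    // similar in spirit to how ExitCodesIT watches MiniAccumuloCluster processes.
    String javaBin = System.getProperty("java.home") + "/bin/java";
    Process p = new ProcessBuilder(javaBin, "-cp", System.getProperty("java.class.path"),
        ExitCodeSketch.class.getName(), "child", "false").inheritIO().start();
    if (!p.waitFor(2, TimeUnit.MINUTES)) {
      p.destroyForcibly();
      throw new IllegalStateException("child did not exit in time");
    }
    System.out.println("child exited with " + p.exitValue()); // expect 1 for the abnormal case
  }
}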
---
.../accumulo/core/clientImpl/ClientContext.java | 17 +-
.../accumulo/core/lock/ServiceLockSupport.java | 10 +-
.../java/org/apache/accumulo/core/util/Halt.java | 2 +-
.../org/apache/accumulo/server/AbstractServer.java | 53 +--
.../org/apache/accumulo/server/ServerContext.java | 3 +
.../accumulo/server/manager/LiveTServerSet.java | 2 +-
.../accumulo/server/mem/LowMemoryDetector.java | 2 +-
.../org/apache/accumulo/compactor/Compactor.java | 395 ++++++++++-----------
.../apache/accumulo/gc/SimpleGarbageCollector.java | 14 +-
.../java/org/apache/accumulo/manager/Manager.java | 15 +-
.../accumulo/manager/TabletGroupWatcher.java | 4 +-
.../org/apache/accumulo/tserver/ScanServer.java | 90 ++---
.../org/apache/accumulo/tserver/TabletServer.java | 14 +-
.../accumulo/tserver/log/TabletServerLogger.java | 2 +-
.../accumulo/tserver/tablet/MinorCompactor.java | 2 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 2 +-
.../accumulo/test/functional/ExitCodesIT.java | 334 +++++++++++++++++
17 files changed, 635 insertions(+), 326 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 79ac3440a4..dc5a03288e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -918,21 +918,26 @@ public class ClientContext implements AccumuloClient {
@Override
public synchronized void close() {
if (closed.compareAndSet(false, true)) {
- if (zooCacheCreated.get()) {
- zooCache.get().close();
- }
- if (zooKeeperOpened.get()) {
- zooSession.get().close();
- }
if (thriftTransportPool != null) {
+ log.debug("Closing Thrift Transport Pool");
thriftTransportPool.shutdown();
}
if (scannerReadaheadPool != null) {
+ log.debug("Closing Scanner ReadAhead Pool");
scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down
}
if (cleanupThreadPool != null) {
+ log.debug("Closing Cleanup ThreadPool");
cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute
}
+ if (zooCacheCreated.get()) {
+ log.debug("Closing ZooCache");
+ zooCache.get().close();
+ }
+ if (zooKeeperOpened.get()) {
+ log.debug("Closing ZooSession");
+ zooSession.get().close();
+ }
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
index 19be1b6f4f..fad535fefa 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java
@@ -82,13 +82,13 @@ public class ServiceLockSupport {
Halt.halt(0, server + " lock in zookeeper lost (reason = " + reason
+ "), exiting cleanly because shutdown is complete.");
} else {
- Halt.halt(-1, server + " lock in zookeeper lost (reason = " + reason + "), exiting!");
+ Halt.halt(1, server + " lock in zookeeper lost (reason = " + reason + "), exiting!");
}
}
@Override
public void unableToMonitorLockNode(final Exception e) {
- Halt.halt(-1, "FATAL: No longer able to monitor " + server + " lock
node", e);
+ Halt.halt(1, "FATAL: No longer able to monitor " + server + " lock
node", e);
}
@Override
@@ -96,7 +96,7 @@ public class ServiceLockSupport {
LOG.debug("Acquired {} lock", server);
if (acquiredLock || failedToAcquireLock) {
- Halt.halt(-1, "Zoolock in unexpected state AL " + acquiredLock + " " +
failedToAcquireLock);
+ Halt.halt(1, "Zoolock in unexpected state AL " + acquiredLock + " " +
failedToAcquireLock);
}
acquiredLock = true;
@@ -111,11 +111,11 @@ public class ServiceLockSupport {
String msg =
"Failed to acquire " + server + " lock due to incorrect ZooKeeper authentication.";
LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e);
- Halt.halt(-1, msg);
+ Halt.halt(1, msg);
}
if (acquiredLock) {
- Halt.halt(-1,
+ Halt.halt(1,
"Zoolock in unexpected state acquiredLock true with FAL " +
failedToAcquireLock);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/util/Halt.java
b/core/src/main/java/org/apache/accumulo/core/util/Halt.java
index 48e82deb9b..bea10e9b12 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/Halt.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/Halt.java
@@ -40,7 +40,7 @@ public class Halt {
halt(status, msg, null, runnable);
}
- public static void halt(final int status, final String msg, final Throwable exception,
+ private static void halt(final int status, final String msg, final Throwable exception,
final Runnable runnable) {
try {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index e6543afe17..0b94dc1203 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -76,24 +76,7 @@ public abstract class AbstractServer
}
public static void startServer(AbstractServer server, Logger LOG) throws Exception {
- try {
- server.runServer();
- } catch (Throwable e) {
- System.err
- .println(server.getClass().getSimpleName() + " died, exception thrown from runServer.");
- e.printStackTrace();
- LOG.error("{} died, exception thrown from runServer.", server.getClass().getSimpleName(), e);
- throw e;
- } finally {
- try {
- server.close();
- } catch (Throwable e) {
- System.err.println("Exception thrown while closing " + server.getClass().getSimpleName());
- e.printStackTrace();
- LOG.error("Exception thrown while closing {}", server.getClass().getSimpleName(), e);
- throw e;
- }
- }
+ server.runServer();
}
private final MetricSource metricSource;
@@ -112,6 +95,7 @@ public abstract class AbstractServer
private volatile Thread verificationThread;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);
+ private final AtomicBoolean closed = new AtomicBoolean(false);
protected AbstractServer(ServerId.Type serverType, ConfigOpts opts,
Function<SiteConfiguration,ServerContext> serverContextFactory, String[] args) {
@@ -203,7 +187,8 @@ public abstract class AbstractServer
*
* @param isIdle whether the server is idle
*/
- protected void updateIdleStatus(boolean isIdle) {
+ // public for ExitCodesIT
+ public void updateIdleStatus(boolean isIdle) {
boolean shouldResetIdlePeriod = !isIdle || idleReportingPeriodMillis == 0;
boolean hasIdlePeriodStarted = idlePeriodTimer != null;
boolean hasExceededIdlePeriod =
@@ -290,7 +275,10 @@ public abstract class AbstractServer
*/
public void runServer() throws Exception {
final AtomicReference<Throwable> err = new AtomicReference<>();
- serverThread = new Thread(TraceUtil.wrap(this), applicationName);
+ serverThread = new Thread(TraceUtil.wrap(() -> {
+ this.run();
+ close();
+ }), applicationName);
serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
serverThread.start();
serverThread.join();
@@ -331,7 +319,8 @@ public abstract class AbstractServer
return bindAddress;
}
- protected TServer getThriftServer() {
+ // public for ExitCodesIT
+ public TServer getThriftServer() {
if (thriftServer == null) {
return null;
}
@@ -451,7 +440,7 @@ public abstract class AbstractServer
log.trace(
"ServiceLockVerificationThread - checking ServiceLock existence in ZooKeeper");
if (lock != null && !lock.verifyLockAtSource()) {
- Halt.halt(-1, "Lock verification thread could not find lock");
+ Halt.halt(1, "Lock verification thread could not find lock");
}
// Need to sleep, not yield when the thread priority is greater than NORM_PRIORITY
// so that this thread does not get immediately rescheduled.
@@ -476,8 +465,21 @@ public abstract class AbstractServer
@Override
public void close() {
- if (context != null) {
- context.close();
+
+ if (closed.compareAndSet(false, true)) {
+
+ // Must set shutdown as completed before calling ServerContext.close().
+ // ServerContext.close() calls ClientContext.close() ->
+ // ZooSession.close() which removes all of the ephemeral nodes and
+ // forces the watches to fire. The ServiceLockWatcher has a reference
+ // to shutdownComplete and will terminate the JVM with a 0 exit code
+ // if true. Otherwise it will exit with a non-zero exit code.
+ getShutdownComplete().set(true);
+
+ if (context != null) {
+ context.getLowMemoryDetector().logGCInfo(getConfiguration());
+ context.close();
+ }
}
}
@@ -488,4 +490,7 @@ public abstract class AbstractServer
}
}
+ public void requestShutdownForTests() {
+ shutdownRequested.set(true);
+ }
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index 4363648ff5..a6919bed3e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -522,9 +522,11 @@ public class ServerContext extends ClientContext {
getMetricsInfo().close();
}
if (sharedSchedExecutorCreated.get()) {
+ log.debug("Shutting down shared executor pool");
getScheduledExecutor().shutdownNow();
}
if (sharedMetadataWriterCreated.get()) {
+ log.debug("Shutting down shared metadata conditional writer");
try {
ConditionalWriter writer = sharedMetadataWriter.get();
if (writer != null) {
@@ -535,6 +537,7 @@ public class ServerContext extends ClientContext {
}
}
if (sharedUserWriterCreated.get()) {
+ log.debug("Shutting down shared user conditional writer");
try {
ConditionalWriter writer = sharedUserWriter.get();
if (writer != null) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index a178f0b44b..d367f356eb 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -519,7 +519,7 @@ public class LiveTServerSet implements ZooCacheWatcher {
try {
context.getZooSession().asReaderWriter().recursiveDelete(slp.toString(), SKIP);
} catch (Exception e) {
- Halt.halt(-1, "error removing tablet server lock", e);
+ Halt.halt(1, "error removing tablet server lock", e);
}
context.getZooCache().clear(slp.toString());
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
index 8d745e7774..ca320b3cd1 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/mem/LowMemoryDetector.java
@@ -193,7 +193,7 @@ public class LowMemoryDetector {
}
if (maxIncreaseInCollectionTime > keepAliveTimeout) {
- Halt.halt(-1, "Garbage collection may be interfering with lock
keep-alive. Halting.");
+ Halt.halt(1, "Garbage collection may be interfering with lock
keep-alive. Halting.");
}
localState.lastMemorySize = freeMemory;
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 64bf2b0200..9772d4bd9f 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -829,237 +829,236 @@ public class Compactor extends AbstractServer
implements MetricsProducer, Compac
getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL));
LOG.info("Compactor started, waiting for work");
- try {
-
- final AtomicReference<Throwable> err = new AtomicReference<>();
- final LogSorter logSorter = new LogSorter(this);
- long nextSortLogsCheckTime = System.currentTimeMillis();
- while (!isShutdownRequested()) {
- if (Thread.currentThread().isInterrupted()) {
- LOG.info("Server process thread has been interrupted, shutting
down");
- break;
- }
- try {
- // mark compactor as idle while not in the compaction loop
- updateIdleStatus(true);
+ final AtomicReference<Throwable> err = new AtomicReference<>();
+ final LogSorter logSorter = new LogSorter(this);
+ long nextSortLogsCheckTime = System.currentTimeMillis();
- currentCompactionId.set(null);
- err.set(null);
- JOB_HOLDER.reset();
-
- if (System.currentTimeMillis() > nextSortLogsCheckTime) {
- // Attempt to process all existing log sorting work serially in
this thread.
- // When no work remains, this call will return so that we can look
for compaction
- // work.
- LOG.debug("Checking to see if any recovery logs need sorting");
+ while (!isShutdownRequested()) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("Server process thread has been interrupted, shutting down");
+ break;
+ }
+ try {
+ // mark compactor as idle while not in the compaction loop
+ updateIdleStatus(true);
+
+ currentCompactionId.set(null);
+ err.set(null);
+ JOB_HOLDER.reset();
+
+ if (System.currentTimeMillis() > nextSortLogsCheckTime) {
+ // Attempt to process all existing log sorting work serially in this
thread.
+ // When no work remains, this call will return so that we can look
for compaction
+ // work.
+ LOG.debug("Checking to see if any recovery logs need sorting");
+ try {
nextSortLogsCheckTime = logSorter.sortLogsIfNeeded();
+ } catch (KeeperException e) {
+ LOG.error("Error sorting logs", e);
}
+ }
- performFailureProcessing(errorHistory);
+ performFailureProcessing(errorHistory);
- TExternalCompactionJob job;
- try {
- TNextCompactionJob next = getNextJob(getNextId());
- job = next.getJob();
- if (!job.isSetExternalCompactionId()) {
- LOG.trace("No external compactions in queue {}",
this.getResourceGroup());
-
UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
- continue;
- }
- if
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
- throw new IllegalStateException("Returned eci " +
job.getExternalCompactionId()
- + " does not match supplied eci " +
currentCompactionId.get());
- }
- } catch (RetriesExceededException e2) {
- LOG.warn("Retries exceeded getting next job. Retrying...");
+ TExternalCompactionJob job;
+ try {
+ TNextCompactionJob next = getNextJob(getNextId());
+ job = next.getJob();
+ if (!job.isSetExternalCompactionId()) {
+ LOG.trace("No external compactions in queue {}",
this.getResourceGroup());
+
UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount()));
continue;
}
- LOG.debug("Received next compaction job: {}", job);
+ if
(!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) {
+ throw new IllegalStateException("Returned eci " +
job.getExternalCompactionId()
+ + " does not match supplied eci " + currentCompactionId.get());
+ }
+ } catch (RetriesExceededException e2) {
+ LOG.warn("Retries exceeded getting next job. Retrying...");
+ continue;
+ }
+ LOG.debug("Received next compaction job: {}", job);
- final LongAdder totalInputEntries = new LongAdder();
- final LongAdder totalInputBytes = new LongAdder();
- final CountDownLatch started = new CountDownLatch(1);
- final CountDownLatch stopped = new CountDownLatch(1);
+ final LongAdder totalInputEntries = new LongAdder();
+ final LongAdder totalInputBytes = new LongAdder();
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch stopped = new CountDownLatch(1);
- final FileCompactorRunnable fcr =
- createCompactionJob(job, totalInputEntries, totalInputBytes,
started, stopped, err);
+ final FileCompactorRunnable fcr =
+ createCompactionJob(job, totalInputEntries, totalInputBytes,
started, stopped, err);
- final Thread compactionThread = Threads.createNonCriticalThread(
- "Compaction job for tablet " + job.getExtent().toString(), fcr);
+ final Thread compactionThread = Threads.createNonCriticalThread(
+ "Compaction job for tablet " + job.getExtent().toString(), fcr);
- JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
+ JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor());
- try {
- // mark compactor as busy while compacting
- updateIdleStatus(false);
+ try {
+ // mark compactor as busy while compacting
+ updateIdleStatus(false);
+ try {
// Need to call FileCompactorRunnable.initialize after calling
JOB_HOLDER.set
fcr.initialize();
-
- compactionThread.start(); // start the compactionThread
- started.await(); // wait until the compactor is started
- final long inputEntries = totalInputEntries.sum();
- final long waitTime =
calculateProgressCheckTime(totalInputBytes.sum());
- LOG.debug("Progress checks will occur every {} seconds", waitTime);
- String percentComplete = "unknown";
-
- while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
- List<CompactionInfo> running =
-
org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
- if (!running.isEmpty()) {
- // Compaction has started. There should only be one in the list
- CompactionInfo info = running.get(0);
- if (info != null) {
- final long entriesRead = info.getEntriesRead();
- final long entriesWritten = info.getEntriesWritten();
- if (inputEntries > 0) {
- percentComplete = Float.toString((entriesRead / (float)
inputEntries) * 100);
- }
- String message = String.format(
- "Compaction in progress, read %d of %d input entries (
%s %s ), written %d entries",
- entriesRead, inputEntries, percentComplete, "%",
entriesWritten);
- watcher.run();
- try {
- LOG.debug("Updating coordinator with compaction progress:
{}.", message);
- TCompactionStatusUpdate update = new
TCompactionStatusUpdate(
- TCompactionState.IN_PROGRESS, message, inputEntries,
entriesRead,
- entriesWritten, fcr.getCompactionAge().toNanos());
- updateCompactionState(job, update);
- } catch (RetriesExceededException e) {
- LOG.warn("Error updating coordinator with compaction
progress, error: {}",
- e.getMessage());
- }
- }
- } else {
- LOG.debug("Waiting on compaction thread to finish, but no
RUNNING compaction");
- }
- }
- compactionThread.join();
- LOG.trace("Compaction thread finished.");
- // Run the watcher again to clear out the finished compaction and
set the
- // stuck count to zero.
- watcher.run();
-
- if (err.get() != null) {
- // maybe the error occured because the table was deleted or
something like that, so
- // force a cancel check to possibly reduce noise in the logs
- checkIfCanceled();
+ } catch (RetriesExceededException e) {
+ LOG.error(
+ "Error starting FileCompactableRunnable, cancelling compaction
and moving to next job.",
+ e);
+ try {
+ cancel(job.getExternalCompactionId());
+ } catch (TException e1) {
+ LOG.error("Error cancelling compaction.", e1);
}
+ continue;
+ }
- if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
- || (err.get() != null &&
err.get().getClass().equals(InterruptedException.class))) {
- LOG.warn("Compaction thread was interrupted, sending CANCELLED
state");
- try {
- TCompactionStatusUpdate update =
- new TCompactionStatusUpdate(TCompactionState.CANCELLED,
"Compaction cancelled",
- -1, -1, -1, fcr.getCompactionAge().toNanos());
- updateCompactionState(job, update);
- updateCompactionFailed(job,
InterruptedException.class.getName());
- cancelled.incrementAndGet();
- } catch (RetriesExceededException e) {
- LOG.error("Error updating coordinator with compaction
cancellation.", e);
- } finally {
- currentCompactionId.set(null);
- }
- } else if (err.get() != null) {
- final KeyExtent fromThriftExtent =
KeyExtent.fromThrift(job.getExtent());
- try {
- LOG.info("Updating coordinator with compaction failure: id:
{}, extent: {}",
- job.getExternalCompactionId(), fromThriftExtent);
- TCompactionStatusUpdate update = new TCompactionStatusUpdate(
- TCompactionState.FAILED, "Compaction failed due to: " +
err.get().getMessage(),
- -1, -1, -1, fcr.getCompactionAge().toNanos());
- updateCompactionState(job, update);
- updateCompactionFailed(job, err.get().getClass().getName());
- failed.incrementAndGet();
- errorHistory.addError(fromThriftExtent.tableId(), err.get());
- } catch (RetriesExceededException e) {
- LOG.error("Error updating coordinator with compaction failure:
id: {}, extent: {}",
- job.getExternalCompactionId(), fromThriftExtent, e);
- } finally {
- currentCompactionId.set(null);
- }
- } else {
- try {
- LOG.trace("Updating coordinator with compaction completion.");
- updateCompactionCompleted(job, JOB_HOLDER.getStats());
- completed.incrementAndGet();
- // job completed successfully, clear the error history
- errorHistory.clear();
- } catch (RetriesExceededException e) {
- LOG.error(
- "Error updating coordinator with compaction completion,
cancelling compaction.",
- e);
+ compactionThread.start(); // start the compactionThread
+ started.await(); // wait until the compactor is started
+ final long inputEntries = totalInputEntries.sum();
+ final long waitTime =
calculateProgressCheckTime(totalInputBytes.sum());
+ LOG.debug("Progress checks will occur every {} seconds", waitTime);
+ String percentComplete = "unknown";
+
+ while (!stopped.await(waitTime, TimeUnit.SECONDS)) {
+ List<CompactionInfo> running =
+
org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions();
+ if (!running.isEmpty()) {
+ // Compaction has started. There should only be one in the list
+ CompactionInfo info = running.get(0);
+ if (info != null) {
+ final long entriesRead = info.getEntriesRead();
+ final long entriesWritten = info.getEntriesWritten();
+ if (inputEntries > 0) {
+ percentComplete = Float.toString((entriesRead / (float)
inputEntries) * 100);
+ }
+ String message = String.format(
+ "Compaction in progress, read %d of %d input entries ( %s
%s ), written %d entries",
+ entriesRead, inputEntries, percentComplete, "%",
entriesWritten);
+ watcher.run();
try {
- cancel(job.getExternalCompactionId());
- } catch (TException e1) {
- LOG.error("Error cancelling compaction.", e1);
+ LOG.debug("Updating coordinator with compaction progress:
{}.", message);
+ TCompactionStatusUpdate update = new TCompactionStatusUpdate(
+ TCompactionState.IN_PROGRESS, message, inputEntries,
entriesRead,
+ entriesWritten, fcr.getCompactionAge().toNanos());
+ updateCompactionState(job, update);
+ } catch (RetriesExceededException e) {
+ LOG.warn("Error updating coordinator with compaction
progress, error: {}",
+ e.getMessage());
}
- } finally {
- currentCompactionId.set(null);
}
+ } else {
+ LOG.debug("Waiting on compaction thread to finish, but no
RUNNING compaction");
}
- } catch (RuntimeException e1) {
- LOG.error(
- "Compactor thread was interrupted waiting for compaction to
start, cancelling job",
- e1);
+ }
+ compactionThread.join();
+ LOG.trace("Compaction thread finished.");
+ // Run the watcher again to clear out the finished compaction and
set the
+ // stuck count to zero.
+ watcher.run();
+
+ if (err.get() != null) {
+ // maybe the error occured because the table was deleted or
something like that, so
+ // force a cancel check to possibly reduce noise in the logs
+ checkIfCanceled();
+ }
+
+ if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled()
+ || (err.get() != null &&
err.get().getClass().equals(InterruptedException.class))) {
+ LOG.warn("Compaction thread was interrupted, sending CANCELLED
state");
try {
- cancel(job.getExternalCompactionId());
- } catch (TException e2) {
- LOG.error("Error cancelling compaction.", e2);
+ TCompactionStatusUpdate update =
+ new TCompactionStatusUpdate(TCompactionState.CANCELLED,
"Compaction cancelled",
+ -1, -1, -1, fcr.getCompactionAge().toNanos());
+ updateCompactionState(job, update);
+ updateCompactionFailed(job,
InterruptedException.class.getName());
+ cancelled.incrementAndGet();
+ } catch (RetriesExceededException e) {
+ LOG.error("Error updating coordinator with compaction
cancellation.", e);
+ } finally {
+ currentCompactionId.set(null);
}
- } finally {
- currentCompactionId.set(null);
+ } else if (err.get() != null) {
+ final KeyExtent fromThriftExtent =
KeyExtent.fromThrift(job.getExtent());
+ try {
+ LOG.info("Updating coordinator with compaction failure: id: {},
extent: {}",
+ job.getExternalCompactionId(), fromThriftExtent);
+ TCompactionStatusUpdate update = new
TCompactionStatusUpdate(TCompactionState.FAILED,
+ "Compaction failed due to: " + err.get().getMessage(), -1,
-1, -1,
+ fcr.getCompactionAge().toNanos());
+ updateCompactionState(job, update);
+ updateCompactionFailed(job, err.get().getClass().getName());
+ failed.incrementAndGet();
+ errorHistory.addError(fromThriftExtent.tableId(), err.get());
+ } catch (RetriesExceededException e) {
+ LOG.error("Error updating coordinator with compaction failure:
id: {}, extent: {}",
+ job.getExternalCompactionId(), fromThriftExtent, e);
+ } finally {
+ currentCompactionId.set(null);
+ }
+ } else {
+ try {
+ LOG.trace("Updating coordinator with compaction completion.");
+ updateCompactionCompleted(job, JOB_HOLDER.getStats());
+ completed.incrementAndGet();
+ // job completed successfully, clear the error history
+ errorHistory.clear();
+ } catch (RetriesExceededException e) {
+ LOG.error(
+ "Error updating coordinator with compaction completion,
cancelling compaction.",
+ e);
+ try {
+ cancel(job.getExternalCompactionId());
+ } catch (TException e1) {
+ LOG.error("Error cancelling compaction.", e1);
+ }
+ } finally {
+ currentCompactionId.set(null);
+ }
+ }
+ } catch (RuntimeException e1) {
+ LOG.error(
+ "Compactor thread was interrupted waiting for compaction to
start, cancelling job",
+ e1);
+ try {
+ cancel(job.getExternalCompactionId());
+ } catch (TException e2) {
+ LOG.error("Error cancelling compaction.", e2);
+ }
+ } finally {
+ currentCompactionId.set(null);
- // mark compactor as idle after compaction completes
- updateIdleStatus(true);
+ // mark compactor as idle after compaction completes
+ updateIdleStatus(true);
- // In the case where there is an error in the foreground code the
background compaction
- // may still be running. Must cancel it before starting another
iteration of the loop to
- // avoid multiple threads updating shared state.
- while (compactionThread.isAlive()) {
- compactionThread.interrupt();
- compactionThread.join(1000);
- }
+ // In the case where there is an error in the foreground code the
background compaction
+ // may still be running. Must cancel it before starting another
iteration of the loop to
+ // avoid multiple threads updating shared state.
+ while (compactionThread.isAlive()) {
+ compactionThread.interrupt();
+ compactionThread.join(1000);
}
- } catch (InterruptedException e) {
- LOG.info("Interrupt Exception received, shutting down");
- gracefulShutdown(getContext().rpcCreds());
}
- } // end while
- } catch (Exception e) {
- LOG.error("Unhandled error occurred in Compactor", e);
- } finally {
- // Shutdown local thrift server
- LOG.debug("Stopping Thrift Servers");
- if (getThriftServer() != null) {
- getThriftServer().stop();
+ } catch (InterruptedException e) {
+ LOG.info("Interrupt Exception received, shutting down");
+ gracefulShutdown(getContext().rpcCreds());
}
+ } // end while, shutdown requested
- try {
- LOG.debug("Closing filesystems");
- VolumeManager mgr = getContext().getVolumeManager();
- if (null != mgr) {
- mgr.close();
- }
- } catch (IOException e) {
- LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
- }
+ // Shutdown local thrift server
+ LOG.debug("Stopping Thrift Servers");
+ if (getThriftServer() != null) {
+ getThriftServer().stop();
+ }
- getContext().getLowMemoryDetector().logGCInfo(getConfiguration());
- super.close();
- getShutdownComplete().set(true);
- LOG.info("stop requested. exiting ... ");
- try {
- if (null != compactorLock) {
- compactorLock.unlock();
- }
- } catch (Exception e) {
- LOG.warn("Failed to release compactor lock", e);
+ try {
+ LOG.debug("Closing filesystems");
+ VolumeManager mgr = getContext().getVolumeManager();
+ if (null != mgr) {
+ mgr.close();
}
+ } catch (IOException e) {
+ LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
}
-
}
public static void main(String[] args) throws Exception {
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 25383876c5..5b09324d4a 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -93,7 +93,7 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
private final Timer lastCompactorCheck = Timer.startNew();
- SimpleGarbageCollector(ConfigOpts opts, String[] args) {
+ protected SimpleGarbageCollector(ConfigOpts opts, String[] args) {
super(ServerId.Type.GARBAGE_COLLECTOR, opts, ServerContext::new, args);
final AccumuloConfiguration conf = getConfiguration();
@@ -352,15 +352,6 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
gracefulShutdown(getContext().rpcCreds());
}
}
- super.close();
- getShutdownComplete().set(true);
- log.info("stop requested. exiting ... ");
- try {
- gcLock.unlock();
- } catch (Exception e) {
- log.warn("Failed to release GarbageCollector lock", e);
- }
-
}
private void incrementStatsForRun(GCRun gcRun) {
@@ -370,7 +361,8 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
status.current.errors += gcRun.getErrorsStat();
}
- private void logStats() {
+ // public for ExitCodesIT
+ public void logStats() {
log.info("Number of data file candidates for deletion: {}",
status.current.candidates);
log.info("Number of data file candidates still in use: {}",
status.current.inUse);
log.info("Number of successfully deleted data files: {}",
status.current.deleted);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 7805cc85ab..cb50ebae84 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1206,7 +1206,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
break;
}
try {
- Thread.sleep(500);
+ mainWait();
} catch (InterruptedException e) {
log.info("Interrupt Exception received, shutting down");
gracefulShutdown(context.rpcCreds());
@@ -1257,14 +1257,11 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener {
throw new IllegalStateException("Exception waiting on watcher", e);
}
}
- super.close();
- getShutdownComplete().set(true);
- log.info("stop requested. exiting ... ");
- try {
- managerLock.unlock();
- } catch (Exception e) {
- log.warn("Failed to release Manager lock", e);
- }
+ }
+
+ // method exists for ExitCodesIT
+ public void mainWait() throws InterruptedException {
+ Thread.sleep(500);
}
protected Fate<Manager> initializeFateInstance(ServerContext context,
FateStore<Manager> store) {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 0658262f56..975ac8f245 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -478,7 +478,7 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
Set<TServerInstance> filteredServersToShutdown =
new HashSet<>(tableMgmtParams.getServersToShutdown());
- while (iter.hasNext()) {
+ while (iter.hasNext() && !manager.isShutdownRequested()) {
final TabletManagement mti = iter.next();
if (mti == null) {
throw new IllegalStateException("State store returned a null ManagerTabletInfo object");
@@ -728,7 +728,7 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
int[] oldCounts = new int[TabletState.values().length];
boolean lookForTabletsNeedingVolReplacement = true;
- while (manager.stillManager()) {
+ while (manager.stillManager() && !manager.isShutdownRequested()) {
if (!eventHandler.isNeedsFullScan()) {
// If an event handled by the EventHandler.RangeProcessor indicated
// that we need to do a full scan, then do it. Otherwise wait a bit
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index a5562e44bf..f218f8f71f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -400,67 +400,51 @@ public class ScanServer extends AbstractServer
"Log sorting for tablet recovery is disabled,
SSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
}
- try {
- while (!isShutdownRequested()) {
- if (Thread.currentThread().isInterrupted()) {
- LOG.info("Server process thread has been interrupted, shutting
down");
- break;
- }
- try {
- Thread.sleep(1000);
- updateIdleStatus(sessionManager.getActiveScans().isEmpty()
- && tabletMetadataCache.estimatedSize() == 0);
- } catch (InterruptedException e) {
- LOG.info("Interrupt Exception received, shutting down");
- gracefulShutdown(getContext().rpcCreds());
- }
- }
- } finally {
- // Wait for scans to got to zero
- while (!sessionManager.getActiveScans().isEmpty()) {
- LOG.debug("Waiting on {} active scans to complete.",
- sessionManager.getActiveScans().size());
- UtilWaitThread.sleep(1000);
+ while (!isShutdownRequested()) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("Server process thread has been interrupted, shutting down");
+ break;
}
-
- LOG.debug("Stopping Thrift Servers");
- getThriftServer().stop();
-
try {
- LOG.info("Removing server scan references");
-
this.getContext().getAmple().scanServerRefs().delete(getAdvertiseAddress().toString(),
- serverLockUUID);
- } catch (Exception e) {
- LOG.warn("Failed to remove scan server refs from metadata location",
e);
+ Thread.sleep(1000);
+ updateIdleStatus(
+ sessionManager.getActiveScans().isEmpty() &&
tabletMetadataCache.estimatedSize() == 0);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupt Exception received, shutting down");
+ gracefulShutdown(getContext().rpcCreds());
}
+ }
- try {
- LOG.debug("Closing filesystems");
- VolumeManager mgr = getContext().getVolumeManager();
- if (null != mgr) {
- mgr.close();
- }
- } catch (IOException e) {
- LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
- }
+ // Wait for scans to get to zero
+ while (!sessionManager.getActiveScans().isEmpty()) {
+ LOG.debug("Waiting on {} active scans to complete.",
sessionManager.getActiveScans().size());
+ UtilWaitThread.sleep(1000);
+ }
- if (tmCacheExecutor != null) {
- LOG.debug("Shutting down TabletMetadataCache executor");
- tmCacheExecutor.shutdownNow();
- }
+ LOG.debug("Stopping Thrift Servers");
+ getThriftServer().stop();
- context.getLowMemoryDetector().logGCInfo(getConfiguration());
- super.close();
- getShutdownComplete().set(true);
- LOG.info("stop requested. exiting ... ");
- try {
- if (null != lock) {
- lock.unlock();
- }
- } catch (Exception e) {
- LOG.warn("Failed to release scan server lock", e);
+ try {
+ LOG.info("Removing server scan references");
+
this.getContext().getAmple().scanServerRefs().delete(getAdvertiseAddress().toString(),
+ serverLockUUID);
+ } catch (Exception e) {
+ LOG.warn("Failed to remove scan server refs from metadata location", e);
+ }
+
+ try {
+ LOG.debug("Closing filesystems");
+ VolumeManager mgr = getContext().getVolumeManager();
+ if (null != mgr) {
+ mgr.close();
}
+ } catch (IOException e) {
+ LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
+ }
+ if (tmCacheExecutor != null) {
+ LOG.debug("Shutting down TabletMetadataCache executor");
+ tmCacheExecutor.shutdownNow();
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index bee188443e..1861389feb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -671,7 +671,7 @@ public class TabletServer extends AbstractServer implements
TabletHostingServer
try {
// Ask the manager to unload our tablets and stop loading new tablets
if (iface == null) {
- Halt.halt(-1, "Error informing Manager that we are shutting down,
exiting!");
+ Halt.halt(1, "Error informing Manager that we are shutting down,
exiting!");
} else {
iface.tabletServerStopping(TraceUtil.traceInfo(),
getContext().rpcCreds(),
getTabletSession().getHostPortSession(),
getResourceGroup().canonical());
@@ -690,7 +690,7 @@ public class TabletServer extends AbstractServer implements
TabletHostingServer
sendManagerMessages(managerDown, iface, advertiseAddressString);
} catch (TException | RuntimeException e) {
- Halt.halt(-1, "Error informing Manager that we are shutting down,
exiting!", e);
+ Halt.halt(1, "Error informing Manager that we are shutting down,
exiting!", e);
} finally {
returnManagerConnection(iface);
}
@@ -706,16 +706,6 @@ public class TabletServer extends AbstractServer
implements TabletHostingServer
} catch (IOException e) {
log.warn("Failed to close filesystem : {}", e.getMessage(), e);
}
-
- context.getLowMemoryDetector().logGCInfo(getConfiguration());
- super.close();
- getShutdownComplete().set(true);
- log.info("TServerInfo: stop requested. exiting ... ");
- try {
- tabletServerLock.unlock();
- } catch (Exception e) {
- log.warn("Failed to release tablet server lock", e);
- }
}
private boolean sendManagerMessages(boolean managerDown,
ManagerClientService.Client iface,
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 1ae26c5a05..559cc8a487 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -455,7 +455,7 @@ public class TabletServerLogger {
if (sawWriteFailure != null) {
log.info("WAL write failure, validating server lock in ZooKeeper",
sawWriteFailure);
if (tabletServerLock == null ||
!tabletServerLock.verifyLockAtSource()) {
- Halt.halt(-1, "Writing to WAL has failed and TabletServer lock
does not exist",
+ Halt.halt(1, "Writing to WAL has failed and TabletServer lock does
not exist",
sawWriteFailure);
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index e091ecb401..113aa1928d 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -99,7 +99,7 @@ public class MinorCompactor extends FileCompactor {
if (tserverLock == null || !tserverLock.verifyLockAtSource()) {
log.error("Minor compaction of {} has failed and TabletServer
lock does not exist."
+ " Halting...", getExtent(), e);
- Halt.halt(-1, "TabletServer lock does not exist", e);
+ Halt.halt(1, "TabletServer lock does not exist", e);
} else {
throw e;
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index c2170a42d4..1c96bddbe7 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -414,7 +414,7 @@ public class Tablet extends TabletBase {
if (tserverLock == null || !tserverLock.verifyLockAtSource()) {
log.error("Minor compaction of {} has failed and TabletServer lock
does not exist."
+ " Halting...", getExtent(), e);
- Halt.halt(-1, "TabletServer lock does not exist", e);
+ Halt.halt(1, "TabletServer lock does not exist", e);
} else {
TraceUtil.setException(span2, e, true);
throw e;
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT.java
new file mode 100644
index 0000000000..e37ef29975
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.core.cli.ConfigOpts;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
+import org.apache.accumulo.server.AbstractServer;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.tserver.ScanServer;
+import org.apache.accumulo.tserver.TabletServer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.LoggerFactory;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class ExitCodesIT extends SharedMiniClusterBase {
+
+ public static enum TerminalBehavior {
+ SHUTDOWN, EXCEPTION, ERROR
+ };
+
+ public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior";
+
+ public static TerminalBehavior getTerminalBehavior() {
+ final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR);
+ Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as an env var");
+ return TerminalBehavior.valueOf(methodBehavior);
+ }
+
+ public static class ExitCompactor extends Compactor {
+
+ public static void main(String[] args) throws Exception {
+ List<String> compactorArgs = new ArrayList<>();
+ compactorArgs.add("-o");
+ compactorArgs.add(Property.COMPACTOR_GROUP_NAME.getKey() + "=TEST");
+ AbstractServer.startServer(new ExitCompactor(getTerminalBehavior(), compactorArgs),
+ LoggerFactory.getLogger(ExitCompactor.class));
+ }
+
+ private final TerminalBehavior behavior;
+
+ public ExitCompactor(TerminalBehavior behavior, List<String> compactorArgs) {
+ super(new ConfigOpts(), compactorArgs.toArray(new String[] {}));
+ this.behavior = behavior;
+ }
+
+ @Override
+ public void updateIdleStatus(boolean isIdle) {
+ switch (behavior) {
+ case ERROR:
+ throw new StackOverflowError("throwing stack overflow error");
+ case EXCEPTION:
+ throw new RuntimeException("throwing runtime exception");
+ case SHUTDOWN:
+ requestShutdownForTests();
+ break;
+ default:
+ throw new UnsupportedOperationException(behavior + " is not currently supported");
+ }
+ }
+ }
+
+ public static class ExitScanServer extends ScanServer {
+
+ public static void main(String[] args) throws Exception {
+ List<String> sserverArgs = new ArrayList<>();
+ sserverArgs.add("-o");
+ sserverArgs.add(Property.SSERV_GROUP_NAME.getKey() + "=TEST");
+ AbstractServer.startServer(new ExitScanServer(getTerminalBehavior(), sserverArgs),
+ LoggerFactory.getLogger(ExitScanServer.class));
+ }
+
+ private final TerminalBehavior behavior;
+
+ public ExitScanServer(TerminalBehavior behavior, List<String> sserverArgs) {
+ super(new ConfigOpts(), sserverArgs.toArray(new String[] {}));
+ this.behavior = behavior;
+ }
+
+ @Override
+ public void updateIdleStatus(boolean isIdle) {
+ switch (behavior) {
+ case ERROR:
+ throw new StackOverflowError("throwing stack overflow error");
+ case EXCEPTION:
+ throw new RuntimeException("throwing runtime exception");
+ case SHUTDOWN:
+ requestShutdownForTests();
+ break;
+ default:
+ throw new UnsupportedOperationException(behavior + " is not currently supported");
+ }
+ }
+ }
+
+ public static class ExitTabletServer extends TabletServer {
+
+ public static void main(String[] args) throws Exception {
+ List<String> tserverArgs = new ArrayList<>();
+ tserverArgs.add("-o");
+ tserverArgs.add(Property.TSERV_GROUP_NAME.getKey() + "=TEST");
+ AbstractServer.startServer(new ExitTabletServer(getTerminalBehavior(), tserverArgs),
+ LoggerFactory.getLogger(ExitTabletServer.class));
+ }
+
+ private final TerminalBehavior behavior;
+
+ public ExitTabletServer(TerminalBehavior behavior, List<String> tserverArgs) {
+ super(new ConfigOpts(), ServerContext::new, tserverArgs.toArray(new String[] {}));
+ this.behavior = behavior;
+ }
+
+ @Override
+ public void updateIdleStatus(boolean isIdle) {
+ switch (behavior) {
+ case ERROR:
+ throw new StackOverflowError("throwing stack overflow error");
+ case EXCEPTION:
+ throw new RuntimeException("throwing runtime exception");
+ case SHUTDOWN:
+ requestShutdownForTests();
+ break;
+ default:
+ throw new UnsupportedOperationException(behavior + " is not currently supported");
+ }
+ }
+ }
+
+ public static class ExitGC extends SimpleGarbageCollector {
+
+ public static void main(String[] args) throws Exception {
+ AbstractServer.startServer(new ExitGC(getTerminalBehavior()),
+ LoggerFactory.getLogger(ExitGC.class));
+ }
+
+ private final TerminalBehavior behavior;
+
+ public ExitGC(TerminalBehavior behavior) {
+ super(new ConfigOpts(), new String[] {});
+ this.behavior = behavior;
+ }
+
+ @Override
+ public void logStats() {
+ switch (behavior) {
+ case ERROR:
+ throw new StackOverflowError("throwing stack overflow error");
+ case EXCEPTION:
+ throw new RuntimeException("throwing runtime exception");
+ case SHUTDOWN:
+ requestShutdownForTests();
+ break;
+ default:
+ throw new UnsupportedOperationException(behavior + " is not currently supported");
+ }
+ }
+ }
+
+ public static class ExitManager extends Manager {
+
+ public static void main(String[] args) throws Exception {
+ AbstractServer.startServer(new ExitManager(getTerminalBehavior()),
+ LoggerFactory.getLogger(ExitManager.class));
+ }
+
+ private final TerminalBehavior behavior;
+
+ public ExitManager(TerminalBehavior behavior) throws IOException {
+ super(new ConfigOpts(), ServerContext::new, new String[] {});
+ this.behavior = behavior;
+ }
+
+ @Override
+ public void mainWait() throws InterruptedException {
+ switch (behavior) {
+ case ERROR:
+ throw new StackOverflowError("throwing stack overflow error");
+ case EXCEPTION:
+ throw new RuntimeException("throwing runtime exception");
+ case SHUTDOWN:
+ requestShutdownForTests();
+ break;
+ default:
+ throw new UnsupportedOperationException(behavior + " is not currently supported");
+ }
+ }
+ }
+
+ @BeforeAll
+ public static void beforeTests() throws Exception {
+ // Start MiniCluster so that getCluster() does not
+ // return null,
+ SharedMiniClusterBase.startMiniCluster();
+ getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+ }
+
+ @AfterAll
+ public static void afterTests() {
+ SharedMiniClusterBase.stopMiniCluster();
+ }
+
+ // Junit doesn't support more than one parameterized
+ // argument to a test method. We need to generate
+ // the arguments here.
+ static Stream<Arguments> generateWorkerProcessArguments() {
+ List<Arguments> args = new ArrayList<>();
+ for (ServerType st : ServerType.values()) {
+ if (st == ServerType.COMPACTOR || st == ServerType.SCAN_SERVER
+ || st == ServerType.TABLET_SERVER) {
+ for (TerminalBehavior tb : TerminalBehavior.values()) {
+ args.add(Arguments.of(st, tb));
+ }
+ }
+ }
+ return args.stream();
+ }
+
+ @ParameterizedTest
+ @MethodSource("generateWorkerProcessArguments")
+ public void testWorkerProcesses(ServerType server, TerminalBehavior behavior) throws Exception {
+ Map<String,String> properties = new HashMap<>();
+ properties.put(PROXY_METHOD_BEHAVIOR, behavior.name());
+ getCluster().getConfig().setSystemProperties(properties);
+ Class<?> serverClass = null;
+ switch (server) {
+ case COMPACTOR:
+ serverClass = ExitCompactor.class;
+ break;
+ case SCAN_SERVER:
+ serverClass = ExitScanServer.class;
+ break;
+ case TABLET_SERVER:
+ serverClass = ExitTabletServer.class;
+ break;
+ case GARBAGE_COLLECTOR:
+ case MANAGER:
+ case MONITOR:
+ case ZOOKEEPER:
+ default:
+ throw new IllegalArgumentException("Unhandled type");
+ }
+ ProcessInfo pi = getCluster()._exec(serverClass, server, Map.of(), new String[] {});
+ Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000);
+ int exitValue = pi.getProcess().exitValue();
+ assertEquals(behavior == TerminalBehavior.SHUTDOWN ? 0 : 1, exitValue);
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ public void testGarbageCollector(TerminalBehavior behavior) throws Exception {
+ Map<String,String> properties = new HashMap<>();
+ properties.put(PROXY_METHOD_BEHAVIOR, behavior.name());
+ getCluster().getConfig().setSystemProperties(properties);
+ ProcessInfo pi =
+ getCluster()._exec(ExitGC.class, ServerType.GARBAGE_COLLECTOR, Map.of(), new String[] {});
+ if (behavior == TerminalBehavior.SHUTDOWN) {
+ Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000);
+ int exitValue = pi.getProcess().exitValue();
+ assertEquals(0, exitValue);
+ } else if (behavior == TerminalBehavior.EXCEPTION) {
+ // GarbageCollector logs exceptions and keeps going.
+ // We need to let this time out and then
+ // terminate the process.
+ IllegalStateException ise = assertThrows(IllegalStateException.class,
+ () -> Wait.waitFor(() -> !pi.getProcess().isAlive(), 60_000));
+ assertTrue(ise.getMessage().contains("Timeout exceeded"));
+ pi.getProcess().destroyForcibly();
+ } else {
+ Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000);
+ int exitValue = pi.getProcess().exitValue();
+ assertEquals(1, exitValue);
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ public void testManager(TerminalBehavior behavior) throws Exception {
+ try {
+ getCluster().getClusterControl().stop(ServerType.MANAGER);
+ Map<String,String> properties = new HashMap<>();
+ properties.put(PROXY_METHOD_BEHAVIOR, behavior.name());
+ getCluster().getConfig().setSystemProperties(properties);
+ ProcessInfo pi =
+ getCluster()._exec(ExitManager.class, ServerType.MANAGER, Map.of(),
new String[] {});
+ Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000);
+ int exitValue = pi.getProcess().exitValue();
+ assertEquals(behavior == TerminalBehavior.SHUTDOWN ? 0 : 1, exitValue);
+ } finally {
+ getCluster().getClusterControl().stop(ServerType.MANAGER);
+ }
+ }
+
+}