This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 5233e84 Exception and cancellation changes in Compactor process (#2372) 5233e84 is described below commit 5233e849c12bc0fe4887a461ef148747721826da Author: Dave Marion <dlmar...@apache.org> AuthorDate: Fri Dec 10 07:03:54 2021 -0500 Exception and cancellation changes in Compactor process (#2372) The Thread in the Compactor that runs the FileCompactor was throwing an exception as part of the error handling routine. It's not necessary for that to occur to convey the exception information to the main thread, the exception information was already being conveyed. This will reduce the error logging in the Compactor log regarding the uncaught exception being thrown and handled by the AccumuloUncaughtExceptionHandler. Another background thread checks every 5 seconds to determine whether or not the current compaction should be canceled. Call this method when the compaction completes in case something happens that would cause the compaction to be canceled and it has not been noticed yet. Closes #2350 Co-authored-by: Keith Turner <ktur...@apache.org> --- .../src/main/java/org/apache/accumulo/compactor/Compactor.java | 7 ++++++- .../src/test/java/org/apache/accumulo/compactor/CompactorTest.java | 6 ++++++ .../accumulo/test/compaction/ExternalDoNothingCompactor.java | 4 +++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 6565280..39c05fd 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -566,7 +566,6 @@ public class Compactor extends AbstractServer implements CompactorService.Iface } catch (Exception e) { LOG.error("Compaction failed", e); err.set(e); - throw new RuntimeException("Compaction failed", e); } finally { stopped.countDown(); Preconditions.checkState(compactionRunning.compareAndSet(true, false)); @@ -712,6 +711,12 @@ public class Compactor extends AbstractServer implements CompactorService.Iface compactionThread.join(); LOG.trace("Compaction thread finished."); + if (err.get() != null) { + // maybe the error occured because the table was deleted or something like that, so + // force a cancel check to possibly reduce noise in the logs + checkIfCanceled(); + } + if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() || ((err.get() != null && err.get().getClass().equals(InterruptedException.class)))) { LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 53404e2..f1132d2 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -220,6 +220,9 @@ public class CompactorTest { } @Override + protected synchronized void checkIfCanceled() {} + + @Override protected Runnable createCompactionJob(TExternalCompactionJob job, LongAdder totalInputEntries, LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, AtomicReference<Throwable> err) { @@ -337,6 +340,7 @@ public class CompactorTest { EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L); ServerContext context = PowerMock.createNiceMock(ServerContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(conf); ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class); ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class); EasyMock.expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); @@ -390,6 +394,7 @@ public class CompactorTest { EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L); ServerContext context = PowerMock.createNiceMock(ServerContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(conf); ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class); ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class); EasyMock.expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); @@ -443,6 +448,7 @@ public class CompactorTest { EasyMock.expect(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)).andReturn(86400000L); ServerContext context = PowerMock.createNiceMock(ServerContext.class); + EasyMock.expect(context.getConfiguration()).andReturn(conf); ZooReaderWriter zrw = PowerMock.createNiceMock(ZooReaderWriter.class); ZooKeeper zk = PowerMock.createNiceMock(ZooKeeper.class); EasyMock.expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes(); diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index 5a43ea6..87b5c9f 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -53,6 +53,9 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface { LongAdder totalInputBytes, CountDownLatch started, CountDownLatch stopped, AtomicReference<Throwable> err) { + // Set this to true so that only 1 external compaction is run + this.shutdown = true; + return new Runnable() { @Override public void run() { @@ -76,7 +79,6 @@ public class ExternalDoNothingCompactor extends Compactor implements Iface { } catch (Exception e) { LOG.error("Compaction failed", e); err.set(e); - throw new RuntimeException("Compaction failed", e); } finally { stopped.countDown(); }