CAMEL-9153: ThreadPoolRejectedPolicy does not implement Abort as expected. Thanks to Michael Riedel for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/80b0d0b4 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/80b0d0b4 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/80b0d0b4 Branch: refs/heads/master Commit: 80b0d0b44515b50088ba486fcd83e353eba9d5fa Parents: 04f3d11 Author: Claus Ibsen <davscl...@apache.org> Authored: Sat Feb 20 09:57:57 2016 +0100 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Feb 20 09:57:57 2016 +0100 ---------------------------------------------------------------------- .../apache/camel/ThreadPoolRejectedPolicy.java | 5 +- .../RejectableScheduledThreadPoolExecutor.java | 14 +- .../RejectableThreadPoolExecutor.java | 14 +- .../camel/ThreadPoolRejectedPolicyTest.java | 275 +++++++++++++++++++ 4 files changed, 303 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java b/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java index 623a3ed..11f3b59 100644 --- a/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java +++ b/camel-core/src/main/java/org/apache/camel/ThreadPoolRejectedPolicy.java @@ -16,6 +16,7 @@ */ package org.apache.camel; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import javax.xml.bind.annotation.XmlEnum; @@ -42,7 +43,9 @@ public enum ThreadPoolRejectedPolicy { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r instanceof Rejectable) { - ((Rejectable) r).reject(); + ((Rejectable)r).reject(); + } else { + throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString()); } } http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java index 3f61869..7a205e6 100644 --- a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java +++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableScheduledThreadPoolExecutor.java @@ -22,6 +22,8 @@ import java.util.concurrent.RunnableFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import org.apache.camel.Rejectable; + /** * Scheduled thread pool executor that creates {@link RejectableFutureTask} instead of * {@link java.util.concurrent.FutureTask} when registering new tasks for execution. @@ -70,12 +72,20 @@ public class RejectableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { - return new RejectableFutureTask<T>(runnable, value); + if (runnable instanceof Rejectable) { + return new RejectableFutureTask<T>(runnable, value); + } else { + return super.newTaskFor(runnable, value); + } } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - return new RejectableFutureTask<T>(callable); + if (callable instanceof Rejectable) { + return new RejectableFutureTask<T>(callable); + } else { + return super.newTaskFor(callable); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java index 6d67c6b..8962184 100644 --- a/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java +++ b/camel-core/src/main/java/org/apache/camel/util/concurrent/RejectableThreadPoolExecutor.java @@ -24,6 +24,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.camel.Rejectable; + /** * Thread pool executor that creates {@link RejectableFutureTask} instead of * {@link java.util.concurrent.FutureTask} when registering new tasks for execution. @@ -76,12 +78,20 @@ public class RejectableThreadPoolExecutor extends ThreadPoolExecutor { @Override protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { - return new RejectableFutureTask<T>(runnable, value); + if (runnable instanceof Rejectable) { + return new RejectableFutureTask<T>(runnable, value); + } else { + return super.newTaskFor(runnable, value); + } } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { - return new RejectableFutureTask<T>(callable); + if (callable instanceof Rejectable) { + return new RejectableFutureTask<T>(callable); + } else { + return super.newTaskFor(callable); + } } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/80b0d0b4/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java b/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java new file mode 100644 index 0000000..fb64d02 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/ThreadPoolRejectedPolicyTest.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor; + +public class ThreadPoolRejectedPolicyTest extends TestSupport { + + public void testAbortAsRejectedExecutionHandler() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Abort.asRejectedExecutionHandler()); + + final MockCallable<String> task1 = new MockCallable<String>(); + final Future<?> result1 = executorService.submit(task1); + final MockRunnable task2 = new MockRunnable(); + final Future<?> result2 = executorService.submit(task2); + final MockCallable<String> task3 = new MockCallable<String>(); + try { + executorService.submit(task3); + fail("Third task should have been rejected by a threadpool is full with 1 task and queue is full with 1 task."); + } catch (RejectedExecutionException e) { + } + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertInvoked(task2, result2); + assertRejected(task3, null); + } + + public void testAbortAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Abort.asRejectedExecutionHandler()); + + final MockRejectableRunnable task1 = new MockRejectableRunnable(); + final Future<?> result1 = executorService.submit(task1); + final MockRejectableCallable<String> task2 = new MockRejectableCallable<String>(); + final Future<?> result2 = executorService.submit(task2); + final MockRejectableRunnable task3 = new MockRejectableRunnable(); + final Future<?> result3 = executorService.submit(task3); + + final MockRejectableCallable<String> task4 = new MockRejectableCallable<String>(); + final Future<?> result4 = executorService.submit(task4); + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertInvoked(task2, result2); + assertRejected(task3, result3); + assertRejected(task4, result4); + } + + public void testCallerRunsAsRejectedExecutionHandler() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.CallerRuns.asRejectedExecutionHandler()); + + final MockRunnable task1 = new MockRunnable(); + final Future<?> result1 = executorService.submit(task1); + final MockRunnable task2 = new MockRunnable(); + final Future<?> result2 = executorService.submit(task2); + final MockRunnable task3 = new MockRunnable(); + final Future<?> result3 = executorService.submit(task3); + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertInvoked(task2, result2); + assertInvoked(task3, result3); + } + + public void testCallerRunsAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.CallerRuns.asRejectedExecutionHandler()); + + final MockRejectableRunnable task1 = new MockRejectableRunnable(); + final Future<?> result1 = executorService.submit(task1); + final MockRejectableRunnable task2 = new MockRejectableRunnable(); + final Future<?> result2 = executorService.submit(task2); + final MockRejectableRunnable task3 = new MockRejectableRunnable(); + final Future<?> result3 = executorService.submit(task3); + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertInvoked(task2, result2); + assertInvoked(task3, result3); + } + + public void testDiscardAsRejectedExecutionHandler() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Discard.asRejectedExecutionHandler()); + + final MockRunnable task1 = new MockRunnable(); + final Future<?> result1 = executorService.submit(task1); + final MockRunnable task2 = new MockRunnable(); + final Future<?> result2 = executorService.submit(task2); + final MockRunnable task3 = new MockRunnable(); + final Future<?> result3 = executorService.submit(task3); + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertInvoked(task2, result2); + assertRejected(task3, result3); + } + + public void testDiscardAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.Discard.asRejectedExecutionHandler()); + + final MockRejectableRunnable task1 = new MockRejectableRunnable(); + final Future<?> result1 = executorService.submit(task1); + final MockRejectableRunnable task2 = new MockRejectableRunnable(); + final Future<?> result2 = executorService.submit(task2); + final MockRejectableRunnable task3 = new MockRejectableRunnable(); + final Future<?> result3 = executorService.submit(task3); + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertInvoked(task2, result2); + assertRejected(task3, result3); + } + + public void testDiscardOldestAsRejectedExecutionHandler() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.DiscardOldest.asRejectedExecutionHandler()); + + final MockRunnable task1 = new MockRunnable(); + final Future<?> result1 = executorService.submit(task1); + final MockRunnable task2 = new MockRunnable(); + final Future<?> result2 = executorService.submit(task2); + final MockRunnable task3 = new MockRunnable(); + final Future<?> result3 = executorService.submit(task3); + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertRejected(task2, result2); + assertInvoked(task3, result3); + } + + public void testDiscardOldestAsRejectedExecutionHandlerWithRejectableTasks() throws InterruptedException { + + final ExecutorService executorService = createTestExecutorService(ThreadPoolRejectedPolicy.DiscardOldest.asRejectedExecutionHandler()); + + final MockRejectableRunnable task1 = new MockRejectableRunnable(); + final Future<?> result1 = executorService.submit(task1); + final MockRejectableRunnable task2 = new MockRejectableRunnable(); + final Future<?> result2 = executorService.submit(task2); + final MockRejectableRunnable task3 = new MockRejectableRunnable(); + final Future<?> result3 = executorService.submit(task3); + + shutdownAndAwait(executorService); + + assertInvoked(task1, result1); + assertRejected(task2, result2); + assertInvoked(task3, result3); + } + + private ExecutorService createTestExecutorService(final RejectedExecutionHandler rejectedExecutionHandler) { + return new RejectableThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), rejectedExecutionHandler); + } + + private void shutdownAndAwait(final ExecutorService executorService) { + executorService.shutdown(); + try { + assertTrue("Test ExecutorService shutdown is not expected to take longer than 10 seconds.", executorService.awaitTermination(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + fail("Test ExecutorService shutdown is not expected to be interrupted."); + } + } + + private void assertInvoked(MockTask task, Future<?> result) { + assertTrue(result.isDone()); + assertEquals(1, task.getInvocationCount()); + if (task instanceof Rejectable) { + assertEquals(0, task.getRejectionCount()); + } + } + + private void assertRejected(MockTask task, Future<?> result) { + if (result != null) { + assertFalse(result.isDone()); + } + assertEquals(0, task.getInvocationCount()); + if (task instanceof Rejectable) { + assertEquals(1, task.getRejectionCount()); + } + } + + private abstract static class MockTask { + private final AtomicInteger invocationCount = new AtomicInteger(); + + private final AtomicInteger rejectionCount = new AtomicInteger(); + + public int getInvocationCount() { + return invocationCount.get(); + } + + protected void countInvocation() { + invocationCount.incrementAndGet(); + } + + public int getRejectionCount() { + return rejectionCount.get(); + } + + protected void countRejection() { + rejectionCount.incrementAndGet(); + } + } + + private static class MockRunnable extends MockTask implements Runnable { + @Override + public void run() { + countInvocation(); + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + fail("MockRunnable task is not expected to be interrupted."); + } + } + } + + private static class MockRejectableRunnable extends MockRunnable implements Rejectable { + @Override + public void reject() { + countRejection(); + } + } + + private static class MockCallable<T> extends MockTask implements Callable<T> { + @Override + public T call() throws Exception { + countInvocation(); + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + fail("MockCallable task is not expected to be interrupted."); + } + return null; + } + } + + private static class MockRejectableCallable<T> extends MockCallable<T> implements Rejectable { + @Override + public void reject() { + countRejection(); + } + } +}