This is an automated email from the ASF dual-hosted git repository. elecharny pushed a commit to branch 2.1.X in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.1.X by this push: new e495790 o Fixed the worker run() method to properly manage the idle count o Added Guss' tests for DIRMINA-1156 e495790 is described below commit e49579069e49d733d8f75bc67b2e16311b619ffe Author: emmanuel lecharny <elecha...@apache.org> AuthorDate: Wed Jan 12 22:03:26 2022 +0100 o Fixed the worker run() method to properly manage the idle count o Added Guss' tests for DIRMINA-1156 --- .../filter/executor/OrderedThreadPoolExecutor.java | 2 - .../executor/PriorityThreadPoolExecutor.java | 44 +- .../executor/UnorderedThreadPoolExecutor.java | 21 +- .../executor/OrderedThreadPoolExecutorTest.java | 166 ++++++++ .../executor/PriorityThreadPoolExecutorTest.java | 143 ++++++- .../executor/UnorderedThreadPoolExecutorTest.java | 166 ++++++++ .../org/apache/mina/handler/DIRMINA1156Test.java | 465 +++++++++++++++++++++ 7 files changed, 971 insertions(+), 36 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java index c800864..4b42986 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java @@ -682,8 +682,6 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { if (session == null) { synchronized (workers) { if (workers.size() > getCorePoolSize()) { - // Remove now to prevent duplicate exit. - // workers.remove(this); break; } } diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java index 721005c..43ace10 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java @@ -19,15 +19,32 @@ */ package org.apache.mina.filter.executor; -import org.apache.mina.core.session.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.mina.core.session.AttributeKey; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IoEvent; +import org.apache.mina.core.session.IoSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s * within a session (similar to {@link OrderedThreadPoolExecutor}) and allows @@ -304,12 +321,13 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { Worker worker = new Worker(); Thread thread = getThreadFactory().newThread(worker); + workers.add(worker); + // As we have added a new thread, it's considered as idle. idleWorkers.incrementAndGet(); // Now, we can start it. thread.start(); - workers.add(worker); if (workers.size() > largestPoolSize) { largestPoolSize = workers.size(); @@ -730,8 +748,6 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { if (session == null) { synchronized (workers) { if (workers.size() > getCorePoolSize()) { - // Remove now to prevent duplicate exit. - workers.remove(this); break; } } @@ -741,13 +757,11 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { break; } - try { - if (session != null) { - runTasks(getSessionTasksQueue(session)); - } - } finally { - idleWorkers.incrementAndGet(); + if (session != null) { + runTasks(getSessionTasksQueue(session)); } + + idleWorkers.incrementAndGet(); } } finally { synchronized (workers) { diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java index 3136492..570963d 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java @@ -198,9 +198,14 @@ public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor { Worker worker = new Worker(); Thread thread = getThreadFactory().newThread(worker); + + workers.add(worker); + + // As we have added a new thread, it's considered as idle. idleWorkers.incrementAndGet(); + + // Now, we can start it. thread.start(); - workers.add(worker); if (workers.size() > largestPoolSize) { largestPoolSize = workers.size(); @@ -476,8 +481,6 @@ public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor { if (task == null) { synchronized (workers) { if (workers.size() > corePoolSize) { - // Remove now to prevent duplicate exit. - workers.remove(this); break; } } @@ -487,14 +490,12 @@ public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor { break; } - try { - if (task != null) { - queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task); - runTask(task); - } - } finally { - idleWorkers.incrementAndGet(); + if (task != null) { + queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task); + runTask(task); } + + idleWorkers.incrementAndGet(); } } finally { synchronized (workers) { diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java new file mode 100644 index 0000000..729e0ee --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.executor; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.apache.mina.filter.FilterEvent; +import org.junit.Test; + +/** + * Tests that verify the functionality provided by the implementation of + * {@link OrderedThreadPoolExecutor}. + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + */ +public class OrderedThreadPoolExecutorTest +{ + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after a RuntimeException is thrown when the {@link OrderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the + * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify + * Worker's behavior when a RuntimeException is thrown during execution occurs (even if that RuntimeException cannot + * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is + * provided in {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testRuntimeExceptionInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + executor.shutdown(); + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after an Error is thrown when the {@link OrderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the Error that + * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an + * Error is thrown during execution occurs (even if that Error cannot occur in the way that this test simulates it). + * A test that implements the execution in a more realistic manner is provided in + * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testErrorInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new Error("An Error thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Ensure that the task has been executed in the executor. + executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Empty implementation of IoFilter.NextFilterAdapter, intended to facilitate easy subclassing. + */ + private abstract static class NextFilterAdapter implements IoFilter.NextFilter { + public void sessionOpened(IoSession session) {} + public void sessionClosed(IoSession session) {} + public void sessionIdle(IoSession session, IdleStatus status) {} + public void exceptionCaught(IoSession session, Throwable cause) {} + public void inputClosed(IoSession session) {} + public void messageReceived(IoSession session, Object message) {} + public void messageSent(IoSession session, WriteRequest writeRequest) {} + public void filterWrite(IoSession session, WriteRequest writeRequest) {} + public void filterClose(IoSession session) {} + public void sessionCreated(IoSession session) {} + + @Override + public void event(IoSession session, FilterEvent event) {} + } +} diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java index 87b48ea..8af367f 100644 --- a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java @@ -19,6 +19,17 @@ */ package org.apache.mina.filter.executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.core.session.DummySession; import org.apache.mina.core.session.IdleStatus; @@ -28,15 +39,6 @@ import org.apache.mina.filter.FilterEvent; import org.junit.Ignore; import org.junit.Test; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - /** * Tests that verify the functionality provided by the implementation of * {@link PriorityThreadPoolExecutor}. @@ -213,6 +215,110 @@ public class PriorityThreadPoolExecutorTest { } /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after a RuntimeException is thrown when the {@link PriorityThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the + * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify + * Worker's behavior when a RuntimeException is thrown during execution occurs (even if that RuntimeException cannot + * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is + * provided in {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testRuntimeExceptionInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + executor.shutdown(); + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after an Error is thrown when the {@link PriorityThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the Error that + * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an + * Error is thrown during execution occurs (even if that Error cannot occur in the way that this test simulates it). + * A test that implements the execution in a more realistic manner is provided in + * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testErrorInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new Error("An Error thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Ensure that the task has been executed in the executor. + executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** * A comparator that prefers a particular session. */ private static class UnfairComparator implements Comparator<IoSession> { @@ -305,4 +411,23 @@ public class PriorityThreadPoolExecutorTest { // TODO Auto-generated method stub } } + + /** + * Empty implementation of IoFilter.NextFilterAdapter, intended to facilitate easy subclassing. + */ + private abstract static class NextFilterAdapter implements IoFilter.NextFilter { + public void sessionOpened(IoSession session) {} + public void sessionClosed(IoSession session) {} + public void sessionIdle(IoSession session, IdleStatus status) {} + public void exceptionCaught(IoSession session, Throwable cause) {} + public void inputClosed(IoSession session) {} + public void messageReceived(IoSession session, Object message) {} + public void messageSent(IoSession session, WriteRequest writeRequest) {} + public void filterWrite(IoSession session, WriteRequest writeRequest) {} + public void filterClose(IoSession session) {} + public void sessionCreated(IoSession session) {} + + @Override + public void event(IoSession session, FilterEvent event) {} + } } diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java new file mode 100644 index 0000000..28d06c5 --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.executor; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.apache.mina.filter.FilterEvent; +import org.junit.Test; + +/** + * Tests that verify the functionality provided by the implementation of + * {@link UnorderedThreadPoolExecutor}. + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + */ +public class UnorderedThreadPoolExecutorTest +{ + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after a RuntimeException is thrown when the {@link UnorderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the + * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify + * Worker's behavior when a RuntimeException is thrown during execution occurs (even if that RuntimeException cannot + * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is + * provided in {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testRuntimeExceptionInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + executor.shutdown(); + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after an Error is thrown when the {@link UnorderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the Error that + * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an + * Error is thrown during execution occurs (even if that Error cannot occur in the way that this test simulates it). + * A test that implements the execution in a more realistic manner is provided in + * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testErrorInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new Error("An Error thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Ensure that the task has been executed in the executor. + executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Empty implementation of IoFilter.NextFilterAdapter, intended to facilitate easy subclassing. + */ + private abstract static class NextFilterAdapter implements IoFilter.NextFilter { + public void sessionOpened(IoSession session) {} + public void sessionClosed(IoSession session) {} + public void sessionIdle(IoSession session, IdleStatus status) {} + public void exceptionCaught(IoSession session, Throwable cause) {} + public void inputClosed(IoSession session) {} + public void messageReceived(IoSession session, Object message) {} + public void messageSent(IoSession session, WriteRequest writeRequest) {} + public void filterWrite(IoSession session, WriteRequest writeRequest) {} + public void filterClose(IoSession session) {} + public void sessionCreated(IoSession session) {} + + @Override + public void event(IoSession session, FilterEvent event) {} + } +} diff --git a/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java new file mode 100644 index 0000000..40b7073 --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.handler; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.filterchain.IoFilterChain; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.filter.executor.OrderedThreadPoolExecutor; +import org.apache.mina.filter.executor.PriorityThreadPoolExecutor; +import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor; +import org.junit.Test; + +/** + * Tests that reproduces a bug as described in issue DIRMINA-1156 + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">DIRMINA-1156</a> + */ +public class DIRMINA1156Test +{ + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after an {@link Error} is thrown by a session's handler that was invoked through an OrderedThreadPoolExecutor. + */ + @Test + public void testOrderedThreadPoolExecutorSessionHandlerThrowingError() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Error("An Error thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after a {@link RuntimeException} is thrown by a session's handler that was invoked through an + * OrderedThreadPoolExecutor. + */ + @Test + public void testOrderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after a (checked) {@link Exception} is thrown by a session's handler that was invoked through an + * OrderedThreadPoolExecutor. + */ + @Test + public void testOrderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Exception("A (checked) Exception thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after an {@link Error} is thrown by a session's handler that was invoked through an UnorderedThreadPoolExecutor. + */ + @Test + public void testUnorderedThreadPoolExecutorSessionHandlerThrowingError() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Error("An Error thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after a {@link RuntimeException} is thrown by a session's handler that was invoked through an + * UnorderedThreadPoolExecutor. + */ + @Test + public void testUnorderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after a (checked) {@link Exception} is thrown by a session's handler that was invoked through an + * UnorderedThreadPoolExecutor. + */ + @Test + public void testUnorderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Exception("A (checked) Exception thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after an {@link Error} is thrown by a session's handler that was invoked through an PriorityThreadPoolExecutor. + */ + @Test + public void testPriorityThreadPoolExecutorSessionHandlerThrowingError() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Error("An Error thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after a {@link RuntimeException} is thrown by a session's handler that was invoked through an + * PriorityThreadPoolExecutor. + */ + @Test + public void testPriorityThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after a (checked) {@link Exception} is thrown by a session's handler that was invoked through an + * PriorityThreadPoolExecutor. + */ + @Test + public void testPriorityThreadPoolExecutorSessionHandlerThrowingCheckedException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Exception("A (checked) Exception thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } +}