This is an automated email from the ASF dual-hosted git repository. elecharny pushed a commit to branch 2.0.X in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.0.X by this push: new 4de9c94 Backported the fix for DIRMINA-1156 4de9c94 is described below commit 4de9c942c8e34ef864d02ce6568dc23978548862 Author: emmanuel lecharny <elecha...@apache.org> AuthorDate: Wed Jan 12 23:47:01 2022 +0100 Backported the fix for DIRMINA-1156 --- .../filter/executor/OrderedThreadPoolExecutor.java | 15 +- .../executor/PriorityThreadPoolExecutor.java | 44 +- .../executor/UnorderedThreadPoolExecutor.java | 20 +- .../executor/OrderedThreadPoolExecutorTest.java | 162 +++++++ .../executor/PriorityThreadPoolExecutorTest.java | 424 +++++++++++++++++++ .../executor/UnorderedThreadPoolExecutorTest.java | 162 +++++++ .../org/apache/mina/handler/DIRMINA1156Test.java | 465 +++++++++++++++++++++ 7 files changed, 1258 insertions(+), 34 deletions(-) diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java index 7378956..841a584 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java @@ -255,12 +255,13 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { Worker worker = new Worker(); Thread thread = getThreadFactory().newThread(worker); + workers.add(worker); + // As we have added a new thread, it's considered as idle. idleWorkers.incrementAndGet(); // Now, we can start it. thread.start(); - workers.add(worker); if (workers.size() > largestPoolSize) { largestPoolSize = workers.size(); @@ -681,8 +682,6 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { if (session == null) { synchronized (workers) { if (workers.size() > getCorePoolSize()) { - // Remove now to prevent duplicate exit. 
- workers.remove(this); break; } } @@ -692,13 +691,11 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor { break; } - try { - if (session != null) { - runTasks(getSessionTasksQueue(session)); - } - } finally { - idleWorkers.incrementAndGet(); + if (session != null) { + runTasks(getSessionTasksQueue(session)); } + + idleWorkers.incrementAndGet(); } } finally { synchronized (workers) { diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java index 721005c..43ace10 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java @@ -19,15 +19,32 @@ */ package org.apache.mina.filter.executor; -import org.apache.mina.core.session.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.mina.core.session.AttributeKey; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IoEvent; +import org.apache.mina.core.session.IoSession; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + /** * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s * within a session (similar to {@link OrderedThreadPoolExecutor}) and allows @@ -304,12 +321,13 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { Worker worker = new Worker(); Thread thread = getThreadFactory().newThread(worker); + workers.add(worker); + // As we have added a new thread, it's considered as idle. idleWorkers.incrementAndGet(); // Now, we can start it. thread.start(); - workers.add(worker); if (workers.size() > largestPoolSize) { largestPoolSize = workers.size(); @@ -730,8 +748,6 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { if (session == null) { synchronized (workers) { if (workers.size() > getCorePoolSize()) { - // Remove now to prevent duplicate exit. - workers.remove(this); break; } } @@ -741,13 +757,11 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { break; } - try { - if (session != null) { - runTasks(getSessionTasksQueue(session)); - } - } finally { - idleWorkers.incrementAndGet(); + if (session != null) { + runTasks(getSessionTasksQueue(session)); } + + idleWorkers.incrementAndGet(); } } finally { synchronized (workers) { diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java index 3136492..813f857 100644 --- a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java @@ -198,9 +198,13 @@ public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor { Worker worker = new Worker(); Thread thread = getThreadFactory().newThread(worker); + workers.add(worker); + + // As we have added a new thread, it's considered as idle. idleWorkers.incrementAndGet(); + + // Now, we can start it. 
thread.start(); - workers.add(worker); if (workers.size() > largestPoolSize) { largestPoolSize = workers.size(); @@ -476,8 +480,6 @@ public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor { if (task == null) { synchronized (workers) { if (workers.size() > corePoolSize) { - // Remove now to prevent duplicate exit. - workers.remove(this); break; } } @@ -487,14 +489,12 @@ public class UnorderedThreadPoolExecutor extends ThreadPoolExecutor { break; } - try { - if (task != null) { - queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task); - runTask(task); - } - } finally { - idleWorkers.incrementAndGet(); + if (task != null) { + queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task); + runTask(task); } + + idleWorkers.incrementAndGet(); } } finally { synchronized (workers) { diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java new file mode 100644 index 0000000..de6c9d9 --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.executor; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.junit.Test; + +/** + * Tests that verify the functionality provided by the implementation of + * {@link OrderedThreadPoolExecutor}. + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + */ +public class OrderedThreadPoolExecutorTest +{ + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after a RuntimeException is thrown when the {@link OrderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the + * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify + * Worker's behavior when a RuntimeException is thrown during execution occurs (even if that RuntimeException cannot + * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is + * provided in {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. 
+ * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testRuntimeExceptionInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + executor.shutdown(); + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. 
+ if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after an Error is thrown when the {@link OrderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the Error that + * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an + * Error is thrown during execution (even if that Error cannot occur in the way that this test simulates it). + * A test that implements the execution in a more realistic manner is provided in + * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testErrorInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new Error("An Error thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Ensure that the task has been executed in the executor. 
+ executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Empty implementation of IoFilter.NextFilterAdapter, intended to facilitate easy subclassing. 
+ */ + private abstract static class NextFilterAdapter implements IoFilter.NextFilter { + public void sessionOpened(IoSession session) {} + public void sessionClosed(IoSession session) {} + public void sessionIdle(IoSession session, IdleStatus status) {} + public void exceptionCaught(IoSession session, Throwable cause) {} + public void inputClosed(IoSession session) {} + public void messageReceived(IoSession session, Object message) {} + public void messageSent(IoSession session, WriteRequest writeRequest) {} + public void filterWrite(IoSession session, WriteRequest writeRequest) {} + public void filterClose(IoSession session) {} + public void sessionCreated(IoSession session) {} + } +} diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java new file mode 100644 index 0000000..97c97cd --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java @@ -0,0 +1,424 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.mina.filter.executor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Tests that verify the functionality provided by the implementation of + * {@link PriorityThreadPoolExecutor}. + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + */ +public class PriorityThreadPoolExecutorTest { + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . + * + * This test asserts that, without a provided comparator, entries are + * considered equal, when they reference the same session. + */ + @Test + public void fifoEntryTestNoComparatorSameSession() throws Exception { + // Set up fixture. + IoSession session = new DummySession(); + PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. + assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result); + } + + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . 
+ * + * This test asserts that, without a provided comparator, the first entry + * created is 'less than' an entry that is created later. + */ + @Test + public void fifoEntryTestNoComparatorDifferentSession() throws Exception { + // Set up fixture (the order in which the entries are created is + // relevant here!) + PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. + assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0); + } + + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . + * + * This test asserts that, with a provided comparator, entries are + * considered equal, when they reference the same session (the provided + * comparator is ignored). + */ + @Test + public void fifoEntryTestWithComparatorSameSession() throws Exception { + // Set up fixture. + IoSession session = new DummySession(); + final int predeterminedResult = 3853; + + Comparator<IoSession> comparator = new Comparator<IoSession>() { + @Override + public int compare(IoSession o1, IoSession o2) { + return predeterminedResult; + } + }; + + PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. 
+ assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result); + } + + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . + * + * This test asserts that a provided comparator is used instead of the + * (fallback) default behavior (when entries are referring different + * sessions). + */ + @Test + public void fifoEntryTestComparatorDifferentSession() throws Exception { + // Set up fixture (the order in which the entries are created is + // relevant here!) + final int predeterminedResult = 3853; + + Comparator<IoSession> comparator = new Comparator<IoSession>() { + @Override + public int compare(IoSession o1, IoSession o2) { + return predeterminedResult; + } + }; + + PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); + PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); + + // Execute system under test. + int result = first.compareTo(last); + + // Verify results. + assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result); + } + + /** + * Asserts that, when enough work is being submitted to the executor for it + * to start queuing work, prioritisation of work starts to occur. + * + * This implementation starts a number of sessions, and evenly distributes a + * number of messages to them. Processing each message is artificially made + * 'expensive', while the executor pool is kept small. This causes work to + * be queued in the executor. + * + * The executor that is used is configured to prefer one specific session. + * Each session records the timestamp of its last activity. 
After all work + * has been processed, the test asserts that the last activity of all + * sessions was later than the last activity of the preferred session. + */ + @Test + @Ignore("This test faiuls randomly") + public void testPrioritisation() throws Throwable { + // Set up fixture. + MockWorkFilter nextFilter = new MockWorkFilter(); + List<LastActivityTracker> sessions = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + sessions.add(new LastActivityTracker()); + } + + LastActivityTracker preferredSession = sessions.get(4); // prefer an arbitrary session + // (but not the first or last + // session, for good measure). + Comparator<IoSession> comparator = new UnfairComparator(preferredSession); + int maximumPoolSize = 1; // keep this low, to force resource contention. + int amountOfTasks = 400; + + ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator); + ExecutorFilter filter = new ExecutorFilter(executor); + + // Execute system under test. + for (int i = 0; i < amountOfTasks; i++) { + int sessionIndex = i % sessions.size(); + + LastActivityTracker currentSession = sessions.get(sessionIndex); + filter.messageReceived(nextFilter, currentSession, null); + + if (nextFilter.throwable != null) { + throw nextFilter.throwable; + } + } + + executor.shutdown(); + + // Verify results. + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + + for (LastActivityTracker session : sessions) { + if (session != preferredSession) { + assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).", + session.lastActivity > preferredSession.lastActivity); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after a RuntimeException is thrown when the {@link PriorityThreadPoolExecutor.Worker} is running. 
+ * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the + * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify + * Worker's behavior when a RuntimeException is thrown during execution (even if that RuntimeException cannot + * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is + * provided in {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testRuntimeExceptionInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + executor.shutdown(); + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. 
+ final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after an Error is thrown when the {@link PriorityThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the Error that + * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an + * Error is thrown during execution (even if that Error cannot occur in the way that this test simulates it). + * A test that implements the execution in a more realistic manner is provided in + * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}. + * + * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testErrorInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. 
+ PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new PriorityThreadPoolExecutorTest.NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new Error("An Error thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Ensure that the task has been executed in the executor. + executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * A comparator that prefers a particular session. 
+ */ + private static class UnfairComparator implements Comparator<IoSession> { + private IoSession preferred; + + public UnfairComparator(IoSession preferred) { + this.preferred = preferred; + } + + @Override + public int compare(IoSession o1, IoSession o2) { + if (o1 == preferred) { + return -1; + } + + if (o2 == preferred) { + return 1; + } + + return 0; + } + } + + /** + * A session that tracks the timestamp of last activity. + */ + private static class LastActivityTracker extends DummySession { + long lastActivity = System.currentTimeMillis(); + + public synchronized void setLastActivity() { + lastActivity = System.currentTimeMillis(); + } + } + + /** + * A filter that simulates a non-negligible amount of work. + */ + private static class MockWorkFilter implements IoFilter.NextFilter { + Throwable throwable; + + public void sessionOpened(IoSession session) { + // Do nothing + } + + public void sessionClosed(IoSession session) { + // Do nothing + } + + public void sessionIdle(IoSession session, IdleStatus status) { + // Do nothing + } + + public void exceptionCaught(IoSession session, Throwable cause) { + // Do nothing + } + + public void inputClosed(IoSession session) { + // Do nothing + } + + public void messageReceived(IoSession session, Object message) { + try { + Thread.sleep(20); // mimic work. + ((LastActivityTracker) session).setLastActivity(); + } catch (Exception e) { + if (this.throwable == null) { + this.throwable = e; + } + } + } + + public void messageSent(IoSession session, WriteRequest writeRequest) { + // Do nothing + } + + public void filterWrite(IoSession session, WriteRequest writeRequest) { + // Do nothing + } + + public void filterClose(IoSession session) { + // Do nothing + } + + public void sessionCreated(IoSession session) { + // Do nothing + } + } + + /** + * Empty implementation of IoFilter.NextFilterAdapter, intended to facilitate easy subclassing. 
+ */ + private abstract static class NextFilterAdapter implements IoFilter.NextFilter { + public void sessionOpened(IoSession session) {} + public void sessionClosed(IoSession session) {} + public void sessionIdle(IoSession session, IdleStatus status) {} + public void exceptionCaught(IoSession session, Throwable cause) {} + public void inputClosed(IoSession session) {} + public void messageReceived(IoSession session, Object message) {} + public void messageSent(IoSession session, WriteRequest writeRequest) {} + public void filterWrite(IoSession session, WriteRequest writeRequest) {} + public void filterClose(IoSession session) {} + public void sessionCreated(IoSession session) {} + } +} diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java new file mode 100644 index 0000000..22adcea --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.mina.filter.executor; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.junit.Test; + +/** + * Tests that verify the functionality provided by the implementation of + * {@link UnorderedThreadPoolExecutor}. + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + */ +public class UnorderedThreadPoolExecutorTest +{ + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after a RuntimeException is thrown when the {@link UnorderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain implementations would catch the + * RuntimeException that is being used in the implementation of this test. The purpose of this test is to verify + * Worker's behavior when a RuntimeException is thrown during execution (even if that RuntimeException cannot + * occur in the way that this test simulates it). A test that implements the execution in a more realistic manner is + * provided in {@link org.apache.mina.handler.DIRMINA1156Test}. + * + * @see org.apache.mina.handler.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testRuntimeExceptionInWorkerRun() throws Throwable + { + // Set up test fixture.
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + executor.shutdown(); + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after an Error is thrown when the {@link UnorderedThreadPoolExecutor.Worker} is running. + * + * Note that the implementation of this test is <em>not representative</em> of how tasks are normally executed, as + * tasks would ordinarily be 'wrapped' in a FilterChain. 
Most FilterChain implementations would catch the Error that + * is being used in the implementation of this test. The purpose of this test is to verify Worker's behavior when an + * Error is thrown during execution (even if that Error cannot occur in the way that this test simulates it). + * A test that implements the execution in a more realistic manner is provided in + * {@link org.apache.mina.handler.DIRMINA1156Test}. + * + * @see org.apache.mina.handler.DIRMINA1156Test + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">Issue DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a> + */ + @Test + public void testErrorInWorkerRun() throws Throwable + { + // Set up test fixture. + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + IoFilter.NextFilter nextFilter = new NextFilterAdapter() { + @Override + public void messageReceived(IoSession session, Object message) { + throw new Error("An Error thrown during unit testing."); + } + }; + DummySession session = new DummySession(); + ExecutorFilter filter = new ExecutorFilter(executor); + + try { + // Execute system under test. + filter.messageReceived(nextFilter, session, null); + + // Ensure that the task has been executed in the executor. + executor.shutdown(); // Shutting down and awaiting termination ensures that test execution blocks until Worker execution has happened. + if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) { + throw new IllegalStateException("Bug in test implementation."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing.
+ idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Empty implementation of {@link IoFilter.NextFilter}, intended to facilitate easy subclassing. + */ + private abstract static class NextFilterAdapter implements IoFilter.NextFilter { + public void sessionOpened(IoSession session) {} + public void sessionClosed(IoSession session) {} + public void sessionIdle(IoSession session, IdleStatus status) {} + public void exceptionCaught(IoSession session, Throwable cause) {} + public void inputClosed(IoSession session) {} + public void messageReceived(IoSession session, Object message) {} + public void messageSent(IoSession session, WriteRequest writeRequest) {} + public void filterWrite(IoSession session, WriteRequest writeRequest) {} + public void filterClose(IoSession session) {} + public void sessionCreated(IoSession session) {} + } +} diff --git a/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java new file mode 100644 index 0000000..40b7073 --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.handler; + +import static org.junit.Assert.assertEquals; + +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.mina.core.filterchain.IoFilterChain; +import org.apache.mina.core.service.IoHandlerAdapter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.filter.executor.ExecutorFilter; +import org.apache.mina.filter.executor.OrderedThreadPoolExecutor; +import org.apache.mina.filter.executor.PriorityThreadPoolExecutor; +import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor; +import org.junit.Test; + +/** + * Tests that reproduce the bug described in issue DIRMINA-1156. + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1156">DIRMINA-1156</a> + */ +public class DIRMINA1156Test +{ + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after an {@link Error} is thrown by a session's handler that was invoked through an OrderedThreadPoolExecutor. + */ + @Test + public void testOrderedThreadPoolExecutorSessionHandlerThrowingError() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all).
+ int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Error("An Error thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after a {@link RuntimeException} is thrown by a session's handler that was invoked through an + * OrderedThreadPoolExecutor. 
+ */ + @Test + public void testOrderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. 
+ if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and {@link OrderedThreadPoolExecutor#workers} + * after a (checked) {@link Exception} is thrown by a session's handler that was invoked through an + * OrderedThreadPoolExecutor. + */ + @Test + public void testOrderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + OrderedThreadPoolExecutor executor = new OrderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Exception("A (checked) Exception thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. 
+ idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after an {@link Error} is thrown by a session's handler that was invoked through an UnorderedThreadPoolExecutor. + */ + @Test + public void testUnorderedThreadPoolExecutorSessionHandlerThrowingError() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Error("An Error thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. 
+ final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after a {@link RuntimeException} is thrown by a session's handler that was invoked through an + * UnorderedThreadPoolExecutor. + */ + @Test + public void testUnorderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. 
+ executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and {@link UnorderedThreadPoolExecutor#workers} + * after a (checked) {@link Exception} is thrown by a session's handler that was invoked through an + * UnorderedThreadPoolExecutor. + */ + @Test + public void testUnorderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. 
+ DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + UnorderedThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Exception("A (checked) Exception thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after an {@link Error} is thrown by a session's handler that was invoked through a PriorityThreadPoolExecutor. + */ + @Test + public void testPriorityThreadPoolExecutorSessionHandlerThrowingError() throws Exception + { + // Set up test fixture.
+ final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Error("An Error thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. 
+ if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after a {@link RuntimeException} is thrown by a session's handler that was invoked through a + * PriorityThreadPoolExecutor. + */ + @Test + public void testPriorityThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new RuntimeException("A RuntimeException thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened. + executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing.
+ idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } + + /** + * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and {@link PriorityThreadPoolExecutor#workers} + * after a (checked) {@link Exception} is thrown by a session's handler that was invoked through a + * PriorityThreadPoolExecutor. + */ + @Test + public void testPriorityThreadPoolExecutorSessionHandlerThrowingCheckedException() throws Exception + { + // Set up test fixture. + final boolean[] filterTriggered = {false}; // Used to verify the implementation of this test (to see if the Handler is invoked at all). + int corePoolSize = 1; // Prevent an idle worker from being cleaned up, which would skew the results of this test. + DummySession session = new DummySession(); + IoFilterChain chain = session.getFilterChain(); + PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(corePoolSize,1); + chain.addLast("executor", new ExecutorFilter(executor)); + session.setHandler( new IoHandlerAdapter() { + @Override + public void messageReceived(IoSession session, Object message) throws Exception { + filterTriggered[0] = true; + throw new Exception("A (checked) Exception thrown during unit testing."); + } + }); + + // Execute system under test. + try { + chain.fireMessageReceived("foo"); + + // Shutting down and awaiting termination ensures that test execution blocks until Handler invocation has happened.
+ executor.shutdown(); + executor.awaitTermination(10, TimeUnit.SECONDS); + if (!filterTriggered[0]) { + throw new IllegalStateException("Bug in test implementation: the session handler was never invoked."); + } + + // Verify results. + final Field idleWorkersField = PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using reflection as the field is not accessible. It might be nicer to make the field package-protected for testing. + idleWorkersField.setAccessible(true); + final AtomicInteger idleWorkers = (AtomicInteger) idleWorkersField.get(executor); + assertEquals("After all tasks have finished, the amount of workers that are idle should equal the amount of workers, but did not.", executor.getPoolSize(), idleWorkers.get()); + } finally { + // Clean up test fixture. + if (!executor.isShutdown()) { + executor.shutdownNow(); + } + } + } +}