This is an automated email from the ASF dual-hosted git repository.

elecharny pushed a commit to branch 2.0.X
in repository https://gitbox.apache.org/repos/asf/mina.git


The following commit(s) were added to refs/heads/2.0.X by this push:
     new 4de9c94  Backported the fix for DIRMINA-1156
4de9c94 is described below

commit 4de9c942c8e34ef864d02ce6568dc23978548862
Author: emmanuel lecharny <elecha...@apache.org>
AuthorDate: Wed Jan 12 23:47:01 2022 +0100

    Backported the fix for DIRMINA-1156
---
 .../filter/executor/OrderedThreadPoolExecutor.java |  15 +-
 .../executor/PriorityThreadPoolExecutor.java       |  44 +-
 .../executor/UnorderedThreadPoolExecutor.java      |  20 +-
 .../executor/OrderedThreadPoolExecutorTest.java    | 162 +++++++
 .../executor/PriorityThreadPoolExecutorTest.java   | 424 +++++++++++++++++++
 .../executor/UnorderedThreadPoolExecutorTest.java  | 162 +++++++
 .../org/apache/mina/handler/DIRMINA1156Test.java   | 465 +++++++++++++++++++++
 7 files changed, 1258 insertions(+), 34 deletions(-)

diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
 
b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
index 7378956..841a584 100644
--- 
a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
+++ 
b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
@@ -255,12 +255,13 @@ public class OrderedThreadPoolExecutor extends 
ThreadPoolExecutor {
             Worker worker = new Worker();
             Thread thread = getThreadFactory().newThread(worker);
 
+            workers.add(worker);
+
             // As we have added a new thread, it's considered as idle.
             idleWorkers.incrementAndGet();
 
             // Now, we can start it.
             thread.start();
-            workers.add(worker);
 
             if (workers.size() > largestPoolSize) {
                 largestPoolSize = workers.size();
@@ -681,8 +682,6 @@ public class OrderedThreadPoolExecutor extends 
ThreadPoolExecutor {
                     if (session == null) {
                         synchronized (workers) {
                             if (workers.size() > getCorePoolSize()) {
-                                // Remove now to prevent duplicate exit.
-                                workers.remove(this);
                                 break;
                             }
                         }
@@ -692,13 +691,11 @@ public class OrderedThreadPoolExecutor extends 
ThreadPoolExecutor {
                         break;
                     }
 
-                    try {
-                        if (session != null) {
-                            runTasks(getSessionTasksQueue(session));
-                        }
-                    } finally {
-                        idleWorkers.incrementAndGet();
+                    if (session != null) {
+                        runTasks(getSessionTasksQueue(session));
                     }
+
+                    idleWorkers.incrementAndGet();
                 }
             } finally {
                 synchronized (workers) {
diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
 
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
index 721005c..43ace10 100644
--- 
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
+++ 
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
@@ -19,15 +19,32 @@
  */
 package org.apache.mina.filter.executor;
 
-import org.apache.mina.core.session.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.mina.core.session.AttributeKey;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoEvent;
+import org.apache.mina.core.session.IoSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s
  * within a session (similar to {@link OrderedThreadPoolExecutor}) and allows
@@ -304,12 +321,13 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
             Worker worker = new Worker();
             Thread thread = getThreadFactory().newThread(worker);
 
+            workers.add(worker);
+
             // As we have added a new thread, it's considered as idle.
             idleWorkers.incrementAndGet();
 
             // Now, we can start it.
             thread.start();
-            workers.add(worker);
 
             if (workers.size() > largestPoolSize) {
                 largestPoolSize = workers.size();
@@ -730,8 +748,6 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
                     if (session == null) {
                         synchronized (workers) {
                             if (workers.size() > getCorePoolSize()) {
-                                // Remove now to prevent duplicate exit.
-                                workers.remove(this);
                                 break;
                             }
                         }
@@ -741,13 +757,11 @@ public class PriorityThreadPoolExecutor extends 
ThreadPoolExecutor {
                         break;
                     }
 
-                    try {
-                        if (session != null) {
-                            runTasks(getSessionTasksQueue(session));
-                        }
-                    } finally {
-                        idleWorkers.incrementAndGet();
+                    if (session != null) {
+                        runTasks(getSessionTasksQueue(session));
                     }
+
+                    idleWorkers.incrementAndGet();
                 }
             } finally {
                 synchronized (workers) {
diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
 
b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
index 3136492..813f857 100644
--- 
a/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
+++ 
b/mina-core/src/main/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutor.java
@@ -198,9 +198,13 @@ public class UnorderedThreadPoolExecutor extends 
ThreadPoolExecutor {
 
             Worker worker = new Worker();
             Thread thread = getThreadFactory().newThread(worker);
+            workers.add(worker);
+
+            // As we have added a new thread, it's considered as idle.
             idleWorkers.incrementAndGet();
+
+            // Now, we can start it.
             thread.start();
-            workers.add(worker);
 
             if (workers.size() > largestPoolSize) {
                 largestPoolSize = workers.size();
@@ -476,8 +480,6 @@ public class UnorderedThreadPoolExecutor extends 
ThreadPoolExecutor {
                     if (task == null) {
                         synchronized (workers) {
                             if (workers.size() > corePoolSize) {
-                                // Remove now to prevent duplicate exit.
-                                workers.remove(this);
                                 break;
                             }
                         }
@@ -487,14 +489,12 @@ public class UnorderedThreadPoolExecutor extends 
ThreadPoolExecutor {
                         break;
                     }
 
-                    try {
-                        if (task != null) {
-                            
queueHandler.polled(UnorderedThreadPoolExecutor.this, (IoEvent) task);
-                            runTask(task);
-                        }
-                    } finally {
-                        idleWorkers.incrementAndGet();
+                    if (task != null) {
+                        queueHandler.polled(UnorderedThreadPoolExecutor.this, 
(IoEvent) task);
+                        runTask(task);
                     }
+
+                    idleWorkers.incrementAndGet();
                 }
             } finally {
                 synchronized (workers) {
diff --git 
a/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
 
b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
new file mode 100644
index 0000000..de6c9d9
--- /dev/null
+++ 
b/mina-core/src/test/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutorTest.java
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link OrderedThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, guus.der.kinde...@gmail.com
+ */
+public class OrderedThreadPoolExecutorTest
+{
+    /**
+     * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and 
{@link OrderedThreadPoolExecutor#workers}
+     * after a RuntimeException is thrown when the {@link 
OrderedThreadPoolExecutor.Worker} is running.
+     *
+     * Note that the implementation of this test is <em>not 
representative</em> of how tasks are normally executed, as
+     * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain 
implementations would catch the
+     * RuntimeException that is being used in the implementation of this test. 
The purpose of this test is to verify
+     * Worker's behavior when a RuntimeException is thrown during execution 
occurs (even if that RuntimeException cannot
+     * occur in the way that this test simulates it). A test that implements 
the execution in a more realistic manner is
+     * provided in {@link 
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+     *
+     * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+     * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132";>Issue 
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+     */
+    @Test
+    public void testRuntimeExceptionInWorkerRun() throws Throwable
+    {
+        // Set up test fixture.
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        OrderedThreadPoolExecutor executor = new 
OrderedThreadPoolExecutor(corePoolSize,1);
+        IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) {
+                throw new RuntimeException("A RuntimeException thrown during 
unit testing.");
+            }
+        };
+        DummySession session = new DummySession();
+        ExecutorFilter filter = new ExecutorFilter(executor);
+
+        try {
+            // Execute system under test.
+            filter.messageReceived(nextFilter, session, null);
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Worker execution has happened.
+            executor.shutdown();
+            if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Bug in test implementation.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and 
{@link OrderedThreadPoolExecutor#workers}
+     * after an Error is thrown when the {@link 
OrderedThreadPoolExecutor.Worker} is running.
+     *
+     * Note that the implementation of this test is <em>not 
representative</em> of how tasks are normally executed, as
+     * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain 
implementations would catch the Error that
+     * is being used in the implementation of this test. The purpose of this 
test is to verify Worker's behavior when an
+     * Error is thrown during execution occurs (even if that Error cannot 
occur in the way that this test simulates it).
+     * A test that implements the execution in a more realistic manner is 
provided in
+     * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+     *
+     * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+     * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132";>Issue 
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+     */
+    @Test
+    public void testErrorInWorkerRun() throws Throwable
+    {
+        // Set up test fixture.
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        OrderedThreadPoolExecutor executor = new 
OrderedThreadPoolExecutor(corePoolSize,1);
+        IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) {
+                throw new Error("An Error thrown during unit testing.");
+            }
+        };
+        DummySession session = new DummySession();
+        ExecutorFilter filter = new ExecutorFilter(executor);
+
+        try {
+            // Execute system under test.
+            filter.messageReceived(nextFilter, session, null);
+
+            // Ensure that the task has been executed in the executor.
+            executor.shutdown(); // Shutting down and awaiting termination 
ensures that test execution blocks until Worker execution has happened.
+            if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Bug in test implementation.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Empty implementation of IoFilter.NextFilterAdapter, intended to 
facilitate easy subclassing.
+     */
+    private abstract static class NextFilterAdapter implements 
IoFilter.NextFilter {
+        public void sessionOpened(IoSession session) {}
+        public void sessionClosed(IoSession session) {}
+        public void sessionIdle(IoSession session, IdleStatus status) {}
+        public void exceptionCaught(IoSession session, Throwable cause) {}
+        public void inputClosed(IoSession session) {}
+        public void messageReceived(IoSession session, Object message) {}
+        public void messageSent(IoSession session, WriteRequest writeRequest) 
{}
+        public void filterWrite(IoSession session, WriteRequest writeRequest) 
{}
+        public void filterClose(IoSession session) {}
+        public void sessionCreated(IoSession session) {}
+    }
+}
diff --git 
a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
 
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
new file mode 100644
index 0000000..97c97cd
--- /dev/null
+++ 
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
@@ -0,0 +1,424 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link PriorityThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, guus.der.kinde...@gmail.com
+ */
+public class PriorityThreadPoolExecutorTest {
+    /**
+     * Tests that verify the functionality provided by the implementation of
+     * {@link 
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+     * .
+     *
+     * This test asserts that, without a provided comparator, entries are
+     * considered equal, when they reference the same session.
+     */
+    @Test
+    public void fifoEntryTestNoComparatorSameSession() throws Exception {
+        // Set up fixture.
+        IoSession session = new DummySession();
+        PriorityThreadPoolExecutor.SessionEntry first = new 
PriorityThreadPoolExecutor.SessionEntry(session, null);
+        PriorityThreadPoolExecutor.SessionEntry last = new 
PriorityThreadPoolExecutor.SessionEntry(session, null);
+    
+        // Execute system under test.
+        int result = first.compareTo(last);
+    
+        // Verify results.
+        assertEquals("Without a comparator, entries of the same session are 
expected to be equal.", 0, result);
+    }
+
+    /**
+     * Tests that verify the functionality provided by the implementation of
+     * {@link 
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+     * .
+     *
+     * This test asserts that, without a provided comparator, the first entry
+     * created is 'less than' an entry that is created later.
+     */
+    @Test
+    public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
+        // Set up fixture (the order in which the entries are created is
+        // relevant here!)
+        PriorityThreadPoolExecutor.SessionEntry first = new 
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+        PriorityThreadPoolExecutor.SessionEntry last = new 
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+        
+        // Execute system under test.
+        int result = first.compareTo(last);
+        
+        // Verify results.
+        assertTrue("Without a comparator, the first entry created should be 
the first entry out. Expected a negative result, instead, got: " + result, 
result < 0);
+    }
+
+    /**
+     * Tests that verify the functionality provided by the implementation of
+     * {@link 
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+     * .
+     *
+     * This test asserts that, with a provided comparator, entries are
+     * considered equal, when they reference the same session (the provided
+     * comparator is ignored).
+     */
+    @Test
+    public void fifoEntryTestWithComparatorSameSession() throws Exception {
+        // Set up fixture.
+        IoSession session = new DummySession();
+        final int predeterminedResult = 3853;
+        
+        Comparator<IoSession> comparator = new Comparator<IoSession>() {
+            @Override
+            public int compare(IoSession o1, IoSession o2) {
+                return predeterminedResult;
+            }
+        };
+        
+        PriorityThreadPoolExecutor.SessionEntry first = new 
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
+        PriorityThreadPoolExecutor.SessionEntry last = new 
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
+        
+        // Execute system under test.
+        int result = first.compareTo(last);
+        
+        // Verify results.
+        assertEquals("With a comparator, entries of the same session are 
expected to be equal.", 0, result);
+    }
+
+    /**
+     * Tests that verify the functionality provided by the implementation of
+     * {@link 
org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry}
+     * .
+     *
+     * This test asserts that a provided comparator is used instead of the
+     * (fallback) default behavior (when entries are referring different
+     * sessions).
+     */
+    @Test
+    public void fifoEntryTestComparatorDifferentSession() throws Exception {
+        // Set up fixture (the order in which the entries are created is
+        // relevant here!)
+        final int predeterminedResult = 3853;
+        
+        Comparator<IoSession> comparator = new Comparator<IoSession>() {
+            @Override
+            public int compare(IoSession o1, IoSession o2) {
+                return predeterminedResult;
+            }
+        };
+        
+        PriorityThreadPoolExecutor.SessionEntry first = new 
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
+        PriorityThreadPoolExecutor.SessionEntry last = new 
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
+        
+        // Execute system under test.
+        int result = first.compareTo(last);
+        
+        // Verify results.
+        assertEquals("With a comparator, comparing entries of different 
sessions is expected to yield the comparator result.", predeterminedResult, 
result);
+    }
+
+    /**
+     * Asserts that, when enough work is being submitted to the executor for it
+     * to start queuing work, prioritisation of work starts to occur.
+     *
+     * This implementation starts a number of sessions, and evenly distributes 
a
+     * number of messages to them. Processing each message is artificially made
+     * 'expensive', while the executor pool is kept small. This causes work to
+     * be queued in the executor.
+     *
+     * The executor that is used is configured to prefer one specific session.
+     * Each session records the timestamp of its last activity. After all work
+     * has been processed, the test asserts that the last activity of all
+     * sessions was later than the last activity of the preferred session.
+     */
+    @Test
+    @Ignore("This test faiuls randomly")
+    public void testPrioritisation() throws Throwable {
+        // Set up fixture.
+        MockWorkFilter nextFilter = new MockWorkFilter();
+        List<LastActivityTracker> sessions = new ArrayList<>();
+        
+        for (int i = 0; i < 10; i++) {
+            sessions.add(new LastActivityTracker());
+        }
+        
+        LastActivityTracker preferredSession = sessions.get(4); // prefer an 
arbitrary session
+                                                                // (but not 
the first or last
+                                                                // session, 
for good measure).
+        Comparator<IoSession> comparator = new 
UnfairComparator(preferredSession);
+        int maximumPoolSize = 1; // keep this low, to force resource 
contention.
+        int amountOfTasks = 400;
+        
+        ExecutorService executor = new 
PriorityThreadPoolExecutor(maximumPoolSize, comparator);
+        ExecutorFilter filter = new ExecutorFilter(executor);
+        
+        // Execute system under test.
+        for (int i = 0; i < amountOfTasks; i++) {
+            int sessionIndex = i % sessions.size();
+            
+            LastActivityTracker currentSession = sessions.get(sessionIndex);
+            filter.messageReceived(nextFilter, currentSession, null);
+        
+            if (nextFilter.throwable != null) {
+                throw nextFilter.throwable;
+            }
+        }
+        
+        executor.shutdown();
+        
+        // Verify results.
+        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        
+        for (LastActivityTracker session : sessions) {
+            if (session != preferredSession) {
+                assertTrue("All other sessions should have finished later than 
the preferred session (but at least one did not).", 
+                    session.lastActivity > preferredSession.lastActivity);
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and 
{@link PriorityThreadPoolExecutor#workers}
+     * after a RuntimeException is thrown when the {@link 
PriorityThreadPoolExecutor.Worker} is running.
+     *
+     * Note that the implementation of this test is <em>not 
representative</em> of how tasks are normally executed, as
+     * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain 
implementations would catch the
+     * RuntimeException that is being used in the implementation of this test. 
The purpose of this test is to verify
+     * Worker's behavior when a RuntimeException is thrown during execution 
occurs (even if that RuntimeException cannot
+     * occur in the way that this test simulates it). A test that implements 
the execution in a more realistic manner is
+     * provided in {@link 
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+     *
+     * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+     * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132";>Issue 
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+     */
+    @Test
+    public void testRuntimeExceptionInWorkerRun() throws Throwable
+    {
+        // Set up test fixture.
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        PriorityThreadPoolExecutor executor = new 
PriorityThreadPoolExecutor(corePoolSize,1);
+        IoFilter.NextFilter nextFilter = new 
PriorityThreadPoolExecutorTest.NextFilterAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) {
+                throw new RuntimeException("A RuntimeException thrown during 
unit testing.");
+            }
+        };
+        DummySession session = new DummySession();
+        ExecutorFilter filter = new ExecutorFilter(executor);
+
+        try {
+            // Execute system under test.
+            filter.messageReceived(nextFilter, session, null);
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Worker execution has happened.
+            executor.shutdown();
+            if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Bug in test implementation.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and 
{@link PriorityThreadPoolExecutor#workers}
+     * after an Error is thrown when the {@link 
PriorityThreadPoolExecutor.Worker} is running.
+     *
+     * Note that the implementation of this test is <em>not 
representative</em> of how tasks are normally executed, as
+     * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain 
implementations would catch the Error that
+     * is being used in the implementation of this test. The purpose of this 
test is to verify Worker's behavior when an
+     * Error is thrown during execution occurs (even if that Error cannot 
occur in the way that this test simulates it).
+     * A test that implements the execution in a more realistic manner is 
provided in
+     * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+     *
+     * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+     * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132";>Issue 
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+     */
+    @Test
+    public void testErrorInWorkerRun() throws Throwable
+    {
+        // Set up test fixture.
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        PriorityThreadPoolExecutor executor = new 
PriorityThreadPoolExecutor(corePoolSize,1);
+        IoFilter.NextFilter nextFilter = new 
PriorityThreadPoolExecutorTest.NextFilterAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) {
+                throw new Error("An Error thrown during unit testing.");
+            }
+        };
+        DummySession session = new DummySession();
+        ExecutorFilter filter = new ExecutorFilter(executor);
+
+        try {
+            // Execute system under test.
+            filter.messageReceived(nextFilter, session, null);
+
+            // Ensure that the task has been executed in the executor.
+            executor.shutdown(); // Shutting down and awaiting termination 
ensures that test execution blocks until Worker execution has happened.
+            if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Bug in test implementation.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * A comparator that prefers a particular session.
+     */
+    private static class UnfairComparator implements Comparator<IoSession> {
+        private IoSession preferred;
+        
+        public UnfairComparator(IoSession preferred) {
+            this.preferred = preferred;
+        }
+        
+        @Override
+        public int compare(IoSession o1, IoSession o2) {
+            if (o1 == preferred) {
+                return -1;
+            }
+        
+            if (o2 == preferred) {
+                return 1;
+            }
+        
+            return 0;
+        }
+    }
+
+    /**
+     * A session that tracks the timestamp of last activity.
+     */
+    private static class LastActivityTracker extends DummySession {
+        long lastActivity = System.currentTimeMillis();
+
+        public synchronized void setLastActivity() {
+            lastActivity = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * A filter that simulates a non-negligible amount of work.
+     */
+    private static class MockWorkFilter implements IoFilter.NextFilter {
+        Throwable throwable;
+        
+        public void sessionOpened(IoSession session) {
+            // Do nothing
+        }
+        
+        public void sessionClosed(IoSession session) {
+            // Do nothing
+        }
+        
+        public void sessionIdle(IoSession session, IdleStatus status) {
+            // Do nothing
+        }
+        
+        public void exceptionCaught(IoSession session, Throwable cause) {
+            // Do nothing
+        }
+        
+        public void inputClosed(IoSession session) {
+            // Do nothing
+        }
+        
+        public void messageReceived(IoSession session, Object message) {
+            try {
+                Thread.sleep(20); // mimic work.
+                ((LastActivityTracker) session).setLastActivity();
+            } catch (Exception e) {
+                if (this.throwable == null) {
+                    this.throwable = e;
+                }
+            }
+        }
+        
+        public void messageSent(IoSession session, WriteRequest writeRequest) {
+            // Do nothing
+        }
+        
+        public void filterWrite(IoSession session, WriteRequest writeRequest) {
+            // Do nothing
+        }
+        
+        public void filterClose(IoSession session) {
+            // Do nothing
+        }
+        
+        public void sessionCreated(IoSession session) {
+            // Do nothing
+        }
+    }
+
+    /**
+     * Empty implementation of IoFilter.NextFilterAdapter, intended to 
facilitate easy subclassing.
+     */
+    private abstract static class NextFilterAdapter implements 
IoFilter.NextFilter {
+        public void sessionOpened(IoSession session) {}
+        public void sessionClosed(IoSession session) {}
+        public void sessionIdle(IoSession session, IdleStatus status) {}
+        public void exceptionCaught(IoSession session, Throwable cause) {}
+        public void inputClosed(IoSession session) {}
+        public void messageReceived(IoSession session, Object message) {}
+        public void messageSent(IoSession session, WriteRequest writeRequest) 
{}
+        public void filterWrite(IoSession session, WriteRequest writeRequest) 
{}
+        public void filterClose(IoSession session) {}
+        public void sessionCreated(IoSession session) {}
+    }
+}
diff --git 
a/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
 
b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
new file mode 100644
index 0000000..22adcea
--- /dev/null
+++ 
b/mina-core/src/test/java/org/apache/mina/filter/executor/UnorderedThreadPoolExecutorTest.java
@@ -0,0 +1,162 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.junit.Test;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link UnorderedThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, guus.der.kinde...@gmail.com
+ */
+public class UnorderedThreadPoolExecutorTest
+{
+    /**
+     * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and 
{@link UnorderedThreadPoolExecutor#workers}
+     * after a RuntimeException is thrown when the {@link 
UnorderedThreadPoolExecutor.Worker} is running.
+     *
+     * Note that the implementation of this test is <em>not 
representative</em> of how tasks are normally executed, as
+     * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain 
implementations would catch the
+     * RuntimeException that is being used in the implementation of this test. 
The purpose of this test is to verify
+     * Worker's behavior when a RuntimeException is thrown during execution 
occurs (even if that RuntimeException cannot
+     * occur in the way that this test simulates it). A test that implements 
the execution in a more realistic manner is
+     * provided in {@link 
org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+     *
+     * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+     * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132";>Issue 
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+     */
+    @Test
+    public void testRuntimeExceptionInWorkerRun() throws Throwable
+    {
+        // Set up test fixture.
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        UnorderedThreadPoolExecutor executor = new 
UnorderedThreadPoolExecutor(corePoolSize,1);
+        IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) {
+                throw new RuntimeException("A RuntimeException thrown during 
unit testing.");
+            }
+        };
+        DummySession session = new DummySession();
+        ExecutorFilter filter = new ExecutorFilter(executor);
+
+        try {
+            // Execute system under test.
+            filter.messageReceived(nextFilter, session, null);
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Worker execution has happened.
+            executor.shutdown();
+            if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Bug in test implementation.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and 
{@link UnorderedThreadPoolExecutor#workers}
+     * after an Error is thrown when the {@link 
UnorderedThreadPoolExecutor.Worker} is running.
+     *
+     * Note that the implementation of this test is <em>not 
representative</em> of how tasks are normally executed, as
+     * tasks would ordinarily be 'wrapped' in a FilterChain. Most FilterChain 
implementations would catch the Error that
+     * is being used in the implementation of this test. The purpose of this 
test is to verify Worker's behavior when an
+     * Error is thrown during execution occurs (even if that Error cannot 
occur in the way that this test simulates it).
+     * A test that implements the execution in a more realistic manner is 
provided in
+     * {@link org.apache.mina.transport.socket.nio.DIRMINA1156Test}.
+     *
+     * @see org.apache.mina.transport.socket.nio.DIRMINA1156Test
+     * @see <a href="https://issues.apache.org/jira/browse/DIRMINA-1132";>Issue 
DIRMINA-1156: Inconsistent worker / idleWorker in ThreadPoolExecutors</a>
+     */
+    @Test
+    public void testErrorInWorkerRun() throws Throwable
+    {
+        // Set up test fixture.
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        UnorderedThreadPoolExecutor executor = new 
UnorderedThreadPoolExecutor(corePoolSize,1);
+        IoFilter.NextFilter nextFilter = new NextFilterAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) {
+                throw new Error("An Error thrown during unit testing.");
+            }
+        };
+        DummySession session = new DummySession();
+        ExecutorFilter filter = new ExecutorFilter(executor);
+
+        try {
+            // Execute system under test.
+            filter.messageReceived(nextFilter, session, null);
+
+            // Ensure that the task has been executed in the executor.
+            executor.shutdown(); // Shutting down and awaiting termination 
ensures that test execution blocks until Worker execution has happened.
+            if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Bug in test implementation.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Empty implementation of IoFilter.NextFilterAdapter, intended to 
facilitate easy subclassing.
+     */
+    private abstract static class NextFilterAdapter implements 
IoFilter.NextFilter {
+        public void sessionOpened(IoSession session) {}
+        public void sessionClosed(IoSession session) {}
+        public void sessionIdle(IoSession session, IdleStatus status) {}
+        public void exceptionCaught(IoSession session, Throwable cause) {}
+        public void inputClosed(IoSession session) {}
+        public void messageReceived(IoSession session, Object message) {}
+        public void messageSent(IoSession session, WriteRequest writeRequest) 
{}
+        public void filterWrite(IoSession session, WriteRequest writeRequest) 
{}
+        public void filterClose(IoSession session) {}
+        public void sessionCreated(IoSession session) {}
+    }
+}
diff --git 
a/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java 
b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
new file mode 100644
index 0000000..40b7073
--- /dev/null
+++ b/mina-core/src/test/java/org/apache/mina/handler/DIRMINA1156Test.java
@@ -0,0 +1,465 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.handler;
+
+import static org.junit.Assert.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.core.filterchain.IoFilterChain;
+import org.apache.mina.core.service.IoHandlerAdapter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.filter.executor.OrderedThreadPoolExecutor;
+import org.apache.mina.filter.executor.PriorityThreadPoolExecutor;
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
+import org.junit.Test;
+
+/**
+ * Tests that reproduces a bug as described in issue DIRMINA-1156
+ *
+ * @author Guus der Kinderen, guus.der.kinde...@gmail.com
+ * @see <a 
href="https://issues.apache.org/jira/browse/DIRMINA-1156";>DIRMINA-1156</a>
+ */
+public class DIRMINA1156Test
+{
+    /**
+     * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and 
{@link OrderedThreadPoolExecutor#workers}
+     * after an {@link Error} is thrown by a session's handler that was 
invoked through an OrderedThreadPoolExecutor.
+     */
+    @Test
+    public void testOrderedThreadPoolExecutorSessionHandlerThrowingError() 
throws Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        OrderedThreadPoolExecutor executor = new 
OrderedThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new Error("An Error thrown during unit testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and 
{@link OrderedThreadPoolExecutor#workers}
+     * after a {@link RuntimeException} is thrown by a session's handler that 
was invoked through an
+     * OrderedThreadPoolExecutor.
+     */
+    @Test
+    public void 
testOrderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws 
Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        OrderedThreadPoolExecutor executor = new 
OrderedThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new RuntimeException("A RuntimeException thrown during 
unit testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link OrderedThreadPoolExecutor#idleWorkers} and 
{@link OrderedThreadPoolExecutor#workers}
+     * after a (checked) {@link Exception} is thrown by a session's handler 
that was invoked through an
+     * OrderedThreadPoolExecutor.
+     */
+    @Test
+    public void 
testOrderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws 
Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        OrderedThreadPoolExecutor executor = new 
OrderedThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new Exception("A (checked) Exception thrown during unit 
testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
OrderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and 
{@link UnorderedThreadPoolExecutor#workers}
+     * after an {@link Error} is thrown by a session's handler that was 
invoked through an UnorderedThreadPoolExecutor.
+     */
+    @Test
+    public void testUnorderedThreadPoolExecutorSessionHandlerThrowingError() 
throws Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        UnorderedThreadPoolExecutor executor = new 
UnorderedThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new Error("An Error thrown during unit testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and 
{@link UnorderedThreadPoolExecutor#workers}
+     * after a {@link RuntimeException} is thrown by a session's handler that 
was invoked through an
+     * UnorderedThreadPoolExecutor.
+     */
+    @Test
+    public void 
testUnorderedThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws 
Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        UnorderedThreadPoolExecutor executor = new 
UnorderedThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new RuntimeException("A RuntimeException thrown during 
unit testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link UnorderedThreadPoolExecutor#idleWorkers} and 
{@link UnorderedThreadPoolExecutor#workers}
+     * after a (checked) {@link Exception} is thrown by a session's handler 
that was invoked through an
+     * UnorderedThreadPoolExecutor.
+     */
+    @Test
+    public void 
testUnorderedThreadPoolExecutorSessionHandlerThrowingCheckedException() throws 
Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        UnorderedThreadPoolExecutor executor = new 
UnorderedThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new Exception("A (checked) Exception thrown during unit 
testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
UnorderedThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and 
{@link PriorityThreadPoolExecutor#workers}
+     * after an {@link Error} is thrown by a session's handler that was 
invoked through an PriorityThreadPoolExecutor.
+     */
+    @Test
+    public void testPriorityThreadPoolExecutorSessionHandlerThrowingError() 
throws Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        PriorityThreadPoolExecutor executor = new 
PriorityThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new Error("An Error thrown during unit testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and 
{@link PriorityThreadPoolExecutor#workers}
+     * after a {@link RuntimeException} is thrown by a session's handler that 
was invoked through an
+     * PriorityThreadPoolExecutor.
+     */
+    @Test
+    public void 
testPriorityThreadPoolExecutorSessionHandlerThrowingRuntimeException() throws 
Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        PriorityThreadPoolExecutor executor = new 
PriorityThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new RuntimeException("A RuntimeException thrown during 
unit testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Tests the state of {@link PriorityThreadPoolExecutor#idleWorkers} and 
{@link PriorityThreadPoolExecutor#workers}
+     * after a (checked) {@link Exception} is thrown by a session's handler 
that was invoked through an
+     * PriorityThreadPoolExecutor.
+     */
+    @Test
+    public void 
testPriorityThreadPoolExecutorSessionHandlerThrowingCheckedException() throws 
Exception
+    {
+        // Set up test fixture.
+        final boolean[] filterTriggered = {false}; // Used to verify the 
implementation of this test (to see if the Handler is invoked at all).
+        int corePoolSize = 1; // Prevent an idle worker from being cleaned up, 
which would skew the results of this test.
+        DummySession session = new DummySession();
+        IoFilterChain chain = session.getFilterChain();
+        PriorityThreadPoolExecutor executor = new 
PriorityThreadPoolExecutor(corePoolSize,1);
+        chain.addLast("executor", new ExecutorFilter(executor));
+        session.setHandler( new IoHandlerAdapter() {
+            @Override
+            public void messageReceived(IoSession session, Object message) 
throws Exception {
+                filterTriggered[0] = true;
+                throw new Exception("A (checked) Exception thrown during unit 
testing.");
+            }
+        });
+
+        // Execute system under test.
+        try {
+            chain.fireMessageReceived("foo");
+
+            // Shutting down and awaiting termination ensures that test 
execution blocks until Handler invocation has happened.
+            executor.shutdown();
+            executor.awaitTermination(10, TimeUnit.SECONDS);
+            if (!filterTriggered[0]) {
+                throw new IllegalStateException("Bug in test implementation: 
the session handler was never invoked.");
+            }
+
+            // Verify results.
+            final Field idleWorkersField = 
PriorityThreadPoolExecutor.class.getDeclaredField("idleWorkers"); // Using 
reflection as the field is not accessible. It might be nicer to make the field 
package-protected for testing.
+            idleWorkersField.setAccessible(true);
+            final AtomicInteger idleWorkers = (AtomicInteger) 
idleWorkersField.get(executor);
+            assertEquals("After all tasks have finished, the amount of workers 
that are idle should equal the amount of workers, but did not.", 
executor.getPoolSize(), idleWorkers.get());
+        } finally {
+            // Clean up test fixture.
+            if (!executor.isShutdown()) {
+                executor.shutdownNow();
+            }
+        }
+    }
+}

Reply via email to