This is an automated email from the ASF dual-hosted git repository.
elecharny pushed a commit to branch 2.1.X
in repository https://gitbox.apache.org/repos/asf/mina.git
The following commit(s) were added to refs/heads/2.1.X by this push:
new 79059ca Patch for DIRMINA-1110 applied. I suspect it'll fix DIRMINA-1113...
79059ca is described below
commit 79059ca54b1fe6b8e98790b0f8aad12e82fa6f93
Author: emmanuel lecharny <[email protected]>
AuthorDate: Fri May 24 15:42:53 2019 +0200
Patch for DIRMINA-1110 applied. I suspect it'll fix DIRMINA-1113...
---
.../filter/executor/OrderedThreadPoolExecutor.java | 25 +-
.../executor/PriorityThreadPoolExecutor.java | 71 ++--
.../executor/PriorityThreadPoolExecutorTest.java | 359 ++++++++++-----------
3 files changed, 211 insertions(+), 244 deletions(-)
diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
index 65e97a0..aeada71 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/executor/OrderedThreadPoolExecutor.java
@@ -448,9 +448,6 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
// Get the session's queue of events
SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
- Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
-
- boolean offerSession;
// propose the new event to the event queue handler. If we
// use a throttle queue handler, the message may be rejected
@@ -459,30 +456,22 @@ public class OrderedThreadPoolExecutor extends ThreadPoolExecutor {
if (offerEvent) {
// Ok, the message has been accepted
- synchronized (tasksQueue) {
+ synchronized (sessionTasksQueue.tasksQueue) {
// Inject the event into the executor taskQueue
- tasksQueue.offer(event);
+ sessionTasksQueue.tasksQueue.offer(event);
if (sessionTasksQueue.processingCompleted) {
sessionTasksQueue.processingCompleted = false;
- offerSession = true;
- } else {
- offerSession = false;
+ // Processing of the tasks queue of this session is currently not
+ // scheduled or underway. As new tasks have now been added, the
+ // session needs to be offered for processing.
+ waitingSessions.offer(session);
}
if (LOGGER.isDebugEnabled()) {
- print(tasksQueue, event);
+ print(sessionTasksQueue.tasksQueue, event);
}
}
- } else {
- offerSession = false;
- }
-
- if (offerSession) {
- // As the tasksQueue was empty, the task has been executed
- // immediately, so we can move the session to the queue
- // of sessions waiting for completion.
- waitingSessions.offer(session);
}
addWorkerIfNecessary();
diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
index 721005c..bd3ad65 100644
--- a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
+++ b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
@@ -205,28 +205,20 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Creates a new instance of a PrioritisedOrderedThreadPoolExecutor.
*
- * @param corePoolSize
- * The initial pool sizePoolSize
- * @param maximumPoolSize
- * The maximum pool size
- * @param keepAliveTime
- * Default duration for a thread
- * @param unit
- * Time unit used for the keepAlive value
- * @param threadFactory
- * The factory used to create threads
- * @param eventQueueHandler
- * The queue used to store events
+ * @param corePoolSize The initial pool sizePoolSize
+ * @param maximumPoolSize The maximum pool size
+ * @param keepAliveTime Default duration for a thread
+ * @param unit Time unit used for the keepAlive value
+ * @param threadFactory The factory used to create threads
+ * @param eventQueueHandler The queue used to store events
*/
public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
ThreadFactory threadFactory, IoEventQueueHandler
eventQueueHandler, Comparator<IoSession> comparator) {
- // We have to initialize the pool with default values (0 and 1) in order
- // to
- // handle the exception in a better way. We can't add a try {} catch()
- // {}
+ // We have to initialise the pool with default values (0 and 1) in order
+ // to handle the exception in a better way. We can't add a try {} catch() {}
// around the super() call.
- super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new
SynchronousQueue<Runnable>(), threadFactory,
- new AbortPolicy());
+ super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new
SynchronousQueue<Runnable>(),
+ threadFactory, new AbortPolicy());
if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
throw new IllegalArgumentException("corePoolSize: " +
corePoolSize);
@@ -260,12 +252,12 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
/**
* Get the session's tasks queue.
*/
- private SessionQueue getSessionTasksQueue(IoSession session) {
- SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE);
+ private SessionTasksQueue getSessionTasksQueue(IoSession session) {
+ SessionTasksQueue queue = (SessionTasksQueue)
session.getAttribute(TASKS_QUEUE);
if (queue == null) {
- queue = new SessionQueue();
- SessionQueue oldQueue = (SessionQueue)
session.setAttributeIfAbsent(TASKS_QUEUE, queue);
+ queue = new SessionTasksQueue();
+ SessionTasksQueue oldQueue = (SessionTasksQueue)
session.setAttributeIfAbsent(TASKS_QUEUE, queue);
if (oldQueue != null) {
queue = oldQueue;
@@ -436,7 +428,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
continue;
}
- SessionQueue sessionTasksQueue = (SessionQueue)
entry.getSession().getAttribute(TASKS_QUEUE);
+ SessionTasksQueue sessionTasksQueue = (SessionTasksQueue)
entry.getSession().getAttribute(TASKS_QUEUE);
synchronized (sessionTasksQueue.tasksQueue) {
@@ -496,10 +488,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
IoSession session = event.getSession();
// Get the session's queue of events
- SessionQueue sessionTasksQueue = getSessionTasksQueue(session);
- Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
-
- boolean offerSession;
+ SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
// propose the new event to the event queue handler. If we
// use a throttle queue handler, the message may be rejected
@@ -508,30 +497,22 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
if (offerEvent) {
// Ok, the message has been accepted
- synchronized (tasksQueue) {
+ synchronized (sessionTasksQueue.tasksQueue) {
// Inject the event into the executor taskQueue
- tasksQueue.offer(event);
+ sessionTasksQueue.tasksQueue.offer(event);
if (sessionTasksQueue.processingCompleted) {
sessionTasksQueue.processingCompleted = false;
- offerSession = true;
- } else {
- offerSession = false;
+ // Processing of the tasks queue of this session is currently not
+ // scheduled or underway. As new tasks have now been added, the
+ // session needs to be offered for processing.
+ waitingSessions.offer(new SessionEntry(session,
comparator));
}
if (LOGGER.isDebugEnabled()) {
- print(tasksQueue, event);
+ print(sessionTasksQueue.tasksQueue, event);
}
}
- } else {
- offerSession = false;
- }
-
- if (offerSession) {
- // As the tasksQueue was empty, the task has been executed
- // immediately, so we can move the session to the queue
- // of sessions waiting for completion.
- waitingSessions.offer(new SessionEntry(session, comparator));
}
addWorkerIfNecessary();
@@ -666,7 +647,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
checkTaskType(task);
IoEvent event = (IoEvent) task;
IoSession session = event.getSession();
- SessionQueue sessionTasksQueue = (SessionQueue)
session.getAttribute(TASKS_QUEUE);
+ SessionTasksQueue sessionTasksQueue = (SessionTasksQueue)
session.getAttribute(TASKS_QUEUE);
if (sessionTasksQueue == null) {
return false;
@@ -791,7 +772,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
return null;
}
- private void runTasks(SessionQueue sessionTasksQueue) {
+ private void runTasks(SessionTasksQueue sessionTasksQueue) {
for (;;) {
Runnable task;
Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
@@ -832,7 +813,7 @@ public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
* A class used to store the ordered list of events to be processed by the
* session, and the current task state.
*/
- private class SessionQueue {
+ private class SessionTasksQueue {
/** A queue of ordered event waiting to be processed */
private final Queue<Runnable> tasksQueue = new
ConcurrentLinkedQueue<>();
diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
index fac0478..fe49857 100644
--- a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
+++ b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
@@ -53,16 +53,16 @@ public class PriorityThreadPoolExecutorTest {
*/
@Test
public void fifoEntryTestNoComparatorSameSession() throws Exception {
- // Set up fixture.
- final IoSession session = new DummySession();
- final PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(session, null);
- final PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(session, null);
-
- // Execute system under test.
- final int result = first.compareTo(last);
-
- // Verify results.
- assertEquals("Without a comparator, entries of the same session are
expected to be equal.", 0, result);
+ // Set up fixture.
+ IoSession session = new DummySession();
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(session, null);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(session, null);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertEquals("Without a comparator, entries of the same session are
expected to be equal.", 0, result);
}
/**
@@ -75,16 +75,16 @@ public class PriorityThreadPoolExecutorTest {
*/
@Test
public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
- // Set up fixture (the order in which the entries are created is
- // relevant here!)
- final PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
- final PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
-
- // Execute system under test.
- final int result = first.compareTo(last);
-
- // Verify results.
- assertTrue("Without a comparator, the first entry created should be the
first entry out. Expected a negative result, instead, got: " + result, result <
0);
+ // Set up fixture (the order in which the entries are created is
+ // relevant here!)
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertTrue("Without a comparator, the first entry created should be
the first entry out. Expected a negative result, instead, got: " + result,
result < 0);
}
/**
@@ -98,24 +98,25 @@ public class PriorityThreadPoolExecutorTest {
*/
@Test
public void fifoEntryTestWithComparatorSameSession() throws Exception {
- // Set up fixture.
- final IoSession session = new DummySession();
- final int predeterminedResult = 3853;
- final Comparator<IoSession> comparator = new Comparator<IoSession>() {
- @Override
- public int compare(IoSession o1, IoSession o2) {
- return predeterminedResult;
- }
- };
-
- final PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
- final PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
-
- // Execute system under test.
- final int result = first.compareTo(last);
-
- // Verify results.
- assertEquals("With a comparator, entries of the same session are
expected to be equal.", 0, result);
+ // Set up fixture.
+ IoSession session = new DummySession();
+ final int predeterminedResult = 3853;
+
+ Comparator<IoSession> comparator = new Comparator<IoSession>() {
+ @Override
+ public int compare(IoSession o1, IoSession o2) {
+ return predeterminedResult;
+ }
+ };
+
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(session, comparator);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertEquals("With a comparator, entries of the same session are
expected to be equal.", 0, result);
}
/**
@@ -129,23 +130,25 @@ public class PriorityThreadPoolExecutorTest {
*/
@Test
public void fifoEntryTestComparatorDifferentSession() throws Exception {
- // Set up fixture (the order in which the entries are created is
- // relevant here!)
- final int predeterminedResult = 3853;
- final Comparator<IoSession> comparator = new Comparator<IoSession>() {
- @Override
- public int compare(IoSession o1, IoSession o2) {
- return predeterminedResult;
- }
- };
- final PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
- final PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
-
- // Execute system under test.
- final int result = first.compareTo(last);
-
- // Verify results.
- assertEquals("With a comparator, comparing entries of different
sessions is expected to yield the comparator result.", predeterminedResult,
result);
+ // Set up fixture (the order in which the entries are created is
+ // relevant here!)
+ final int predeterminedResult = 3853;
+
+ Comparator<IoSession> comparator = new Comparator<IoSession>() {
+ @Override
+ public int compare(IoSession o1, IoSession o2) {
+ return predeterminedResult;
+ }
+ };
+
+ PriorityThreadPoolExecutor.SessionEntry first = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
+ PriorityThreadPoolExecutor.SessionEntry last = new
PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator);
+
+ // Execute system under test.
+ int result = first.compareTo(last);
+
+ // Verify results.
+ assertEquals("With a comparator, comparing entries of different
sessions is expected to yield the comparator result.", predeterminedResult,
result);
}
/**
@@ -164,150 +167,144 @@ public class PriorityThreadPoolExecutorTest {
*/
@Test
public void testPrioritisation() throws Throwable {
- // Set up fixture.
- final MockWorkFilter nextFilter = new MockWorkFilter();
- final List<LastActivityTracker> sessions = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- sessions.add(new LastActivityTracker());
- }
- final LastActivityTracker preferredSession = sessions.get(4); // prefer
- // an
- //
arbitrary
- // session
- // (but
- // not the
- // first
- // or last
- //
session,
- // for
- // good
- //
measure).
- final Comparator<IoSession> comparator = new
UnfairComparator(preferredSession);
- final int maximumPoolSize = 1; // keep this low, to force resource
- // contention.
- final int amountOfTasks = 400;
-
- final ExecutorService executor = new
PriorityThreadPoolExecutor(maximumPoolSize, comparator);
- final ExecutorFilter filter = new ExecutorFilter(executor);
-
- // Execute system under test.
- int sessionIndex = 0;
- for (int i = 0; i < amountOfTasks; i++) {
- if (++sessionIndex >= sessions.size()) {
- sessionIndex = 0;
- }
-
- filter.messageReceived(nextFilter, sessions.get(sessionIndex),
null);
-
- if (nextFilter.throwable != null) {
- throw nextFilter.throwable;
- }
- }
-
- executor.shutdown();
-
- // Verify results.
- executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
-
- for (final LastActivityTracker session : sessions) {
- if (session != preferredSession) {
- assertTrue("All other sessions should have finished later than
the preferred session (but at least one did not).", session.lastActivity >
preferredSession.lastActivity);
- }
- }
+ // Set up fixture.
+ MockWorkFilter nextFilter = new MockWorkFilter();
+ List<LastActivityTracker> sessions = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ sessions.add(new LastActivityTracker());
+ }
+
+ LastActivityTracker preferredSession = sessions.get(4); // prefer an
arbitrary session
+ // (but not
the first or last
+ // session,
for good measure).
+ Comparator<IoSession> comparator = new
UnfairComparator(preferredSession);
+ int maximumPoolSize = 1; // keep this low, to force resource
contention.
+ int amountOfTasks = 400;
+
+ ExecutorService executor = new
PriorityThreadPoolExecutor(maximumPoolSize, comparator);
+ ExecutorFilter filter = new ExecutorFilter(executor);
+
+ // Execute system under test.
+ int sessionIndex = 0;
+ for (int i = 0; i < amountOfTasks; i++) {
+ if (++sessionIndex >= sessions.size()) {
+ sessionIndex = 0;
+ }
+
+ filter.messageReceived(nextFilter, sessions.get(sessionIndex),
null);
+
+ if (nextFilter.throwable != null) {
+ throw nextFilter.throwable;
+ }
+ }
+
+ executor.shutdown();
+
+ // Verify results.
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+
+ for (LastActivityTracker session : sessions) {
+ if (session != preferredSession) {
+ assertTrue("All other sessions should have finished later than
the preferred session (but at least one did not).",
+ session.lastActivity > preferredSession.lastActivity);
+ }
+ }
}
/**
* A comparator that prefers a particular session.
*/
private static class UnfairComparator implements Comparator<IoSession> {
- private final IoSession preferred;
-
- public UnfairComparator(IoSession preferred) {
- this.preferred = preferred;
- }
-
- @Override
- public int compare(IoSession o1, IoSession o2) {
- if (o1 == preferred) {
- return -1;
- }
-
- if (o2 == preferred) {
- return 1;
- }
-
- return 0;
- }
+ private IoSession preferred;
+
+ public UnfairComparator(IoSession preferred) {
+ this.preferred = preferred;
+ }
+
+ @Override
+ public int compare(IoSession o1, IoSession o2) {
+ if (o1 == preferred) {
+ System.out.println( "session1 preferred" );
+ return -1;
+ }
+
+ if (o2 == preferred) {
+ System.out.println( "session2 preferred" + ", o2=" + o2 + "
preferred=" + preferred );
+ return 1;
+ }
+
+ return 0;
+ }
}
/**
* A session that tracks the timestamp of last activity.
*/
private static class LastActivityTracker extends DummySession {
- long lastActivity = System.currentTimeMillis();
+ long lastActivity = System.currentTimeMillis();
- public synchronized void setLastActivity() {
- lastActivity = System.currentTimeMillis();
- }
+ public synchronized void setLastActivity() {
+ lastActivity = System.currentTimeMillis();
+ }
}
/**
* A filter that simulates a non-negligible amount of work.
*/
private static class MockWorkFilter implements IoFilter.NextFilter {
- Throwable throwable;
-
- public void sessionOpened(IoSession session) {
- // Do nothing
- }
-
- public void sessionClosed(IoSession session) {
- // Do nothing
- }
-
- public void sessionIdle(IoSession session, IdleStatus status) {
- // Do nothing
- }
-
- public void exceptionCaught(IoSession session, Throwable cause) {
- // Do nothing
- }
-
- public void inputClosed(IoSession session) {
- // Do nothing
- }
-
- public void messageReceived(IoSession session, Object message) {
- try {
- Thread.sleep(20); // mimic work.
- ((LastActivityTracker) session).setLastActivity();
- } catch (Exception e) {
- if (this.throwable == null) {
- this.throwable = e;
- }
- }
- }
-
- public void messageSent(IoSession session, WriteRequest writeRequest) {
- // Do nothing
- }
-
- public void filterWrite(IoSession session, WriteRequest writeRequest) {
- // Do nothing
- }
-
- public void filterClose(IoSession session) {
- // Do nothing
- }
-
- public void sessionCreated(IoSession session) {
- // Do nothing
- }
-
- @Override
- public void event(IoSession session, FilterEvent event) {
- // TODO Auto-generated method stub
-
- }
+ Throwable throwable;
+
+ public void sessionOpened(IoSession session) {
+ // Do nothing
+ }
+
+ public void sessionClosed(IoSession session) {
+ // Do nothing
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus status) {
+ // Do nothing
+ }
+
+ public void exceptionCaught(IoSession session, Throwable cause) {
+ // Do nothing
+ }
+
+ public void inputClosed(IoSession session) {
+ // Do nothing
+ }
+
+ public void messageReceived(IoSession session, Object message) {
+ try {
+ Thread.sleep(20); // mimic work.
+ ((LastActivityTracker) session).setLastActivity();
+ } catch (Exception e) {
+ if (this.throwable == null) {
+ this.throwable = e;
+ }
+ }
+ }
+
+ public void messageSent(IoSession session, WriteRequest writeRequest) {
+ // Do nothing
+ }
+
+ public void filterWrite(IoSession session, WriteRequest writeRequest) {
+ // Do nothing
+ }
+
+ public void filterClose(IoSession session) {
+ // Do nothing
+ }
+
+ public void sessionCreated(IoSession session) {
+ // Do nothing
+ }
+
+ @Override
+ public void event(IoSession session, FilterEvent event) {
+ // TODO Auto-generated method stub
+ }
}
}