Repository: mina
Updated Branches:
  refs/heads/2.1.0 d72f89017 -> 56ca189e1


Applies patch DIRMINA-1078; maven pass

Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/56ca189e
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/56ca189e
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/56ca189e

Branch: refs/heads/2.1.0
Commit: 56ca189e1b2de1b6d9ab3635dd8f331f13762009
Parents: d72f890
Author: johnnyv <john...@apache.org>
Authored: Sun Jul 22 08:13:42 2018 -0400
Committer: johnnyv <john...@apache.org>
Committed: Sun Jul 22 08:13:42 2018 -0400

----------------------------------------------------------------------
 .../executor/PriorityThreadPoolExecutor.java    | 885 +++++++++++++++++++
 .../PriorityThreadPoolExecutorTest.java         | 313 +++++++
 2 files changed, 1198 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/56ca189e/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git 
a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
 
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
new file mode 100644
index 0000000..dd6b6e1
--- /dev/null
+++ 
b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java
@@ -0,0 +1,885 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import org.apache.mina.core.session.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s
+ * within a session (similar to {@link OrderedThreadPoolExecutor}) and allows
+ * some sessions to be prioritized over other sessions.
+ * <p>
+ * If you don't need to maintain the order of events per session, please use
+ * {@link UnorderedThreadPoolExecutor}.
+ * <p>
+ * If you don't need to prioritize sessions, please use
+ * {@link OrderedThreadPoolExecutor}.
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ * @author Guus der Kinderen, guus.der.kinde...@gmail.com
+ * @org.apache.xbean.XBean
+ */
+// TODO this class currently copies OrderedThreadPoolExecutor, and changes the
+// BlockingQueue used for the waitingSessions field. This code duplication
+// should be avoided.
+public class PriorityThreadPoolExecutor extends ThreadPoolExecutor {
+    /** A logger for this class */
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PriorityThreadPoolExecutor.class);
+
+    /** Generates sequential identifiers that ensure FIFO behavior. */
+    private static final AtomicLong seq = new AtomicLong(0);
+
+    /** A default value for the initial pool size */
+    private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0;
+
+    /** A default value for the maximum pool size */
+    private static final int DEFAULT_MAX_THREAD_POOL = 16;
+
+    /** A default value for the KeepAlive delay */
+    private static final int DEFAULT_KEEP_ALIVE = 30;
+
+    private static final SessionEntry EXIT_SIGNAL = new SessionEntry(new 
DummySession(), null);
+
+    /**
+     * A key stored into the session's attribute for the event tasks being
+     * queued
+     */
+    private static final AttributeKey TASKS_QUEUE = new 
AttributeKey(PriorityThreadPoolExecutor.class, "tasksQueue");
+
+    /** A queue used to store the available sessions */
+    private final BlockingQueue<SessionEntry> waitingSessions;
+
+    private final Set<Worker> workers = new HashSet<>();
+
+    private volatile int largestPoolSize;
+
+    private final AtomicInteger idleWorkers = new AtomicInteger();
+
+    private long completedTaskCount;
+
+    private volatile boolean shutdown;
+
+    private final IoEventQueueHandler eventQueueHandler;
+
+    private final Comparator<IoSession> comparator;
+
+    /**
+     * Creates a default ThreadPool, with default values : - minimum pool size
+     * is 0 - maximum pool size is 16 - keepAlive set to 30 seconds - A default
+     * ThreadFactory - All events are accepted
+     */
+    public PriorityThreadPoolExecutor() {
+       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
null);
+    }
+
+    /**
+     * Creates a thread pool with the default settings (minimum pool size 0,
+     * maximum pool size 16, keepAlive set to 30 seconds, a default
+     * ThreadFactory, all events accepted) and the given session comparator.
+     *
+     * @param comparator
+     *            The comparator used to prioritize sessions. May be
+     *            <code>null</code>, in which case waiting sessions are kept
+     *            in a plain FIFO queue
+     */
+    public PriorityThreadPoolExecutor(Comparator<IoSession> comparator) {
+       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
comparator);
+    }
+
+    /**
+     * Creates a default ThreadPool, with default values : - minimum pool size
+     * is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All 
events
+     * are accepted
+     *
+     * @param maximumPoolSize
+     *            The maximum pool size
+     */
+    public PriorityThreadPoolExecutor(int maximumPoolSize) {
+       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
null);
+    }
+
+    /**
+     * Creates a thread pool with the given maximum size and session
+     * comparator, and default values for everything else : - minimum pool
+     * size is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All
+     * events are accepted
+     *
+     * @param maximumPoolSize
+     *            The maximum pool size
+     * @param comparator
+     *            The comparator used to prioritize sessions. May be
+     *            <code>null</code>, in which case waiting sessions are kept
+     *            in a plain FIFO queue
+     */
+    public PriorityThreadPoolExecutor(int maximumPoolSize, 
Comparator<IoSession> comparator) {
+       this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, 
DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
comparator);
+    }
+
+    /**
+     * Creates a default ThreadPool, with default values : - keepAlive set to 
30
+     * seconds - A default ThreadFactory - All events are accepted
+     *
+     * @param corePoolSize
+     *            The initial pool size
+     * @param maximumPoolSize
+     *            The maximum pool size
+     */
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize) {
+       this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, 
TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null);
+    }
+
+    /**
+     * Creates a default ThreadPool, with default values : - A default
+     * ThreadFactory - All events are accepted
+     *
+     * @param corePoolSize
+     *            The initial pool size
+     * @param maximumPoolSize
+     *            The maximum pool size
+     * @param keepAliveTime
+     *            Default duration for a thread
+     * @param unit
+     *            Time unit used for the keepAlive value
+     */
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit) {
+       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), null, null);
+    }
+
+    /**
+     * Creates a default ThreadPool, with default values : - A default
+     * ThreadFactory
+     *
+     * @param corePoolSize
+     *            The initial pool size
+     * @param maximumPoolSize
+     *            The maximum pool size
+     * @param keepAliveTime
+     *            Default duration for a thread
+     * @param unit
+     *            Time unit used for the keepAlive value
+     * @param eventQueueHandler
+     *            The queue used to store events
+     */
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, IoEventQueueHandler eventQueueHandler) {
+       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
Executors.defaultThreadFactory(), eventQueueHandler, null);
+    }
+
+    /**
+     * Creates a default ThreadPool, with default values : - A default
+     * ThreadFactory
+     *
+     * @param corePoolSize
+     *            The initial pool size
+     * @param maximumPoolSize
+     *            The maximum pool size
+     * @param keepAliveTime
+     *            Default duration for a thread
+     * @param unit
+     *            Time unit used for the keepAlive value
+     * @param threadFactory
+     *            The factory used to create threads
+     */
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
+       this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, 
null, null);
+    }
+
+    /**
+     * Creates a new instance of a PriorityThreadPoolExecutor. All the other
+     * constructors delegate to this one.
+     *
+     * @param corePoolSize
+     *            The initial pool size
+     * @param maximumPoolSize
+     *            The maximum pool size
+     * @param keepAliveTime
+     *            Default duration for a thread
+     * @param unit
+     *            Time unit used for the keepAlive value
+     * @param threadFactory
+     *            The factory used to create threads
+     * @param eventQueueHandler
+     *            The queue used to store events. If <code>null</code>,
+     *            {@link IoEventQueueHandler#NOOP} is used
+     * @param comparator
+     *            The comparator used to prioritize sessions. If
+     *            <code>null</code>, waiting sessions are stored in a
+     *            {@link LinkedBlockingQueue}; otherwise they are stored in a
+     *            {@link PriorityBlockingQueue}
+     * @throws IllegalArgumentException
+     *             If corePoolSize is negative, or if maximumPoolSize is 0 or
+     *             smaller than corePoolSize
+     */
+    public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, 
IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) {
+       // The super() call cannot be wrapped in a try {} catch () {}, so the
+       // parent is first initialized with fixed pool sizes (0 and 1). The
+       // caller's sizes are validated below, where we control the exception
+       // being thrown, and only then applied.
+       super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new 
SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy());
+
+       if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) {
+           throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
+       }
+
+       if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) {
+           throw new IllegalArgumentException("maximumPoolSize: " + 
maximumPoolSize);
+       }
+
+       // Now, we can setup the pool sizes
+       super.setCorePoolSize(corePoolSize);
+       super.setMaximumPoolSize(maximumPoolSize);
+
+       // The queueHandler might be null.
+       if (eventQueueHandler == null) {
+           this.eventQueueHandler = IoEventQueueHandler.NOOP;
+       } else {
+           this.eventQueueHandler = eventQueueHandler;
+       }
+
+       // The comparator can be null.
+       this.comparator = comparator;
+
+       // The PriorityBlockingQueue is created without an explicit comparator:
+       // it orders its elements through SessionEntry.compareTo(), which in
+       // turn applies the session comparator carried by each entry.
+       if (this.comparator == null) {
+           this.waitingSessions = new LinkedBlockingQueue<>();
+       } else {
+           this.waitingSessions = new PriorityBlockingQueue<>();
+       }
+    }
+
+    /**
+     * Returns the tasks queue stored in the session under the TASKS_QUEUE
+     * attribute, creating and registering a new one if the session has none
+     * yet.
+     *
+     * @param session
+     *            The session whose tasks queue is wanted
+     * @return The session's tasks queue, never <code>null</code>
+     */
+    private SessionQueue getSessionTasksQueue(IoSession session) {
+       SessionQueue existing = (SessionQueue) session.getAttribute(TASKS_QUEUE);
+
+       if (existing != null) {
+           return existing;
+       }
+
+       SessionQueue created = new SessionQueue();
+       SessionQueue registered = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, created);
+
+       // A non-null result means a queue was already registered in the
+       // meantime : that one wins over the queue we have just created.
+       return (registered != null) ? registered : created;
+    }
+
+    /**
+     * @return The associated queue handler.
+     */
+    public IoEventQueueHandler getQueueHandler() {
+       return eventQueueHandler;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+       // Ignore the request. It must always be AbortPolicy.
+    }
+
+    /**
+     * Add a new thread to execute a task, if needed and possible. It depends
+     * on the current pool size : if the pool already contains
+     * getMaximumPoolSize() workers, we do nothing.
+     * <p>
+     * The whole operation is done while holding the lock on the workers set.
+     */
+    private void addWorker() {
+       synchronized (workers) {
+           if (workers.size() >= super.getMaximumPoolSize()) {
+               return;
+           }
+
+           // Create a new worker, and add it to the thread pool
+           Worker worker = new Worker();
+           Thread thread = getThreadFactory().newThread(worker);
+
+           // As we have added a new thread, it's considered as idle.
+           // This must be done before the thread is started, because the
+           // worker decrements the counter as soon as fetchSession() returns.
+           idleWorkers.incrementAndGet();
+
+           // Now, we can start it.
+           thread.start();
+           workers.add(worker);
+
+           // Keep track of the biggest size the pool has ever reached
+           if (workers.size() > largestPoolSize) {
+               largestPoolSize = workers.size();
+           }
+       }
+    }
+
+    /**
+     * Add a new Worker only if there are no idle worker. The idle counter is
+     * first read without any lock, then checked again (together with the
+     * emptiness of the pool) while holding the lock on the workers set,
+     * before addWorker() is called.
+     */
+    private void addWorkerIfNecessary() {
+       if (idleWorkers.get() == 0) {
+           synchronized (workers) {
+               if (workers.isEmpty() || (idleWorkers.get() == 0)) {
+                   addWorker();
+               }
+           }
+       }
+    }
+
+    private void removeWorker() {
+       synchronized (workers) {
+           if (workers.size() <= super.getCorePoolSize()) {
+               return;
+           }
+           waitingSessions.offer(EXIT_SIGNAL);
+       }
+    }
+
+    /**
+     * Sets the maximum number of workers, and asks the workers in excess of
+     * the new maximum to exit.
+     *
+     * @param maximumPoolSize
+     *            The new maximum pool size
+     * @throws IllegalArgumentException
+     *             If the new maximum is not strictly positive, or is smaller
+     *             than the current core pool size
+     */
+    @Override
+    public void setMaximumPoolSize(int maximumPoolSize) {
+       boolean invalid = (maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize());
+
+       if (invalid) {
+           throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize);
+       }
+
+       synchronized (workers) {
+           super.setMaximumPoolSize(maximumPoolSize);
+
+           // One removal request per worker above the new limit
+           for (int surplus = workers.size() - maximumPoolSize; surplus > 0; surplus--) {
+               removeWorker();
+           }
+       }
+    }
+
+    /**
+     * Blocks until the executor is terminated or the timeout has elapsed,
+     * whichever comes first. The calling thread waits on the workers set,
+     * which is notified every time a worker exits.
+     *
+     * @param timeout
+     *            The maximum time to wait
+     * @param unit
+     *            The time unit of the timeout
+     * @return <code>true</code> if the executor is terminated when this
+     *         method returns
+     * @throws InterruptedException
+     *             If the calling thread is interrupted while waiting
+     */
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+       final long deadline = System.currentTimeMillis() + unit.toMillis(timeout);
+
+       synchronized (workers) {
+           long remaining;
+
+           // Loop as wait() may return before the pool is actually empty
+           while (!isTerminated() && ((remaining = deadline - System.currentTimeMillis()) > 0)) {
+               workers.wait(remaining);
+           }
+       }
+
+       return isTerminated();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isShutdown() {
+       return shutdown;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isTerminated() {
+       if (!shutdown) {
+           return false;
+       }
+
+       synchronized (workers) {
+           return workers.isEmpty();
+       }
+    }
+
+    /**
+     * Flags the executor as shut down, then offers one EXIT_SIGNAL per
+     * worker currently in the pool to the queue of waiting sessions. Calling
+     * this method again once the flag is set has no effect.
+     */
+    @Override
+    public void shutdown() {
+       if (shutdown) {
+           return;
+       }
+
+       shutdown = true;
+
+       synchronized (workers) {
+           // One exit signal for each worker present at this point
+           for (int i = workers.size(); i > 0; i--) {
+               waitingSessions.offer(EXIT_SIGNAL);
+           }
+       }
+    }
+
+    /**
+     * Shuts the executor down, then drains the queue of waiting sessions and
+     * collects every task still pending in the drained sessions' queues. The
+     * event queue handler is told (through polled()) about each task that is
+     * removed this way.
+     *
+     * @return The tasks that were pending and have been removed
+     */
+    @Override
+    public List<Runnable> shutdownNow() {
+       shutdown();
+
+       List<Runnable> answer = new ArrayList<>();
+       SessionEntry entry;
+
+       // The loop only ends when poll() returns null, i.e. when the queue is
+       // empty.
+       // NOTE(review): as exit signals are put back each time they are
+       // polled, this loop keeps spinning until the workers have consumed
+       // them all.
+       while ((entry = waitingSessions.poll()) != null) {
+           if (entry == EXIT_SIGNAL) {
+               // The signal is meant for a worker : put it back
+               waitingSessions.offer(EXIT_SIGNAL);
+               Thread.yield(); // Let others take the signal.
+               continue;
+           }
+
+           SessionQueue sessionTasksQueue = (SessionQueue) 
entry.getSession().getAttribute(TASKS_QUEUE);
+
+           synchronized (sessionTasksQueue.tasksQueue) {
+
+               // Hand every pending task back to the caller
+               for (Runnable task : sessionTasksQueue.tasksQueue) {
+                   getQueueHandler().polled(this, (IoEvent) task);
+                   answer.add(task);
+               }
+
+               sessionTasksQueue.tasksQueue.clear();
+           }
+       }
+
+       return answer;
+    }
+
+    /**
+     * A helper method that logs, at debug level, the event being added and
+     * the types of the events currently held in the given queue.
+     *
+     * @param queue
+     *            The session's queue of events
+     * @param event
+     *            The event being added
+     */
+    private void print(Queue<Runnable> queue, IoEvent event) {
+       StringBuilder sb = new StringBuilder();
+       sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId());
+       sb.append("\nQueue : [");
+
+       boolean first = true;
+
+       for (Runnable elem : queue) {
+           // The separator goes between two elements only : it must not
+           // be appended again after the element itself.
+           if (first) {
+               first = false;
+           } else {
+               sb.append(", ");
+           }
+
+           sb.append(((IoEvent) elem).getType());
+       }
+
+       sb.append("]\n");
+       LOGGER.debug(sb.toString());
+    }
+
+    /**
+     * Queues an {@link IoEvent} into its session's tasks queue and, if no
+     * worker is currently in charge of that session, puts the session in the
+     * queue of waiting sessions. A new worker is added if none is idle.
+     *
+     * @param task
+     *            The task to execute. It must be an {@link IoEvent}
+     * @throws IllegalArgumentException
+     *             If the task is not an {@link IoEvent}
+     */
+    @Override
+    public void execute(Runnable task) {
+       // Once shut down, the task is handed to the rejection handler
+       if (shutdown) {
+           rejectTask(task);
+       }
+
+       // Check that it's a IoEvent task
+       checkTaskType(task);
+
+       IoEvent event = (IoEvent) task;
+
+       // Get the associated session
+       IoSession session = event.getSession();
+
+       // Get the session's queue of events
+       SessionQueue sessionTasksQueue = getSessionTasksQueue(session);
+       Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
+
+       boolean offerSession;
+
+       // propose the new event to the event queue handler. If we
+       // use a throttle queue handler, the message may be rejected
+       // if the maximum size has been reached.
+       boolean offerEvent = eventQueueHandler.accept(this, event);
+
+       if (offerEvent) {
+           // Ok, the message has been accepted
+           synchronized (tasksQueue) {
+               // Inject the event into the executor taskQueue
+               tasksQueue.offer(event);
+
+               // processingCompleted is set by a worker when it finds the
+               // session's queue empty : in that case the session has to
+               // be scheduled again.
+               if (sessionTasksQueue.processingCompleted) {
+                   sessionTasksQueue.processingCompleted = false;
+                   offerSession = true;
+               } else {
+                   offerSession = false;
+               }
+
+               if (LOGGER.isDebugEnabled()) {
+                   print(tasksQueue, event);
+               }
+           }
+       } else {
+           offerSession = false;
+       }
+
+       if (offerSession) {
+           // No worker is processing this session at the moment, so the
+           // session is added to the queue of sessions waiting for a
+           // worker.
+           waitingSessions.offer(new SessionEntry(session, comparator));
+       }
+
+       addWorkerIfNecessary();
+
+       // Tell the handler the event has been queued
+       if (offerEvent) {
+           eventQueueHandler.offered(this, event);
+       }
+    }
+
+    private void rejectTask(Runnable task) {
+       getRejectedExecutionHandler().rejectedExecution(task, this);
+    }
+
+    private void checkTaskType(Runnable task) {
+       if (!(task instanceof IoEvent)) {
+           throw new IllegalArgumentException("task must be an IoEvent or its 
subclass.");
+       }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int getActiveCount() {
+       synchronized (workers) {
+           return workers.size() - idleWorkers.get();
+       }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getCompletedTaskCount() {
+       synchronized (workers) {
+           long answer = completedTaskCount;
+           for (Worker w : workers) {
+               answer += w.completedTaskCount.get();
+           }
+
+           return answer;
+       }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int getLargestPoolSize() {
+       return largestPoolSize;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int getPoolSize() {
+       synchronized (workers) {
+           return workers.size();
+       }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public long getTaskCount() {
+       return getCompletedTaskCount();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean isTerminating() {
+       synchronized (workers) {
+           return isShutdown() && !isTerminated();
+       }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public int prestartAllCoreThreads() {
+       int answer = 0;
+       synchronized (workers) {
+           for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) {
+               addWorker();
+               answer++;
+           }
+       }
+       return answer;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean prestartCoreThread() {
+       synchronized (workers) {
+           if (workers.size() < super.getCorePoolSize()) {
+               addWorker();
+               return true;
+           } else {
+               return false;
+           }
+       }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public BlockingQueue<Runnable> getQueue() {
+       throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void purge() {
+       // Nothing to purge in this implementation.
+    }
+
+    /**
+     * Removes a pending event from its session's tasks queue. The event queue
+     * handler is notified (through polled()) when the event has actually been
+     * removed.
+     *
+     * @param task
+     *            The task to remove. It must be an {@link IoEvent}
+     * @return <code>true</code> if the task was found in its session's queue
+     *         and removed
+     * @throws IllegalArgumentException
+     *             If the task is not an {@link IoEvent}
+     */
+    @Override
+    public boolean remove(Runnable task) {
+       checkTaskType(task);
+
+       IoEvent event = (IoEvent) task;
+       SessionQueue sessionTasksQueue = (SessionQueue) event.getSession().getAttribute(TASKS_QUEUE);
+
+       // No queue : nothing has ever been queued for this session
+       if (sessionTasksQueue == null) {
+           return false;
+       }
+
+       Queue<Runnable> pending = sessionTasksQueue.tasksQueue;
+
+       synchronized (pending) {
+           if (!pending.remove(task)) {
+               return false;
+           }
+       }
+
+       // The handler is notified outside of the lock
+       getQueueHandler().polled(this, event);
+
+       return true;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setCorePoolSize(int corePoolSize) {
+       if (corePoolSize < 0) {
+           throw new IllegalArgumentException("corePoolSize: " + corePoolSize);
+       }
+       if (corePoolSize > super.getMaximumPoolSize()) {
+           throw new IllegalArgumentException("corePoolSize exceeds 
maximumPoolSize");
+       }
+
+       synchronized (workers) {
+           if (super.getCorePoolSize() > corePoolSize) {
+               for (int i = super.getCorePoolSize() - corePoolSize; i > 0; 
i--) {
+                   removeWorker();
+               }
+           }
+           super.setCorePoolSize(corePoolSize);
+       }
+    }
+
+    /**
+     * The task run by each thread of the pool : it repeatedly takes a session
+     * from the queue of waiting sessions and runs all the events queued for
+     * that session, until it is told to exit or has been idle for too long.
+     */
+    private class Worker implements Runnable {
+
+       /** The number of tasks this worker has run to completion */
+       private final AtomicLong completedTaskCount = new AtomicLong(0);
+
+       /** The thread running this worker, set when run() starts */
+       private Thread thread;
+
+       /**
+        * {@inheritDoc}
+        */
+       @Override
+       public void run() {
+           thread = Thread.currentThread();
+
+           try {
+               for (;;) {
+                   IoSession session = fetchSession();
+
+                   // Idle until now, as counted by addWorker() or by the
+                   // previous iteration.
+                   idleWorkers.decrementAndGet();
+
+                   if (session == null) {
+                       // Nothing to do for the whole keepAlive delay : exit
+                       // if the pool is above its core size.
+                       synchronized (workers) {
+                           if (workers.size() > getCorePoolSize()) {
+                               // Remove now to prevent duplicate exit.
+                               workers.remove(this);
+                               break;
+                           }
+                       }
+                   }
+
+                   // fetchSession() returns the session carried by the
+                   // entry, never the entry itself : the exit signal must
+                   // be recognized through its session. Comparing with the
+                   // EXIT_SIGNAL entry would never match.
+                   if (session == EXIT_SIGNAL.getSession()) {
+                       break;
+                   }
+
+                   try {
+                       if (session != null) {
+                           runTasks(getSessionTasksQueue(session));
+                       }
+                   } finally {
+                       idleWorkers.incrementAndGet();
+                   }
+               }
+           } finally {
+               synchronized (workers) {
+                   workers.remove(this);
+                   PriorityThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get();
+
+                   // Wake up the threads blocked in awaitTermination()
+                   workers.notifyAll();
+               }
+           }
+       }
+
+       /**
+        * Waits for a session to process, for at most the keepAlive delay.
+        *
+        * @return The session of the entry taken from the queue of waiting
+        *         sessions, or <code>null</code> if none became available
+        *         before the delay elapsed
+        */
+       private IoSession fetchSession() {
+           long deadline = System.currentTimeMillis() + getKeepAliveTime(TimeUnit.MILLISECONDS);
+           SessionEntry entry = null;
+
+           for (;;) {
+               // Recomputed on every pass, so that an interrupted wait
+               // resumes with the remaining time only, not the full delay.
+               long waitTime = deadline - System.currentTimeMillis();
+
+               if (waitTime <= 0) {
+                   break;
+               }
+
+               try {
+                   entry = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS);
+                   break;
+               } catch (InterruptedException ignored) {
+                   // Ignored on purpose : a worker only exits on an exit
+                   // signal or when the keepAlive delay has elapsed.
+               }
+           }
+
+           return (entry != null) ? entry.getSession() : null;
+       }
+
+       /**
+        * Runs the events of a session, in order, until its queue is empty.
+        * The queue is then flagged as completed, so that execute() puts the
+        * session back in the queue of waiting sessions on the next event.
+        *
+        * @param sessionTasksQueue
+        *            The session's tasks queue
+        */
+       private void runTasks(SessionQueue sessionTasksQueue) {
+           for (;;) {
+               Runnable task;
+               Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue;
+
+               synchronized (tasksQueue) {
+                   task = tasksQueue.poll();
+
+                   if (task == null) {
+                       sessionTasksQueue.processingCompleted = true;
+                       break;
+                   }
+               }
+
+               eventQueueHandler.polled(PriorityThreadPoolExecutor.this, (IoEvent) task);
+
+               runTask(task);
+           }
+       }
+
+       /**
+        * Runs a single task, surrounded by the beforeExecute() and
+        * afterExecute() hooks. A RuntimeException thrown by the task is
+        * passed to afterExecute() and then rethrown.
+        *
+        * @param task
+        *            The task to run
+        */
+       private void runTask(Runnable task) {
+           beforeExecute(thread, task);
+           boolean ran = false;
+           try {
+               task.run();
+               ran = true;
+               afterExecute(task, null);
+               completedTaskCount.incrementAndGet();
+           } catch (RuntimeException e) {
+               // Don't call the hook twice if it is afterExecute() itself
+               // that has thrown.
+               if (!ran) {
+                   afterExecute(task, e);
+               }
+               throw e;
+           }
+       }
+    }
+
+    /**
+     * A class used to store the ordered list of events to be processed by the
+     * session, and the current task state.
+     */
+    private class SessionQueue {
+       /** A queue of ordered event waiting to be processed */
+       private final Queue<Runnable> tasksQueue = new 
ConcurrentLinkedQueue<>();
+
+       /** The current task state */
+       private boolean processingCompleted = true;
+    }
+
+    /**
+     * A class used to preserve first-in-first-out order of sessions that have
+     * equal priority. Each entry gets a sequence number when it is created,
+     * which is used as a tie-breaker by {@link #compareTo(SessionEntry)}.
+     * <p>
+     * Note that equals() is not overridden : two distinct entries may compare
+     * as 0 without being equal.
+     */
+    static class SessionEntry implements Comparable<SessionEntry> {
+       /** The sequence number of this entry, taken at creation time */
+       private final long seqNum;
+
+       /** The session this entry stands for */
+       private final IoSession session;
+
+       /** The comparator used to prioritize sessions, may be null */
+       private final Comparator<IoSession> comparator;
+
+       /**
+        * Creates a new entry for a session.
+        *
+        * @param session
+        *            The session, which can't be <code>null</code>
+        * @param comparator
+        *            The comparator used to prioritize sessions, or
+        *            <code>null</code> to rely on the creation order only
+        * @throws IllegalArgumentException
+        *             If the session is <code>null</code>
+        */
+       public SessionEntry(IoSession session, Comparator<IoSession> comparator) {
+           if (session == null) {
+               throw new IllegalArgumentException("session");
+           }
+           seqNum = seq.getAndIncrement();
+           this.session = session;
+           this.comparator = comparator;
+       }
+
+       /**
+        * @return The session this entry stands for
+        */
+       public IoSession getSession() {
+           return session;
+       }
+
+       /**
+        * Compares two entries : entries for the same session are considered
+        * equal, the exit signal goes before everything else, then the
+        * session comparator (if any) decides, and finally the creation order
+        * is used as a tie-breaker.
+        *
+        * @param other
+        *            The entry to compare this entry with
+        * @return A negative value, 0 or a positive value if this entry goes
+        *         before, is equal to or goes after the other entry
+        */
+       @Override
+       public int compareTo(SessionEntry other) {
+           if (other == this) {
+               return 0;
+           }
+
+           if (other.session == this.session) {
+               return 0;
+           }
+
+           // An exit signal should always be preferred.
+           if (this == EXIT_SIGNAL) {
+               return -1;
+           }
+           if (other == EXIT_SIGNAL) {
+               return 1;
+           }
+
+           int res = 0;
+
+           // If there's a comparator, use it to prioritise events.
+           if (comparator != null) {
+               res = comparator.compare(session, other.session);
+           }
+
+           // FIFO tiebreaker. Sequence numbers are unique per entry, so
+           // this can only be 0 for an entry compared with itself, which
+           // has already been handled above.
+           if (res == 0) {
+               res = Long.compare(seqNum, other.seqNum);
+           }
+
+           return res;
+       }
+    }
+}

http://git-wip-us.apache.org/repos/asf/mina/blob/56ca189e/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
 
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
new file mode 100644
index 0000000..fac0478
--- /dev/null
+++ 
b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java
@@ -0,0 +1,313 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.executor;
+
+import org.apache.mina.core.filterchain.IoFilter;
+import org.apache.mina.core.session.DummySession;
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.core.session.IoSession;
+import org.apache.mina.core.write.WriteRequest;
+import org.apache.mina.filter.FilterEvent;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that verify the functionality provided by the implementation of
+ * {@link PriorityThreadPoolExecutor}.
+ *
+ * @author Guus der Kinderen, guus.der.kinde...@gmail.com
+ */
+public class PriorityThreadPoolExecutorTest {
+    /**
+     * Checks {@link PriorityThreadPoolExecutor.SessionEntry} ordering when no
+     * comparator is provided and both entries reference the same session:
+     * the entries are expected to compare as equal.
+     */
+    @Test
+    public void fifoEntryTestNoComparatorSameSession() throws Exception {
+       // Two entries wrapping one and the same session instance.
+       final IoSession shared = new DummySession();
+       final PriorityThreadPoolExecutor.SessionEntry earlier = new PriorityThreadPoolExecutor.SessionEntry(shared, null);
+       final PriorityThreadPoolExecutor.SessionEntry later = new PriorityThreadPoolExecutor.SessionEntry(shared, null);
+
+       // Compare and verify in one step.
+       assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0,
+               earlier.compareTo(later));
+    }
+
+    /**
+     * Checks {@link PriorityThreadPoolExecutor.SessionEntry} ordering when no
+     * comparator is provided and the entries reference different sessions:
+     * the entry created first is expected to be 'less than' the entry
+     * created later.
+     */
+    @Test
+    public void fifoEntryTestNoComparatorDifferentSession() throws Exception {
+       // Creation order matters: 'earlier' must be constructed before 'later'.
+       final PriorityThreadPoolExecutor.SessionEntry earlier = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+       final PriorityThreadPoolExecutor.SessionEntry later = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null);
+
+       final int outcome = earlier.compareTo(later);
+
+       final boolean firstInFirstOut = outcome < 0;
+       assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + outcome,
+               firstInFirstOut);
+    }
+
+    /**
+     * Checks {@link PriorityThreadPoolExecutor.SessionEntry} ordering when a
+     * comparator is provided but both entries reference the same session:
+     * the entries are expected to compare as equal, so the (non-zero) value
+     * the comparator would return must not show up in the result.
+     */
+    @Test
+    public void fifoEntryTestWithComparatorSameSession() throws Exception {
+       // A comparator that never reports equality.
+       final int fixedOutcome = 3853;
+       final Comparator<IoSession> constantComparator = new Comparator<IoSession>() {
+           @Override
+           public int compare(IoSession o1, IoSession o2) {
+               return fixedOutcome;
+           }
+       };
+
+       // Two entries wrapping one and the same session instance.
+       final IoSession shared = new DummySession();
+       final PriorityThreadPoolExecutor.SessionEntry earlier = new PriorityThreadPoolExecutor.SessionEntry(shared, constantComparator);
+       final PriorityThreadPoolExecutor.SessionEntry later = new PriorityThreadPoolExecutor.SessionEntry(shared, constantComparator);
+
+       // Compare and verify in one step.
+       assertEquals("With a comparator, entries of the same session are expected to be equal.", 0,
+               earlier.compareTo(later));
+    }
+
+    /**
+     * Checks {@link PriorityThreadPoolExecutor.SessionEntry} ordering when a
+     * comparator is provided and the entries reference different sessions:
+     * the value returned by the comparator is expected to be the result,
+     * rather than the creation-order fallback.
+     */
+    @Test
+    public void fifoEntryTestComparatorDifferentSession() throws Exception {
+       // A comparator with a recognisable, constant answer.
+       final int fixedOutcome = 3853;
+       final Comparator<IoSession> constantComparator = new Comparator<IoSession>() {
+           @Override
+           public int compare(IoSession o1, IoSession o2) {
+               return fixedOutcome;
+           }
+       };
+
+       // Creation order matters: 'earlier' must be constructed before 'later'.
+       final PriorityThreadPoolExecutor.SessionEntry earlier = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), constantComparator);
+       final PriorityThreadPoolExecutor.SessionEntry later = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), constantComparator);
+
+       final int outcome = earlier.compareTo(later);
+
+       assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.",
+               fixedOutcome, outcome);
+    }
+
+    /**
+     * Asserts that, when enough work is being submitted to the executor for it
+     * to start queuing work, prioritisation of work starts to occur.
+     *
+     * This implementation starts a number of sessions, and evenly distributes
+     * a number of messages to them. Processing each message is artificially
+     * made 'expensive', while the executor pool is kept small. This causes
+     * work to be queued in the executor.
+     *
+     * The executor that is used is configured to prefer one specific session.
+     * Each session records the timestamp of its last activity. After all work
+     * has been processed, the test asserts that the last activity of all
+     * sessions was later than the last activity of the preferred session.
+     *
+     * @throws Throwable the first failure recorded by the mock filter while
+     *         processing a message, if any
+     */
+    @Test
+    public void testPrioritisation() throws Throwable {
+       // Set up fixture.
+       final MockWorkFilter nextFilter = new MockWorkFilter();
+       final List<LastActivityTracker> sessions = new ArrayList<>();
+       for (int i = 0; i < 10; i++) {
+           sessions.add(new LastActivityTracker());
+       }
+       // Prefer an arbitrary session (but not the first or last session, for
+       // good measure).
+       final LastActivityTracker preferredSession = sessions.get(4);
+       final Comparator<IoSession> comparator = new UnfairComparator(preferredSession);
+       // Keep this low, to force resource contention.
+       final int maximumPoolSize = 1;
+       final int amountOfTasks = 400;
+
+       final ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator);
+       final ExecutorFilter filter = new ExecutorFilter(executor);
+
+       // Execute system under test.
+       try {
+           for (int i = 0; i < amountOfTasks; i++) {
+               // Round-robin over the sessions, starting at index 1 and
+               // wrapping around to index 0.
+               final int sessionIndex = (i + 1) % sessions.size();
+
+               filter.messageReceived(nextFilter, sessions.get(sessionIndex), null);
+
+               // Fail fast when a task that already ran reported a problem.
+               if (nextFilter.throwable != null) {
+                   throw nextFilter.throwable;
+               }
+           }
+       } finally {
+           // Shut down even when submission is aborted, so that the
+           // executor is not left running after a failed test.
+           executor.shutdown();
+       }
+
+       // Verify results.
+       executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+
+       // Work that was still queued when the submission loop ended may have
+       // failed as well; surface that instead of silently ignoring it.
+       if (nextFilter.throwable != null) {
+           throw nextFilter.throwable;
+       }
+
+       final long preferredLastActivity = preferredSession.lastActivity;
+       for (final LastActivityTracker session : sessions) {
+           if (session != preferredSession) {
+               assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).",
+                       session.lastActivity > preferredLastActivity);
+           }
+       }
+    }
+
+    /**
+     * A comparator that ranks one designated session before every other
+     * session, and treats all remaining sessions as equal to each other.
+     */
+    private static class UnfairComparator implements Comparator<IoSession> {
+       private final IoSession preferred;
+
+       /**
+        * @param preferred the session that is to be ranked first
+        */
+       public UnfairComparator(IoSession preferred) {
+           this.preferred = preferred;
+       }
+
+       /**
+        * @return -1 when <code>o1</code> is the preferred session, otherwise 1
+        *         when <code>o2</code> is the preferred session, otherwise 0
+        */
+       @Override
+       public int compare(IoSession o1, IoSession o2) {
+           // The left-hand argument is checked first, so it wins when both
+           // arguments are the preferred session.
+           final boolean leftIsPreferred = (o1 == preferred);
+           if (leftIsPreferred) {
+               return -1;
+           }
+
+           final boolean rightIsPreferred = (o2 == preferred);
+           return rightIsPreferred ? 1 : 0;
+       }
+    }
+
+    /**
+     * A session that tracks the timestamp of last activity.
+     */
+    private static class LastActivityTracker extends DummySession {
+       // Initialised to the creation time. Declared volatile because the
+       // value is updated through setLastActivity() by whichever thread
+       // processes a message, while the test reads the field directly,
+       // without holding this object's monitor.
+       volatile long lastActivity = System.currentTimeMillis();
+
+       /**
+        * Records the current time as the moment of last activity.
+        */
+       public synchronized void setLastActivity() {
+           lastActivity = System.currentTimeMillis();
+       }
+    }
+
+    /**
+     * A filter that simulates a non-negligible amount of work. Only
+     * {@link #messageReceived(IoSession, Object)} does anything; every other
+     * callback is a no-op.
+     */
+    private static class MockWorkFilter implements IoFilter.NextFilter {
+       // First failure seen while processing a message, or null when there
+       // was none. Declared volatile because it is assigned in
+       // messageReceived() and polled from the test method without any
+       // other synchronisation.
+       volatile Throwable throwable;
+
+       public void sessionOpened(IoSession session) {
+           // Do nothing
+       }
+
+       public void sessionClosed(IoSession session) {
+           // Do nothing
+       }
+
+       public void sessionIdle(IoSession session, IdleStatus status) {
+           // Do nothing
+       }
+
+       public void exceptionCaught(IoSession session, Throwable cause) {
+           // Do nothing
+       }
+
+       public void inputClosed(IoSession session) {
+           // Do nothing
+       }
+
+       /**
+        * Sleeps for 20 milliseconds and then marks the session as active.
+        * The session is cast to {@link LastActivityTracker}. Any exception
+        * raised along the way is stored in {@link #throwable}, unless an
+        * earlier failure was already stored there.
+        */
+       public void messageReceived(IoSession session, Object message) {
+           try {
+               Thread.sleep(20); // mimic work.
+               ((LastActivityTracker) session).setLastActivity();
+           } catch (Exception e) {
+               // Keep the first failure only.
+               if (this.throwable == null) {
+                   this.throwable = e;
+               }
+           }
+       }
+
+       public void messageSent(IoSession session, WriteRequest writeRequest) {
+           // Do nothing
+       }
+
+       public void filterWrite(IoSession session, WriteRequest writeRequest) {
+           // Do nothing
+       }
+
+       public void filterClose(IoSession session) {
+           // Do nothing
+       }
+
+       public void sessionCreated(IoSession session) {
+           // Do nothing
+       }
+
+       @Override
+       public void event(IoSession session, FilterEvent event) {
+           // Do nothing
+       }
+    }
+}

Reply via email to