Repository: mina Updated Branches: refs/heads/2.1.0 d72f89017 -> 56ca189e1
Applies patch DIRMINA-1078; maven pass Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/56ca189e Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/56ca189e Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/56ca189e Branch: refs/heads/2.1.0 Commit: 56ca189e1b2de1b6d9ab3635dd8f331f13762009 Parents: d72f890 Author: johnnyv <john...@apache.org> Authored: Sun Jul 22 08:13:42 2018 -0400 Committer: johnnyv <john...@apache.org> Committed: Sun Jul 22 08:13:42 2018 -0400 ---------------------------------------------------------------------- .../executor/PriorityThreadPoolExecutor.java | 885 +++++++++++++++++++ .../PriorityThreadPoolExecutorTest.java | 313 +++++++ 2 files changed, 1198 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/56ca189e/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java new file mode 100644 index 0000000..dd6b6e1 --- /dev/null +++ b/mina-core/src/main/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutor.java @@ -0,0 +1,885 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.mina.filter.executor; + +import org.apache.mina.core.session.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * A {@link ThreadPoolExecutor} that maintains the order of {@link IoEvent}s + * within a session (similar to {@link OrderedThreadPoolExecutor}) and allows + * some sessions to be prioritized over other sessions. + * <p> + * If you don't need to maintain the order of events per session, please use + * {@link UnorderedThreadPoolExecutor}. + * <p> + * If you don't need to prioritize sessions, please use + * {@link OrderedThreadPoolExecutor}. + * + * @author <a href="http://mina.apache.org">Apache MINA Project</a> + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + * @org.apache.xbean.XBean + */ +// TODO this class currently copies OrderedThreadPoolExecutor, and changes the +// BlockingQueue used for the waitingSessions field. This code duplication +// should be avoided. +public class PriorityThreadPoolExecutor extends ThreadPoolExecutor { + /** A logger for this class (commented as it breaks MDCFlter tests) */ + private static final Logger LOGGER = LoggerFactory.getLogger(PriorityThreadPoolExecutor.class); + + /** Generates sequential identifiers that ensure FIFO behavior. 
*/ + private static final AtomicLong seq = new AtomicLong(0); + + /** A default value for the initial pool size */ + private static final int DEFAULT_INITIAL_THREAD_POOL_SIZE = 0; + + /** A default value for the maximum pool size */ + private static final int DEFAULT_MAX_THREAD_POOL = 16; + + /** A default value for the KeepAlive delay */ + private static final int DEFAULT_KEEP_ALIVE = 30; + + private static final SessionEntry EXIT_SIGNAL = new SessionEntry(new DummySession(), null); + + /** + * A key stored into the session's attribute for the event tasks being + * queued + */ + private static final AttributeKey TASKS_QUEUE = new AttributeKey(PriorityThreadPoolExecutor.class, "tasksQueue"); + + /** A queue used to store the available sessions */ + private final BlockingQueue<SessionEntry> waitingSessions; + + private final Set<Worker> workers = new HashSet<>(); + + private volatile int largestPoolSize; + + private final AtomicInteger idleWorkers = new AtomicInteger(); + + private long completedTaskCount; + + private volatile boolean shutdown; + + private final IoEventQueueHandler eventQueueHandler; + + private final Comparator<IoSession> comparator; + + /** + * Creates a default ThreadPool, with default values : - minimum pool size + * is 0 - maximum pool size is 16 - keepAlive set to 30 seconds - A default + * ThreadFactory - All events are accepted + */ + public PriorityThreadPoolExecutor() { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null); + } + + /** + * Creates a default ThreadPool, with default values : - minimum pool size + * is 0 - maximum pool size is 16 - keepAlive set to 30 seconds - A default + * ThreadFactory - All events are accepted + */ + public PriorityThreadPoolExecutor(Comparator<IoSession> comparator) { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, DEFAULT_MAX_THREAD_POOL, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, 
comparator); + } + + /** + * Creates a default ThreadPool, with default values : - minimum pool size + * is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All events + * are accepted + * + * @param maximumPoolSize + * The maximum pool size + */ + public PriorityThreadPoolExecutor(int maximumPoolSize) { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null); + } + + /** + * Creates a default ThreadPool, with default values : - minimum pool size + * is 0 - keepAlive set to 30 seconds - A default ThreadFactory - All events + * are accepted + * + * @param maximumPoolSize + * The maximum pool size + */ + public PriorityThreadPoolExecutor(int maximumPoolSize, Comparator<IoSession> comparator) { + this(DEFAULT_INITIAL_THREAD_POOL_SIZE, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, comparator); + } + + /** + * Creates a default ThreadPool, with default values : - keepAlive set to 30 + * seconds - A default ThreadFactory - All events are accepted + * + * @param corePoolSize + * The initial pool size + * @param maximumPoolSize + * The maximum pool size + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize) { + this(corePoolSize, maximumPoolSize, DEFAULT_KEEP_ALIVE, TimeUnit.SECONDS, Executors.defaultThreadFactory(), null, null); + } + + /** + * Creates a default ThreadPool, with default values : - A default + * ThreadFactory - All events are accepted + * + * @param corePoolSize + * The initial pool size + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), null, 
null); + } + + /** + * Creates a default ThreadPool, with default values : - A default + * ThreadFactory + * + * @param corePoolSize + * The initial pool size + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + * @param eventQueueHandler + * The queue used to store events + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, IoEventQueueHandler eventQueueHandler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, Executors.defaultThreadFactory(), eventQueueHandler, null); + } + + /** + * Creates a default ThreadPool, with default values : - A default + * ThreadFactory + * + * @param corePoolSize + * The initial pool size + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + * @param threadFactory + * The factory used to create threads + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, threadFactory, null, null); + } + + /** + * Creates a new instance of a PriorityThreadPoolExecutor. 
+ * + * @param corePoolSize + * The initial pool size + * @param maximumPoolSize + * The maximum pool size + * @param keepAliveTime + * Default duration for a thread + * @param unit + * Time unit used for the keepAlive value + * @param threadFactory + * The factory used to create threads + * @param eventQueueHandler + * The queue used to store events + * @param comparator + * The comparator used to prioritise sessions; may be null + */ + public PriorityThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, IoEventQueueHandler eventQueueHandler, Comparator<IoSession> comparator) { + // We have to initialize the pool with default values (0 and 1) in order + // to + // handle the exception in a better way. We can't add a try {} catch() + // {} + // around the super() call. + super(DEFAULT_INITIAL_THREAD_POOL_SIZE, 1, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new AbortPolicy()); + + if (corePoolSize < DEFAULT_INITIAL_THREAD_POOL_SIZE) { + throw new IllegalArgumentException("corePoolSize: " + corePoolSize); + } + + if ((maximumPoolSize == 0) || (maximumPoolSize < corePoolSize)) { + throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); + } + + // Now, we can setup the pool sizes + super.setCorePoolSize(corePoolSize); + super.setMaximumPoolSize(maximumPoolSize); + + // The queueHandler might be null. + if (eventQueueHandler == null) { + this.eventQueueHandler = IoEventQueueHandler.NOOP; + } else { + this.eventQueueHandler = eventQueueHandler; + } + + // The comparator can be null. + this.comparator = comparator; + + if (this.comparator == null) { + this.waitingSessions = new LinkedBlockingQueue<>(); + } else { + this.waitingSessions = new PriorityBlockingQueue<>(); + } + } + + /** + * Get the session's tasks queue. 
+ */ + private SessionQueue getSessionTasksQueue(IoSession session) { + SessionQueue queue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + + if (queue == null) { + queue = new SessionQueue(); + SessionQueue oldQueue = (SessionQueue) session.setAttributeIfAbsent(TASKS_QUEUE, queue); + + if (oldQueue != null) { + queue = oldQueue; + } + } + + return queue; + } + + /** + * @return The associated queue handler. + */ + public IoEventQueueHandler getQueueHandler() { + return eventQueueHandler; + } + + /** + * {@inheritDoc} + */ + @Override + public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { + // Ignore the request. It must always be AbortPolicy. + } + + /** + * Add a new thread to execute a task, if needed and possible. It depends on + * the current pool size. If it's full, we do nothing. + */ + private void addWorker() { + synchronized (workers) { + if (workers.size() >= super.getMaximumPoolSize()) { + return; + } + + // Create a new worker, and add it to the thread pool + Worker worker = new Worker(); + Thread thread = getThreadFactory().newThread(worker); + + // As we have added a new thread, it's considered as idle. + idleWorkers.incrementAndGet(); + + // Now, we can start it. + thread.start(); + workers.add(worker); + + if (workers.size() > largestPoolSize) { + largestPoolSize = workers.size(); + } + } + } + + /** + * Add a new Worker only if there are no idle worker. 
+ */ + private void addWorkerIfNecessary() { + if (idleWorkers.get() == 0) { + synchronized (workers) { + if (workers.isEmpty() || (idleWorkers.get() == 0)) { + addWorker(); + } + } + } + } + + private void removeWorker() { + synchronized (workers) { + if (workers.size() <= super.getCorePoolSize()) { + return; + } + waitingSessions.offer(EXIT_SIGNAL); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void setMaximumPoolSize(int maximumPoolSize) { + if ((maximumPoolSize <= 0) || (maximumPoolSize < super.getCorePoolSize())) { + throw new IllegalArgumentException("maximumPoolSize: " + maximumPoolSize); + } + + synchronized (workers) { + super.setMaximumPoolSize(maximumPoolSize); + int difference = workers.size() - maximumPoolSize; + while (difference > 0) { + removeWorker(); + --difference; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + + long deadline = System.currentTimeMillis() + unit.toMillis(timeout); + + synchronized (workers) { + while (!isTerminated()) { + long waitTime = deadline - System.currentTimeMillis(); + if (waitTime <= 0) { + break; + } + + workers.wait(waitTime); + } + } + return isTerminated(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isShutdown() { + return shutdown; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminated() { + if (!shutdown) { + return false; + } + + synchronized (workers) { + return workers.isEmpty(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + if (shutdown) { + return; + } + + shutdown = true; + + synchronized (workers) { + for (int i = workers.size(); i > 0; i--) { + waitingSessions.offer(EXIT_SIGNAL); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public List<Runnable> shutdownNow() { + shutdown(); + + List<Runnable> answer = new ArrayList<>(); + SessionEntry entry; + + while ((entry = waitingSessions.poll()) != null) { + if 
(entry == EXIT_SIGNAL) { + waitingSessions.offer(EXIT_SIGNAL); + Thread.yield(); // Let others take the signal. + continue; + } + + SessionQueue sessionTasksQueue = (SessionQueue) entry.getSession().getAttribute(TASKS_QUEUE); + + synchronized (sessionTasksQueue.tasksQueue) { + + for (Runnable task : sessionTasksQueue.tasksQueue) { + getQueueHandler().polled(this, (IoEvent) task); + answer.add(task); + } + + sessionTasksQueue.tasksQueue.clear(); + } + } + + return answer; + } + + /** + * A helper method used to print the list of events being queued. + */ + private void print(Queue<Runnable> queue, IoEvent event) { + StringBuilder sb = new StringBuilder(); + sb.append("Adding event ").append(event.getType()).append(" to session ").append(event.getSession().getId()); + boolean first = true; + sb.append("\nQueue : ["); + for (Runnable elem : queue) { + if (first) { + first = false; + } else { + sb.append(", "); + } + + sb.append(((IoEvent) elem).getType()).append(", "); + } + sb.append("]\n"); + LOGGER.debug(sb.toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public void execute(Runnable task) { + if (shutdown) { + rejectTask(task); + } + + // Check that it's an IoEvent task + checkTaskType(task); + + IoEvent event = (IoEvent) task; + + // Get the associated session + IoSession session = event.getSession(); + + // Get the session's queue of events + SessionQueue sessionTasksQueue = getSessionTasksQueue(session); + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + + boolean offerSession; + + // propose the new event to the event queue handler. If we + // use a throttle queue handler, the message may be rejected + // if the maximum size has been reached. 
+ boolean offerEvent = eventQueueHandler.accept(this, event); + + if (offerEvent) { + // Ok, the message has been accepted + synchronized (tasksQueue) { + // Inject the event into the executor taskQueue + tasksQueue.offer(event); + + if (sessionTasksQueue.processingCompleted) { + sessionTasksQueue.processingCompleted = false; + offerSession = true; + } else { + offerSession = false; + } + + if (LOGGER.isDebugEnabled()) { + print(tasksQueue, event); + } + } + } else { + offerSession = false; + } + + if (offerSession) { + // As the tasksQueue was empty, the task has been executed + // immediately, so we can move the session to the queue + // of sessions waiting for completion. + waitingSessions.offer(new SessionEntry(session, comparator)); + } + + addWorkerIfNecessary(); + + if (offerEvent) { + eventQueueHandler.offered(this, event); + } + } + + private void rejectTask(Runnable task) { + getRejectedExecutionHandler().rejectedExecution(task, this); + } + + private void checkTaskType(Runnable task) { + if (!(task instanceof IoEvent)) { + throw new IllegalArgumentException("task must be an IoEvent or its subclass."); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int getActiveCount() { + synchronized (workers) { + return workers.size() - idleWorkers.get(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getCompletedTaskCount() { + synchronized (workers) { + long answer = completedTaskCount; + for (Worker w : workers) { + answer += w.completedTaskCount.get(); + } + + return answer; + } + } + + /** + * {@inheritDoc} + */ + @Override + public int getLargestPoolSize() { + return largestPoolSize; + } + + /** + * {@inheritDoc} + */ + @Override + public int getPoolSize() { + synchronized (workers) { + return workers.size(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getTaskCount() { + return getCompletedTaskCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminating() { + synchronized (workers) { + 
return isShutdown() && !isTerminated(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public int prestartAllCoreThreads() { + int answer = 0; + synchronized (workers) { + for (int i = super.getCorePoolSize() - workers.size(); i > 0; i--) { + addWorker(); + answer++; + } + } + return answer; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean prestartCoreThread() { + synchronized (workers) { + if (workers.size() < super.getCorePoolSize()) { + addWorker(); + return true; + } else { + return false; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public BlockingQueue<Runnable> getQueue() { + throw new UnsupportedOperationException(); + } + + /** + * {@inheritDoc} + */ + @Override + public void purge() { + // Nothing to purge in this implementation. + } + + /** + * {@inheritDoc} + */ + @Override + public boolean remove(Runnable task) { + checkTaskType(task); + IoEvent event = (IoEvent) task; + IoSession session = event.getSession(); + SessionQueue sessionTasksQueue = (SessionQueue) session.getAttribute(TASKS_QUEUE); + + if (sessionTasksQueue == null) { + return false; + } + + boolean removed; + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + + synchronized (tasksQueue) { + removed = tasksQueue.remove(task); + } + + if (removed) { + getQueueHandler().polled(this, event); + } + + return removed; + } + + /** + * {@inheritDoc} + */ + @Override + public void setCorePoolSize(int corePoolSize) { + if (corePoolSize < 0) { + throw new IllegalArgumentException("corePoolSize: " + corePoolSize); + } + if (corePoolSize > super.getMaximumPoolSize()) { + throw new IllegalArgumentException("corePoolSize exceeds maximumPoolSize"); + } + + synchronized (workers) { + if (super.getCorePoolSize() > corePoolSize) { + for (int i = super.getCorePoolSize() - corePoolSize; i > 0; i--) { + removeWorker(); + } + } + super.setCorePoolSize(corePoolSize); + } + } + + private class Worker implements Runnable { + + private AtomicLong completedTaskCount = new 
AtomicLong(0); + + private Thread thread; + + /** + * {@inheritDoc} + */ + @Override + public void run() { + thread = Thread.currentThread(); + + try { + for (;;) { + IoSession session = fetchSession(); + + idleWorkers.decrementAndGet(); + + if (session == null) { + synchronized (workers) { + if (workers.size() > getCorePoolSize()) { + // Remove now to prevent duplicate exit. + workers.remove(this); + break; + } + } + } + + if (session == EXIT_SIGNAL) { + break; + } + + try { + if (session != null) { + runTasks(getSessionTasksQueue(session)); + } + } finally { + idleWorkers.incrementAndGet(); + } + } + } finally { + synchronized (workers) { + workers.remove(this); + PriorityThreadPoolExecutor.this.completedTaskCount += completedTaskCount.get(); + workers.notifyAll(); + } + } + } + + private IoSession fetchSession() { + SessionEntry entry = null; + long currentTime = System.currentTimeMillis(); + long deadline = currentTime + getKeepAliveTime(TimeUnit.MILLISECONDS); + + for (;;) { + try { + long waitTime = deadline - currentTime; + + if (waitTime <= 0) { + break; + } + + try { + entry = waitingSessions.poll(waitTime, TimeUnit.MILLISECONDS); + break; + } finally { + if (entry != null) { + currentTime = System.currentTimeMillis(); + } + } + } catch (InterruptedException e) { + // Ignore. 
+ continue; + } + } + + if (entry != null) { + return entry.getSession(); + } + return null; + } + + private void runTasks(SessionQueue sessionTasksQueue) { + for (;;) { + Runnable task; + Queue<Runnable> tasksQueue = sessionTasksQueue.tasksQueue; + + synchronized (tasksQueue) { + task = tasksQueue.poll(); + + if (task == null) { + sessionTasksQueue.processingCompleted = true; + break; + } + } + + eventQueueHandler.polled(PriorityThreadPoolExecutor.this, (IoEvent) task); + + runTask(task); + } + } + + private void runTask(Runnable task) { + beforeExecute(thread, task); + boolean ran = false; + try { + task.run(); + ran = true; + afterExecute(task, null); + completedTaskCount.incrementAndGet(); + } catch (RuntimeException e) { + if (!ran) { + afterExecute(task, e); + } + throw e; + } + } + } + + /** + * A class used to store the ordered list of events to be processed by the + * session, and the current task state. + */ + private class SessionQueue { + /** A queue of ordered event waiting to be processed */ + private final Queue<Runnable> tasksQueue = new ConcurrentLinkedQueue<>(); + + /** The current task state */ + private boolean processingCompleted = true; + } + + /** + * A class used to preserve first-in-first-out order of sessions that have + * equal priority. + */ + static class SessionEntry implements Comparable<SessionEntry> { + private final long seqNum; + private final IoSession session; + private final Comparator<IoSession> comparator; + + public SessionEntry(IoSession session, Comparator<IoSession> comparator) { + if (session == null) { + throw new IllegalArgumentException("session"); + } + seqNum = seq.getAndIncrement(); + this.session = session; + this.comparator = comparator; + } + + public IoSession getSession() { + return session; + } + + public int compareTo(SessionEntry other) { + if (other == this) { + return 0; + } + + if (other.session == this.session) { + return 0; + } + + // An exit signal should always be preferred. 
+ if (this == EXIT_SIGNAL) { + return -1; + } + if (other == EXIT_SIGNAL) { + return 1; + } + + int res = 0; + + // If there's a comparator, use it to prioritise events. + if (comparator != null) { + res = comparator.compare(session, other.session); + } + + // FIFO tiebreaker. + if (res == 0) { + res = (seqNum < other.seqNum ? -1 : 1); + } + + return res; + } + } +} http://git-wip-us.apache.org/repos/asf/mina/blob/56ca189e/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java ---------------------------------------------------------------------- diff --git a/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java new file mode 100644 index 0000000..fac0478 --- /dev/null +++ b/mina-core/src/test/java/org/apache/mina/filter/executor/PriorityThreadPoolExecutorTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.mina.filter.executor; + +import org.apache.mina.core.filterchain.IoFilter; +import org.apache.mina.core.session.DummySession; +import org.apache.mina.core.session.IdleStatus; +import org.apache.mina.core.session.IoSession; +import org.apache.mina.core.write.WriteRequest; +import org.apache.mina.filter.FilterEvent; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests that verify the functionality provided by the implementation of + * {@link PriorityThreadPoolExecutor}. + * + * @author Guus der Kinderen, guus.der.kinde...@gmail.com + */ +public class PriorityThreadPoolExecutorTest { + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . + * + * This test asserts that, without a provided comparator, entries are + * considered equal, when they reference the same session. + */ + @Test + public void fifoEntryTestNoComparatorSameSession() throws Exception { + // Set up fixture. + final IoSession session = new DummySession(); + final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, null); + final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, null); + + // Execute system under test. + final int result = first.compareTo(last); + + // Verify results. + assertEquals("Without a comparator, entries of the same session are expected to be equal.", 0, result); + } + + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . 
+ * + * This test asserts that, without a provided comparator, the first entry + * created is 'less than' an entry that is created later. + */ + @Test + public void fifoEntryTestNoComparatorDifferentSession() throws Exception { + // Set up fixture (the order in which the entries are created is + // relevant here!) + final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); + final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), null); + + // Execute system under test. + final int result = first.compareTo(last); + + // Verify results. + assertTrue("Without a comparator, the first entry created should be the first entry out. Expected a negative result, instead, got: " + result, result < 0); + } + + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . + * + * This test asserts that, with a provided comparator, entries are + * considered equal, when they reference the same session (the provided + * comparator is ignored). + */ + @Test + public void fifoEntryTestWithComparatorSameSession() throws Exception { + // Set up fixture. + final IoSession session = new DummySession(); + final int predeterminedResult = 3853; + final Comparator<IoSession> comparator = new Comparator<IoSession>() { + @Override + public int compare(IoSession o1, IoSession o2) { + return predeterminedResult; + } + }; + + final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); + final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(session, comparator); + + // Execute system under test. + final int result = first.compareTo(last); + + // Verify results. 
+ assertEquals("With a comparator, entries of the same session are expected to be equal.", 0, result); + } + + /** + * Tests that verify the functionality provided by the implementation of + * {@link org.apache.mina.filter.executor.PriorityThreadPoolExecutor.SessionEntry} + * . + * + * This test asserts that a provided comparator is used instead of the + * (fallback) default behavior (when entries are referring different + * sessions). + */ + @Test + public void fifoEntryTestComparatorDifferentSession() throws Exception { + // Set up fixture (the order in which the entries are created is + // relevant here!) + final int predeterminedResult = 3853; + final Comparator<IoSession> comparator = new Comparator<IoSession>() { + @Override + public int compare(IoSession o1, IoSession o2) { + return predeterminedResult; + } + }; + final PriorityThreadPoolExecutor.SessionEntry first = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); + final PriorityThreadPoolExecutor.SessionEntry last = new PriorityThreadPoolExecutor.SessionEntry(new DummySession(), comparator); + + // Execute system under test. + final int result = first.compareTo(last); + + // Verify results. + assertEquals("With a comparator, comparing entries of different sessions is expected to yield the comparator result.", predeterminedResult, result); + } + + /** + * Asserts that, when enough work is being submitted to the executor for it + * to start queuing work, prioritisation of work starts to occur. + * + * This implementation starts a number of sessions, and evenly distributes a + * number of messages to them. Processing each message is artificially made + * 'expensive', while the executor pool is kept small. This causes work to + * be queued in the executor. + * + * The executor that is used is configured to prefer one specific session. + * Each session records the timestamp of its last activity. 
After all work + * has been processed, the test asserts that the last activity of all + * sessions was later than the last activity of the preferred session. + */ + @Test + public void testPrioritisation() throws Throwable { + // Set up fixture. + final MockWorkFilter nextFilter = new MockWorkFilter(); + final List<LastActivityTracker> sessions = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + sessions.add(new LastActivityTracker()); + } + final LastActivityTracker preferredSession = sessions.get(4); // prefer + // an + // arbitrary + // session + // (but + // not the + // first + // or last + // session, + // for + // good + // measure). + final Comparator<IoSession> comparator = new UnfairComparator(preferredSession); + final int maximumPoolSize = 1; // keep this low, to force resource + // contention. + final int amountOfTasks = 400; + + final ExecutorService executor = new PriorityThreadPoolExecutor(maximumPoolSize, comparator); + final ExecutorFilter filter = new ExecutorFilter(executor); + + // Execute system under test. + int sessionIndex = 0; + for (int i = 0; i < amountOfTasks; i++) { + if (++sessionIndex >= sessions.size()) { + sessionIndex = 0; + } + + filter.messageReceived(nextFilter, sessions.get(sessionIndex), null); + + if (nextFilter.throwable != null) { + throw nextFilter.throwable; + } + } + + executor.shutdown(); + + // Verify results. + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + + for (final LastActivityTracker session : sessions) { + if (session != preferredSession) { + assertTrue("All other sessions should have finished later than the preferred session (but at least one did not).", session.lastActivity > preferredSession.lastActivity); + } + } + } + + /** + * A comparator that prefers a particular session. 
+ */ + private static class UnfairComparator implements Comparator<IoSession> { + private final IoSession preferred; + + public UnfairComparator(IoSession preferred) { + this.preferred = preferred; + } + + @Override + public int compare(IoSession o1, IoSession o2) { + if (o1 == preferred) { + return -1; + } + + if (o2 == preferred) { + return 1; + } + + return 0; + } + } + + /** + * A session that tracks the timestamp of last activity. + */ + private static class LastActivityTracker extends DummySession { + long lastActivity = System.currentTimeMillis(); + + public synchronized void setLastActivity() { + lastActivity = System.currentTimeMillis(); + } + } + + /** + * A filter that simulates a non-negligible amount of work. + */ + private static class MockWorkFilter implements IoFilter.NextFilter { + Throwable throwable; + + public void sessionOpened(IoSession session) { + // Do nothing + } + + public void sessionClosed(IoSession session) { + // Do nothing + } + + public void sessionIdle(IoSession session, IdleStatus status) { + // Do nothing + } + + public void exceptionCaught(IoSession session, Throwable cause) { + // Do nothing + } + + public void inputClosed(IoSession session) { + // Do nothing + } + + public void messageReceived(IoSession session, Object message) { + try { + Thread.sleep(20); // mimic work. + ((LastActivityTracker) session).setLastActivity(); + } catch (Exception e) { + if (this.throwable == null) { + this.throwable = e; + } + } + } + + public void messageSent(IoSession session, WriteRequest writeRequest) { + // Do nothing + } + + public void filterWrite(IoSession session, WriteRequest writeRequest) { + // Do nothing + } + + public void filterClose(IoSession session) { + // Do nothing + } + + public void sessionCreated(IoSession session) { + // Do nothing + } + + @Override + public void event(IoSession session, FilterEvent event) { + // TODO Auto-generated method stub + + } + } +}