Author: kfujino Date: Wed Jun 19 09:59:56 2013 New Revision: 1494527 URL: http://svn.apache.org/r1494527 Log: Use Tribes's TaskQueue as the executor's workQueue (replacing the plain LinkedBlockingQueue) in order to ensure that the executor's maxThreads setting works correctly.
Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=1494527&r1=1494526&r2=1494527&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Wed Jun 19 09:59:56 2013 @@ -16,8 +16,7 @@ */ package org.apache.catalina.tribes.group.interceptors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -25,6 +24,7 @@ import org.apache.catalina.tribes.Channe import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.transport.bio.util.LinkObject; +import org.apache.catalina.tribes.util.ExecutorFactory; import org.apache.catalina.tribes.util.TcclThreadFactory; /** @@ -40,12 +40,10 @@ import org.apache.catalina.tribes.util.T public class MessageDispatch15Interceptor extends MessageDispatchInterceptor { protected final AtomicLong currentSize = new AtomicLong(0); - protected ThreadPoolExecutor executor = null; + protected ExecutorService executor = null; protected int maxThreads = 10; protected int maxSpareThreads = 2; protected long keepAliveTime = 5000; - protected final LinkedBlockingQueue<Runnable> runnablequeue = - new LinkedBlockingQueue<>(); @Override public long getCurrentSize() { @@ -84,9 +82,8 @@ public class 
MessageDispatch15Intercepto @Override public void startQueue() { if ( run ) return; - executor = new ThreadPoolExecutor(maxSpareThreads, maxThreads, - keepAliveTime, TimeUnit.MILLISECONDS, runnablequeue, - new TcclThreadFactory()); + executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads, + keepAliveTime, TimeUnit.MILLISECONDS, new TcclThreadFactory()); run = true; } @@ -95,7 +92,6 @@ public class MessageDispatch15Intercepto run = false; executor.shutdownNow(); setAndGetCurrentSize(0); - runnablequeue.clear(); } public long getKeepAliveTime() { Modified: tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java?rev=1494527&r1=1494526&r2=1494527&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Wed Jun 19 09:59:56 2013 @@ -17,8 +17,11 @@ package org.apache.catalina.tribes.util; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -27,18 +30,52 @@ public class ExecutorFactory { public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit) { TaskQueue taskqueue = new TaskQueue(); - ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue); + ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue); taskqueue.setParent(service); return service; } public static ExecutorService 
newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) { TaskQueue taskqueue = new TaskQueue(); - ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory); + ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory); taskqueue.setParent(service); return service; } + // ---------------------------------------------- TribesThreadPoolExecutor Inner Class + private static class TribesThreadPoolExecutor extends ThreadPoolExecutor { + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + public void execute(Runnable command) { + try { + super.execute(command); + } catch (RejectedExecutionException rx) { + if (super.getQueue() instanceof TaskQueue) { + TaskQueue queue = (TaskQueue)super.getQueue(); + if (!queue.force(command)) { + throw new RejectedExecutionException("Queue capacity is 
full."); + } + } + } + } + } + // ---------------------------------------------- TaskQueue Inner Class private static class TaskQueue extends LinkedBlockingQueue<Runnable> { private static final long serialVersionUID = 1L; @@ -53,6 +90,11 @@ public class ExecutorFactory { parent = tp; } + public boolean force(Runnable o) { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o); //forces the item onto the queue, to be used if the task is rejected + } + @Override public boolean offer(Runnable o) { //we can't do any checks --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org