Author: kfujino Date: Wed Jun 19 10:03:51 2013 New Revision: 1494528 URL: http://svn.apache.org/r1494528 Log: Replace Tribes's TaskQueue as executor's workQueue in order to ensure that executor's maxThread works correctly.
Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=1494528&r1=1494527&r2=1494528&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java Wed Jun 19 10:03:51 2013 @@ -16,8 +16,7 @@ */ package org.apache.catalina.tribes.group.interceptors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -25,6 +24,7 @@ import org.apache.catalina.tribes.Channe import org.apache.catalina.tribes.Member; import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.transport.bio.util.LinkObject; +import org.apache.catalina.tribes.util.ExecutorFactory; import org.apache.catalina.tribes.util.TcclThreadFactory; /** @@ -40,11 +40,10 @@ import org.apache.catalina.tribes.util.T public class MessageDispatch15Interceptor extends MessageDispatchInterceptor { protected AtomicLong currentSize = new AtomicLong(0); - protected ThreadPoolExecutor executor = null; + protected ExecutorService executor = null; protected int maxThreads = 10; protected int maxSpareThreads = 2; protected long keepAliveTime = 5000; - protected LinkedBlockingQueue<Runnable> runnablequeue = new LinkedBlockingQueue<Runnable>(); @Override public long getCurrentSize() { @@ -83,9 +82,8 @@ public class MessageDispatch15Intercepto @Override public void startQueue() { if ( run ) return; - executor = new ThreadPoolExecutor(maxSpareThreads, maxThreads, - keepAliveTime, TimeUnit.MILLISECONDS, runnablequeue, - new TcclThreadFactory()); + executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads, + keepAliveTime, TimeUnit.MILLISECONDS, new TcclThreadFactory()); run = true; } @@ -94,7 +92,6 @@ public class MessageDispatch15Intercepto run = false; executor.shutdownNow(); setAndGetCurrentSize(0); - runnablequeue.clear(); } public long getKeepAliveTime() { Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java?rev=1494528&r1=1494527&r2=1494528&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Wed Jun 19 10:03:51 2013 @@ -18,9 +18,11 @@ package org.apache.catalina.tribes.util; import java.util.Collection; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -29,18 +31,52 @@ public class ExecutorFactory { public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit) { TaskQueue taskqueue = new TaskQueue(); - ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue); + ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue); taskqueue.setParent(service); return service; } public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) { TaskQueue taskqueue = new TaskQueue(); - ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory); + ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory); taskqueue.setParent(service); return service; } - + + // ---------------------------------------------- TribesThreadPoolExecutor Inner Class + private static class TribesThreadPoolExecutor extends ThreadPoolExecutor { + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + public void execute(Runnable command) { + try { + super.execute(command); + } catch (RejectedExecutionException rx) { + if (super.getQueue() instanceof TaskQueue) { + TaskQueue queue = (TaskQueue)super.getQueue(); + if (!queue.force(command)) { + throw new RejectedExecutionException("Queue capacity is full."); + } + } + } + } + } + // ---------------------------------------------- TaskQueue Inner Class private static class TaskQueue extends LinkedBlockingQueue<Runnable> { private static final long serialVersionUID = 1L; Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1494528&r1=1494527&r2=1494528&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Wed Jun 19 10:03:51 2013 @@ -90,6 +90,11 @@ <add> Add logging of when a member is unable to join the cluster. (kfujino) </add> + <fix> + Replace Tribes's <code>TaskQueue</code> as executor's + workQueue in order to ensure that executor's <code>maxThread</code> + works correctly. (kfujino) + </fix> </changelog> </subsection> <subsection name="Web applications"> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org