Author: kfujino
Date: Wed Jun 19 09:59:56 2013
New Revision: 1494527

URL: http://svn.apache.org/r1494527
Log:
Use Tribes's TaskQueue as the executor's workQueue (replacing the plain
LinkedBlockingQueue) to ensure that the executor's maxThreads setting works correctly.

Modified:
    
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
    tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java?rev=1494527&r1=1494526&r2=1494527&view=diff
==============================================================================
--- 
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
 (original)
+++ 
tomcat/trunk/java/org/apache/catalina/tribes/group/interceptors/MessageDispatch15Interceptor.java
 Wed Jun 19 09:59:56 2013
@@ -16,8 +16,7 @@
  */
 package org.apache.catalina.tribes.group.interceptors;
 
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -25,6 +24,7 @@ import org.apache.catalina.tribes.Channe
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.transport.bio.util.LinkObject;
+import org.apache.catalina.tribes.util.ExecutorFactory;
 import org.apache.catalina.tribes.util.TcclThreadFactory;
 
 /**
@@ -40,12 +40,10 @@ import org.apache.catalina.tribes.util.T
 public class MessageDispatch15Interceptor extends MessageDispatchInterceptor {
 
     protected final AtomicLong currentSize = new AtomicLong(0);
-    protected ThreadPoolExecutor executor = null;
+    protected ExecutorService executor = null;
     protected int maxThreads = 10;
     protected int maxSpareThreads = 2;
     protected long keepAliveTime = 5000;
-    protected final LinkedBlockingQueue<Runnable> runnablequeue =
-            new LinkedBlockingQueue<>();
 
     @Override
     public long getCurrentSize() {
@@ -84,9 +82,8 @@ public class MessageDispatch15Intercepto
     @Override
     public void startQueue() {
         if ( run ) return;
-        executor = new ThreadPoolExecutor(maxSpareThreads, maxThreads,
-                keepAliveTime, TimeUnit.MILLISECONDS, runnablequeue,
-                new TcclThreadFactory());
+        executor = ExecutorFactory.newThreadPool(maxSpareThreads, maxThreads,
+                keepAliveTime, TimeUnit.MILLISECONDS, new TcclThreadFactory());
         run = true;
     }
 
@@ -95,7 +92,6 @@ public class MessageDispatch15Intercepto
         run = false;
         executor.shutdownNow();
         setAndGetCurrentSize(0);
-        runnablequeue.clear();
     }
 
     public long getKeepAliveTime() {

Modified: tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java?rev=1494527&r1=1494526&r2=1494527&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Wed 
Jun 19 09:59:56 2013
@@ -17,8 +17,11 @@
 
 package org.apache.catalina.tribes.util;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -27,18 +30,52 @@ public class ExecutorFactory {
 
     public static ExecutorService newThreadPool(int minThreads, int 
maxThreads, long maxIdleTime, TimeUnit unit) {
         TaskQueue taskqueue = new TaskQueue();
-        ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, 
maxThreads, maxIdleTime, unit,taskqueue);
+        ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, 
maxThreads, maxIdleTime, unit,taskqueue);
         taskqueue.setParent(service);
         return service;
     }
 
     public static ExecutorService newThreadPool(int minThreads, int 
maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) {
         TaskQueue taskqueue = new TaskQueue();
-        ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, 
maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
+        ThreadPoolExecutor service = new TribesThreadPoolExecutor(minThreads, 
maxThreads, maxIdleTime, unit,taskqueue, threadFactory);
         taskqueue.setParent(service);
         return service;
     }
 
+    // ---------------------------------------------- TribesThreadPoolExecutor 
Inner Class
+    private static class TribesThreadPoolExecutor extends ThreadPoolExecutor {
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
RejectedExecutionHandler handler) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
workQueue, handler);
+        }
+
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
ThreadFactory threadFactory,
+                RejectedExecutionHandler handler) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
workQueue, threadFactory, handler);
+        }
+
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, 
ThreadFactory threadFactory) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
workQueue, threadFactory);
+        }
+
+        public TribesThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, 
workQueue);
+        }
+
+        @Override
+        public void execute(Runnable command) {
+            try {
+                super.execute(command);
+            } catch (RejectedExecutionException rx) {
+                if (super.getQueue() instanceof TaskQueue) {
+                    TaskQueue queue = (TaskQueue)super.getQueue();
+                    if (!queue.force(command)) {
+                        throw new RejectedExecutionException("Queue capacity 
is full.");
+                    }
+                }
+            }
+        }
+    }
+
      // ---------------------------------------------- TaskQueue Inner Class
     private static class TaskQueue extends LinkedBlockingQueue<Runnable> {
         private static final long serialVersionUID = 1L;
@@ -53,6 +90,11 @@ public class ExecutorFactory {
             parent = tp;
         }
 
+        public boolean force(Runnable o) {
+            if ( parent.isShutdown() ) throw new 
RejectedExecutionException("Executor not running, can't force a command into 
the queue");
+            return super.offer(o); //forces the item onto the queue, to be 
used if the task is rejected
+        }
+
         @Override
         public boolean offer(Runnable o) {
             //we can't do any checks



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to