Author: fhanik
Date: Wed Oct 22 13:00:58 2008
New Revision: 707181

URL: http://svn.apache.org/viewvc?rev=707181&view=rev
Log:
fix thread boundaries by adding a queue to the pool

Modified:
    tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java

Modified: 
tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=707181&r1=707180&r2=707181&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java 
(original)
+++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java 
Wed Oct 22 13:00:58 2008
@@ -21,10 +21,14 @@
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.util.Collection;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.ChannelReceiver;
@@ -65,7 +69,7 @@
     private long tcpSelectorTimeout = 5000;
     //how many times to search for an available socket
     private int autoBind = 100;
-    private int maxThreads = Integer.MAX_VALUE;
+    private int maxThreads = 15;
     private int minThreads = 6;
     private int maxTasks = 100;
     private int minTasks = 10;
@@ -78,7 +82,9 @@
     private int soTrafficClass = 0x04 | 0x08 | 0x010;
     private int timeout = 3000; //3 seconds
     private boolean useBufferPool = true;
-
+    private boolean daemon = true;
+    private long maxIdleTime = 60000;
+    
     private ExecutorService executor;
 
 
@@ -87,7 +93,11 @@
 
     public void start() throws IOException {
         if ( executor == null ) {
-            executor = new 
ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new 
LinkedBlockingQueue<Runnable>());
+            //executor = new 
ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new 
LinkedBlockingQueue<Runnable>());
+            TaskQueue taskqueue = new TaskQueue();
+            TaskThreadFactory tf = new 
TaskThreadFactory("Tribes-Task-Receiver-");
+            executor = new ThreadPoolExecutor(minThreads, maxThreads, 
maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
+            taskqueue.setParent((ThreadPoolExecutor)executor);
         }
     }
 
@@ -539,4 +549,80 @@
         this.udpTxBufSize = udpTxBufSize;
     }
 
+ // ---------------------------------------------- TaskQueue Inner Class
+    class TaskQueue extends LinkedBlockingQueue<Runnable> {
+        ThreadPoolExecutor parent = null;
+
+        public TaskQueue() {
+            super();
+        }
+
+        public TaskQueue(int initialCapacity) {
+            super(initialCapacity);
+        }
+
+        public TaskQueue(Collection<? extends Runnable> c) {
+            super(c);
+        }
+
+        public void setParent(ThreadPoolExecutor tp) {
+            parent = tp;
+        }
+        
+        public boolean force(Runnable o) {
+            if ( parent.isShutdown() ) throw new 
RejectedExecutionException("Executor not running, can't force a command into 
the queue");
+            return super.offer(o); //forces the item onto the queue, to be 
used if the task is rejected
+        }
+
+        public boolean offer(Runnable o) {
+            //we can't do any checks
+            if (parent==null) return super.offer(o);
+            //we are maxed out on threads, simply queue the object
+            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return 
super.offer(o);
+            //we have idle threads, just add it to the queue
+            //this is an approximation, so it could use some tuning
+            if (parent.getActiveCount()<(parent.getPoolSize())) return 
super.offer(o);
+            //if we have less threads than maximum force creation of a new 
thread
+            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
+            //if we reached here, we need to add it to the queue
+            return super.offer(o);
+        }
+    }
+
+    // ---------------------------------------------- ThreadFactory Inner Class
+    class TaskThreadFactory implements ThreadFactory {
+        final ThreadGroup group;
+        final AtomicInteger threadNumber = new AtomicInteger(1);
+        final String namePrefix;
+
+        TaskThreadFactory(String namePrefix) {
+            SecurityManager s = System.getSecurityManager();
+            group = (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
+            this.namePrefix = namePrefix;
+        }
+
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, r, namePrefix + 
threadNumber.getAndIncrement());
+            t.setDaemon(daemon);
+            t.setPriority(Thread.NORM_PRIORITY);
+            return t;
+        }
+    }
+
+    public boolean isDaemon() {
+        return daemon;
+    }
+
+    public long getMaxIdleTime() {
+        return maxIdleTime;
+    }
+
+    public void setDaemon(boolean daemon) {
+        this.daemon = daemon;
+    }
+
+    public void setMaxIdleTime(long maxIdleTime) {
+        this.maxIdleTime = maxIdleTime;
+    }    
+    
 }
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to