Author: fhanik
Date: Mon Mar 20 09:26:34 2006
New Revision: 387256

URL: http://svn.apache.org/viewcvs?rev=387256&view=rev
Log:
Added max/min thread settings; the thread pool still needs to shrink dynamically.

Modified:
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
    
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
 Mon Mar 20 09:26:34 2006
@@ -38,7 +38,7 @@
  * @author not attributable
  * @version 1.0
  */
-public abstract class ReceiverBase implements ChannelReceiver, ListenCallback {
+public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, 
ThreadPool.ThreadCreator {
 
     public static final int OPTION_DIRECT_BUFFER = 0x0004;
 
@@ -51,7 +51,6 @@
     private int port;
     private int rxBufSize = 43800;
     private int txBufSize = 25188;
-    private int tcpThreadCount;
     private boolean listen = false;
     private ThreadPool pool;
     private boolean direct = true;
@@ -59,6 +58,8 @@
     private String tcpListenAddress;
     //how many times to search for an available socket
     private int autoBind = 1;
+    private int maxThreads = 25;
+    private int minThreads = 6;
 
     public ReceiverBase() {
     }
@@ -89,9 +90,13 @@
     public int getTxBufSize() {
         return txBufSize;
     }
-
+    
+    /**
+     * @deprecated use getMinThreads()/getMaxThreads()
+     * @return int
+     */
     public int getTcpThreadCount() {
-        return tcpThreadCount;
+        return getMinThreads();
     }
 
     /**
@@ -121,7 +126,7 @@
     }
 
     public void setTcpThreadCount(int tcpThreadCount) {
-        this.tcpThreadCount = tcpThreadCount;
+        setMinThreads(tcpThreadCount);
     }
 
     /**
@@ -235,6 +240,14 @@
         return autoBind;
     }
 
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    public int getMinThreads() {
+        return minThreads;
+    }
+
     public void setTcpSelectorTimeout(long selTimeout) {
         tcpSelectorTimeout = selTimeout;
     }
@@ -266,5 +279,13 @@
     public void setAutoBind(int autoBind) {
         this.autoBind = autoBind;
         if ( this.autoBind <= 0 ) this.autoBind = 1;
+    }
+
+    public void setMaxThreads(int maxThreads) {
+        this.maxThreads = maxThreads;
+    }
+
+    public void setMinThreads(int minThreads) {
+        this.minThreads = minThreads;
     }
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
 Mon Mar 20 09:26:34 2006
@@ -15,10 +15,9 @@
  */
 
 package org.apache.catalina.tribes.tcp;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Iterator;
-import org.apache.catalina.tribes.tcp.nio.*;
 
 /**
  * @author not attributable
@@ -34,41 +33,42 @@
      */
 
     List idle = new LinkedList();
+    List used = new LinkedList();
+    
     Object mutex = new Object();
     Object interestOpsMutex = null;
     boolean running = true;
-
-    public ThreadPool (Object interestOpsMutex, WorkerThread[] threads) throws 
Exception {
-        // fill up the pool with worker threads
-        this.interestOpsMutex = interestOpsMutex;
-        for (int i = 0; i < threads.length; i++) {
-            WorkerThread thread = threads[i];
-            thread.setPool(this);
-            thread.setName (thread.getClass().getName()+"[" + (i + 1)+"]");
-            thread.setDaemon(true);
-            thread.setPriority(Thread.MAX_PRIORITY);
-            thread.start();
-            idle.add (thread);
-        }
+    
+    private static int counter = 1;
+    private int maxThreads;
+    private int minThreads;
+    
+    private ThreadCreator creator = null;
+    
+    private static synchronized int inc() {
+        return counter++;
     }
 
-
-    public ThreadPool (int poolSize, Class threadClass, Object 
interestOpsMutex, int threadOptions) throws Exception {
+    public ThreadPool (Object interestOpsMutex, int maxThreads, int 
minThreads, ThreadCreator creator) throws Exception {
         // fill up the pool with worker threads
         this.interestOpsMutex = interestOpsMutex;
-        for (int i = 0; i < poolSize; i++) {
-            WorkerThread thread = (WorkerThread)threadClass.newInstance();
-            thread.setPool(this);
-            // set thread name for debugging, start it
-            thread.setName (threadClass.getName()+"[" + (i + 1)+"]");
-            thread.setDaemon(true);
-            thread.setPriority(Thread.MAX_PRIORITY);
-            thread.setOptions(threadOptions);
-            thread.start();
-
+        this.maxThreads = maxThreads;
+        this.minThreads = minThreads;
+        this.creator = creator;
+        for (int i = 0; i < minThreads; i++) {
+            WorkerThread thread = creator.getWorkerThread();
+            setupThread(thread);
             idle.add (thread);
         }
     }
+    
+    protected void setupThread(WorkerThread thread) {
+        thread.setPool(this);
+        thread.setName (thread.getClass().getName()+"[" + inc()+"]");
+        thread.setDaemon(true);
+        thread.setPriority(Thread.MAX_PRIORITY);
+        thread.start();
+    }
 
     /**
      * Find an idle worker thread, if any.  Could return null.
@@ -79,7 +79,7 @@
 
         
         synchronized (mutex) {
-            while ( worker == null ) {
+            while ( worker == null && running ) {
                 if (idle.size() > 0) {
                     try {
                         worker = (WorkerThread) idle.remove(0);
@@ -87,12 +87,15 @@
                         //this means that there are no available workers
                         worker = null;
                     }
+                } else if ( used.size() < this.maxThreads && creator != null) {
+                    worker = creator.getWorkerThread();
+                    setupThread(worker);
                 } else {
-                    try { mutex.wait(); } catch ( 
java.lang.InterruptedException x ) {}
+                    try { mutex.wait(); } catch ( 
java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
                 }
-            }
+            }//while
+            if ( worker != null ) used.add(worker);
         }
-
         return (worker);
     }
     
@@ -107,7 +110,12 @@
     public void returnWorker (WorkerThread worker) {
         if ( running ) {
             synchronized (mutex) {
-                idle.add(worker);
+                used.remove(worker);
+                if ( idle.size() < minThreads && !idle.contains(worker)) 
idle.add(worker);
+                else {
+                    worker.setDoRun(false);
+                    synchronized (worker){worker.notify();}
+                }
                 mutex.notify();
             }
         }else {
@@ -118,7 +126,15 @@
     public Object getInterestOpsMutex() {
         return interestOpsMutex;
     }
-    
+
+    public int getMaxThreads() {
+        return maxThreads;
+    }
+
+    public int getMinThreads() {
+        return minThreads;
+    }
+
     public void stop() {
         running = false;
         synchronized (mutex) {
@@ -129,5 +145,17 @@
                 i.remove();
             }
         }
+    }
+
+    public void setMaxThreads(int maxThreads) {
+        this.maxThreads = maxThreads;
+    }
+
+    public void setMinThreads(int minThreads) {
+        this.minThreads = minThreads;
+    }
+    
+    public static interface ThreadCreator {
+        public WorkerThread getWorkerThread();
     }
 }

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
 Mon Mar 20 09:26:34 2006
@@ -54,11 +54,7 @@
      */
     public void start() throws IOException {
         try {
-            BioReplicationThread[] receivers = new 
BioReplicationThread[getTcpThreadCount()];
-            for ( int i=0; i<receivers.length; i++ ) {
-                receivers[i] = getReplicationThread();
-            }
-            setPool(new ThreadPool(new Object(), receivers));
+            setPool(new ThreadPool(new 
Object(),getMaxThreads(),getMinThreads(),this));
         } catch (Exception e) {
             log.error("ThreadPool can initilzed. Listener not started", e);
             return;
@@ -72,6 +68,10 @@
         } catch (Exception x) {
             log.fatal("Unable to start cluster receiver", x);
         }
+    }
+    
+    public WorkerThread getWorkerThread() {
+        return getReplicationThread();
     }
     
     protected BioReplicationThread getReplicationThread() {

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
 Mon Mar 20 09:26:34 2006
@@ -67,17 +67,16 @@
                 // clear interrupt status
                 Thread.interrupted();
             }
-            if ( this.socket != null ) {
-                try {
-                    drainSocket();
-                } catch ( Exception x ) {
-                    log.error("Unable to service bio socket");
-                }finally {
-                    try {socket.close();}catch ( Exception ignore){}
-                    try {reader.close();}catch ( Exception ignore){}
-                    reader = null;
-                    socket = null;
-                }
+            if ( socket == null ) continue;
+            try {
+                drainSocket();
+            } catch ( Exception x ) {
+                log.error("Unable to service bio socket");
+            }finally {
+                try {socket.close();}catch ( Exception ignore){}
+                try {reader.close();}catch ( Exception ignore){}
+                reader = null;
+                socket = null;
             }
             // done, ready for more, return to pool
             if ( getPool() != null ) getPool().returnWorker (this);

Modified: 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL: 
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
--- 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
 (original)
+++ 
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
 Mon Mar 20 09:26:34 2006
@@ -87,13 +87,7 @@
      */
     public void start() {
         try {
-            NioReplicationThread[] receivers = new 
NioReplicationThread[getTcpThreadCount()];
-            for ( int i=0; i<receivers.length; i++ ) {
-                receivers[i] = new NioReplicationThread(this);
-                receivers[i].setRxBufSize(getRxBufSize());
-                receivers[i].setOptions(getWorkerThreadOptions());
-            }
-            setPool(new ThreadPool(interestOpsMutex, receivers));
+            setPool(new ThreadPool(interestOpsMutex, 
getMaxThreads(),getMinThreads(),this));
         } catch (Exception e) {
             log.error("ThreadPool can initilzed. Listener not started", e);
             return;
@@ -107,6 +101,13 @@
         } catch (Exception x) {
             log.fatal("Unable to start cluster receiver", x);
         }
+    }
+    
+    public WorkerThread getWorkerThread() {
+        NioReplicationThread thread = new NioReplicationThread(this);
+        thread.setRxBufSize(getRxBufSize());
+        thread.setOptions(getWorkerThreadOptions());
+        return thread;
     }
     
     



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to