Author: fhanik Date: Mon Mar 20 09:26:34 2006 New Revision: 387256 URL: http://svn.apache.org/viewcvs?rev=387256&view=rev Log: added in max/min threads, still need to make it shrink dynamically
Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=387256&r1=387255&r2=387256&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java Mon Mar 20 09:26:34 2006 @@ -38,7 +38,7 @@ * @author not attributable * @version 1.0 */ -public abstract class ReceiverBase implements ChannelReceiver, ListenCallback { +public abstract class ReceiverBase implements ChannelReceiver, ListenCallback, ThreadPool.ThreadCreator { public static final int OPTION_DIRECT_BUFFER = 0x0004; @@ -51,7 +51,6 @@ private int port; private int rxBufSize = 43800; private int txBufSize = 25188; - private int tcpThreadCount; private boolean listen = false; private ThreadPool pool; private boolean direct = true; @@ -59,6 +58,8 @@ private String tcpListenAddress; //how many times to search for an available socket private int autoBind = 1; + private int maxThreads = 25; + private int minThreads = 6; public ReceiverBase() { } @@ -89,9 +90,13 @@ public int getTxBufSize() { return txBufSize; } - + + /** + * @deprecated use getMinThreads()/getMaxThreads() + * @return int + */ public int getTcpThreadCount() { - return tcpThreadCount; + return getMinThreads(); } /** @@ -121,7 +126,7 @@ } public void setTcpThreadCount(int tcpThreadCount) { - this.tcpThreadCount = tcpThreadCount; + setMinThreads(tcpThreadCount); } /** @@ -235,6 +240,14 @@ return autoBind; } + public int getMaxThreads() { + return maxThreads; + } + + public int getMinThreads() { + return minThreads; + } + public void setTcpSelectorTimeout(long selTimeout) { tcpSelectorTimeout = selTimeout; } @@ -266,5 +279,13 @@ public void setAutoBind(int autoBind) { this.autoBind = autoBind; if ( this.autoBind <= 0 ) this.autoBind = 1; + } + + public void setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + } + + public void setMinThreads(int minThreads) { + this.minThreads = minThreads; } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java?rev=387256&r1=387255&r2=387256&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java Mon Mar 20 09:26:34 2006 @@ -15,10 +15,9 @@ */ package org.apache.catalina.tribes.tcp; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Iterator; -import org.apache.catalina.tribes.tcp.nio.*; /** * @author not attributable @@ -34,41 +33,42 @@ */ List idle = new LinkedList(); + List used = new LinkedList(); + Object mutex = new Object(); Object interestOpsMutex = null; boolean running = true; - - public ThreadPool (Object interestOpsMutex, WorkerThread[] threads) throws Exception { - // fill up the pool with worker threads - this.interestOpsMutex = interestOpsMutex; - for (int i = 0; i < threads.length; i++) { - WorkerThread thread = threads[i]; - thread.setPool(this); - thread.setName (thread.getClass().getName()+"[" + (i + 1)+"]"); - thread.setDaemon(true); - thread.setPriority(Thread.MAX_PRIORITY); - thread.start(); - idle.add (thread); - } + + private static int counter = 1; + private int maxThreads; + private int minThreads; + + private ThreadCreator creator = null; + + private static synchronized int inc() { + return counter++; } - - public ThreadPool (int poolSize, Class threadClass, Object interestOpsMutex, int threadOptions) throws Exception { + public ThreadPool (Object interestOpsMutex, int maxThreads, int minThreads, ThreadCreator creator) throws Exception { // fill up the pool with worker threads this.interestOpsMutex = interestOpsMutex; - for (int i = 0; i < poolSize; i++) { - WorkerThread thread = (WorkerThread)threadClass.newInstance(); - thread.setPool(this); - // set thread name for debugging, start it - thread.setName (threadClass.getName()+"[" + (i + 1)+"]"); - thread.setDaemon(true); - thread.setPriority(Thread.MAX_PRIORITY); - thread.setOptions(threadOptions); - thread.start(); - + this.maxThreads = maxThreads; + this.minThreads = minThreads; + this.creator = creator; + for (int i = 0; i < minThreads; i++) { + WorkerThread thread = creator.getWorkerThread(); + setupThread(thread); idle.add (thread); } } + + protected void setupThread(WorkerThread thread) { + thread.setPool(this); + thread.setName (thread.getClass().getName()+"[" + inc()+"]"); + thread.setDaemon(true); + thread.setPriority(Thread.MAX_PRIORITY); + thread.start(); + } /** * Find an idle worker thread, if any. Could return null. @@ -79,7 +79,7 @@ synchronized (mutex) { - while ( worker == null ) { + while ( worker == null && running ) { if (idle.size() > 0) { try { worker = (WorkerThread) idle.remove(0); @@ -87,12 +87,15 @@ //this means that there are no available workers worker = null; } + } else if ( used.size() < this.maxThreads && creator != null) { + worker = creator.getWorkerThread(); + setupThread(worker); } else { - try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {} + try { mutex.wait(); } catch ( java.lang.InterruptedException x ) {Thread.currentThread().interrupted();} } - } + }//while + if ( worker != null ) used.add(worker); } - return (worker); } @@ -107,7 +110,12 @@ public void returnWorker (WorkerThread worker) { if ( running ) { synchronized (mutex) { - idle.add(worker); + used.remove(worker); + if ( idle.size() < minThreads && !idle.contains(worker)) idle.add(worker); + else { + worker.setDoRun(false); + synchronized (worker){worker.notify();} + } mutex.notify(); } }else { @@ -118,7 +126,15 @@ public Object getInterestOpsMutex() { return interestOpsMutex; } - + + public int getMaxThreads() { + return maxThreads; + } + + public int getMinThreads() { + return minThreads; + } + public void stop() { running = false; synchronized (mutex) { @@ -129,5 +145,17 @@ i.remove(); } } + } + + public void setMaxThreads(int maxThreads) { + this.maxThreads = maxThreads; + } + + public void setMinThreads(int minThreads) { + this.minThreads = minThreads; + } + + public static interface ThreadCreator { + public WorkerThread getWorkerThread(); } } Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=387256&r1=387255&r2=387256&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java Mon Mar 20 09:26:34 2006 @@ -54,11 +54,7 @@ */ public void start() throws IOException { try { - BioReplicationThread[] receivers = new BioReplicationThread[getTcpThreadCount()]; - for ( int i=0; i<receivers.length; i++ ) { - receivers[i] = getReplicationThread(); - } - setPool(new ThreadPool(new Object(), receivers)); + setPool(new ThreadPool(new Object(),getMaxThreads(),getMinThreads(),this)); } catch (Exception e) { log.error("ThreadPool can initilzed. Listener not started", e); return; @@ -72,6 +68,10 @@ } catch (Exception x) { log.fatal("Unable to start cluster receiver", x); } + } + + public WorkerThread getWorkerThread() { + return getReplicationThread(); } protected BioReplicationThread getReplicationThread() { Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java?rev=387256&r1=387255&r2=387256&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java Mon Mar 20 09:26:34 2006 @@ -67,17 +67,16 @@ // clear interrupt status Thread.interrupted(); } - if ( this.socket != null ) { - try { - drainSocket(); - } catch ( Exception x ) { - log.error("Unable to service bio socket"); - }finally { - try {socket.close();}catch ( Exception ignore){} - try {reader.close();}catch ( Exception ignore){} - reader = null; - socket = null; - } + if ( socket == null ) continue; + try { + drainSocket(); + } catch ( Exception x ) { + log.error("Unable to service bio socket"); + }finally { + try {socket.close();}catch ( Exception ignore){} + try {reader.close();}catch ( Exception ignore){} + reader = null; + socket = null; } // done, ready for more, return to pool if ( getPool() != null ) getPool().returnWorker (this); Modified: tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java URL: http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=387256&r1=387255&r2=387256&view=diff ============================================================================== --- tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java (original) +++ tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java Mon Mar 20 09:26:34 2006 @@ -87,13 +87,7 @@ */ public void start() { try { - NioReplicationThread[] receivers = new NioReplicationThread[getTcpThreadCount()]; - for ( int i=0; i<receivers.length; i++ ) { - receivers[i] = new NioReplicationThread(this); - receivers[i].setRxBufSize(getRxBufSize()); - receivers[i].setOptions(getWorkerThreadOptions()); - } - setPool(new ThreadPool(interestOpsMutex, receivers)); + setPool(new ThreadPool(interestOpsMutex, getMaxThreads(),getMinThreads(),this)); } catch (Exception e) { log.error("ThreadPool can initilzed. Listener not started", e); return; @@ -107,6 +101,13 @@ } catch (Exception x) { log.fatal("Unable to start cluster receiver", x); } + } + + public WorkerThread getWorkerThread() { + NioReplicationThread thread = new NioReplicationThread(this); + thread.setRxBufSize(getRxBufSize()); + thread.setOptions(getWorkerThreadOptions()); + return thread; } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]