Author: fhanik
Date: Mon Mar 20 09:26:34 2006
New Revision: 387256
URL: http://svn.apache.org/viewcvs?rev=387256&view=rev
Log:
Added max/min thread settings for the receiver thread pool; the pool still needs to be made to shrink dynamically.
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ReceiverBase.java
Mon Mar 20 09:26:34 2006
@@ -38,7 +38,7 @@
* @author not attributable
* @version 1.0
*/
-public abstract class ReceiverBase implements ChannelReceiver, ListenCallback {
+public abstract class ReceiverBase implements ChannelReceiver, ListenCallback,
ThreadPool.ThreadCreator {
public static final int OPTION_DIRECT_BUFFER = 0x0004;
@@ -51,7 +51,6 @@
private int port;
private int rxBufSize = 43800;
private int txBufSize = 25188;
- private int tcpThreadCount;
private boolean listen = false;
private ThreadPool pool;
private boolean direct = true;
@@ -59,6 +58,8 @@
private String tcpListenAddress;
//how many times to search for an available socket
private int autoBind = 1;
+ private int maxThreads = 25;
+ private int minThreads = 6;
public ReceiverBase() {
}
@@ -89,9 +90,13 @@
public int getTxBufSize() {
return txBufSize;
}
-
+
+ /**
+ * @deprecated use getMinThreads()/getMaxThreads()
+ * @return int
+ */
public int getTcpThreadCount() {
- return tcpThreadCount;
+ return getMinThreads();
}
/**
@@ -121,7 +126,7 @@
}
public void setTcpThreadCount(int tcpThreadCount) {
- this.tcpThreadCount = tcpThreadCount;
+ setMinThreads(tcpThreadCount);
}
/**
@@ -235,6 +240,14 @@
return autoBind;
}
+ public int getMaxThreads() {
+ return maxThreads;
+ }
+
+ public int getMinThreads() {
+ return minThreads;
+ }
+
public void setTcpSelectorTimeout(long selTimeout) {
tcpSelectorTimeout = selTimeout;
}
@@ -266,5 +279,13 @@
public void setAutoBind(int autoBind) {
this.autoBind = autoBind;
if ( this.autoBind <= 0 ) this.autoBind = 1;
+ }
+
+ public void setMaxThreads(int maxThreads) {
+ this.maxThreads = maxThreads;
+ }
+
+ public void setMinThreads(int minThreads) {
+ this.minThreads = minThreads;
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/ThreadPool.java
Mon Mar 20 09:26:34 2006
@@ -15,10 +15,9 @@
*/
package org.apache.catalina.tribes.tcp;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Iterator;
-import org.apache.catalina.tribes.tcp.nio.*;
/**
* @author not attributable
@@ -34,41 +33,42 @@
*/
List idle = new LinkedList();
+ List used = new LinkedList();
+
Object mutex = new Object();
Object interestOpsMutex = null;
boolean running = true;
-
- public ThreadPool (Object interestOpsMutex, WorkerThread[] threads) throws
Exception {
- // fill up the pool with worker threads
- this.interestOpsMutex = interestOpsMutex;
- for (int i = 0; i < threads.length; i++) {
- WorkerThread thread = threads[i];
- thread.setPool(this);
- thread.setName (thread.getClass().getName()+"[" + (i + 1)+"]");
- thread.setDaemon(true);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.start();
- idle.add (thread);
- }
+
+ private static int counter = 1;
+ private int maxThreads;
+ private int minThreads;
+
+ private ThreadCreator creator = null;
+
+ private static synchronized int inc() {
+ return counter++;
}
-
- public ThreadPool (int poolSize, Class threadClass, Object
interestOpsMutex, int threadOptions) throws Exception {
+ public ThreadPool (Object interestOpsMutex, int maxThreads, int
minThreads, ThreadCreator creator) throws Exception {
// fill up the pool with worker threads
this.interestOpsMutex = interestOpsMutex;
- for (int i = 0; i < poolSize; i++) {
- WorkerThread thread = (WorkerThread)threadClass.newInstance();
- thread.setPool(this);
- // set thread name for debugging, start it
- thread.setName (threadClass.getName()+"[" + (i + 1)+"]");
- thread.setDaemon(true);
- thread.setPriority(Thread.MAX_PRIORITY);
- thread.setOptions(threadOptions);
- thread.start();
-
+ this.maxThreads = maxThreads;
+ this.minThreads = minThreads;
+ this.creator = creator;
+ for (int i = 0; i < minThreads; i++) {
+ WorkerThread thread = creator.getWorkerThread();
+ setupThread(thread);
idle.add (thread);
}
}
+
+ protected void setupThread(WorkerThread thread) {
+ thread.setPool(this);
+ thread.setName (thread.getClass().getName()+"[" + inc()+"]");
+ thread.setDaemon(true);
+ thread.setPriority(Thread.MAX_PRIORITY);
+ thread.start();
+ }
/**
* Find an idle worker thread, if any. Could return null.
@@ -79,7 +79,7 @@
synchronized (mutex) {
- while ( worker == null ) {
+ while ( worker == null && running ) {
if (idle.size() > 0) {
try {
worker = (WorkerThread) idle.remove(0);
@@ -87,12 +87,15 @@
//this means that there are no available workers
worker = null;
}
+ } else if ( used.size() < this.maxThreads && creator != null) {
+ worker = creator.getWorkerThread();
+ setupThread(worker);
} else {
- try { mutex.wait(); } catch (
java.lang.InterruptedException x ) {}
+ try { mutex.wait(); } catch (
java.lang.InterruptedException x ) {Thread.currentThread().interrupted();}
}
- }
+ }//while
+ if ( worker != null ) used.add(worker);
}
-
return (worker);
}
@@ -107,7 +110,12 @@
public void returnWorker (WorkerThread worker) {
if ( running ) {
synchronized (mutex) {
- idle.add(worker);
+ used.remove(worker);
+ if ( idle.size() < minThreads && !idle.contains(worker))
idle.add(worker);
+ else {
+ worker.setDoRun(false);
+ synchronized (worker){worker.notify();}
+ }
mutex.notify();
}
}else {
@@ -118,7 +126,15 @@
public Object getInterestOpsMutex() {
return interestOpsMutex;
}
-
+
+ public int getMaxThreads() {
+ return maxThreads;
+ }
+
+ public int getMinThreads() {
+ return minThreads;
+ }
+
public void stop() {
running = false;
synchronized (mutex) {
@@ -129,5 +145,17 @@
i.remove();
}
}
+ }
+
+ public void setMaxThreads(int maxThreads) {
+ this.maxThreads = maxThreads;
+ }
+
+ public void setMinThreads(int minThreads) {
+ this.minThreads = minThreads;
+ }
+
+ public static interface ThreadCreator {
+ public WorkerThread getWorkerThread();
}
}
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReceiver.java
Mon Mar 20 09:26:34 2006
@@ -54,11 +54,7 @@
*/
public void start() throws IOException {
try {
- BioReplicationThread[] receivers = new
BioReplicationThread[getTcpThreadCount()];
- for ( int i=0; i<receivers.length; i++ ) {
- receivers[i] = getReplicationThread();
- }
- setPool(new ThreadPool(new Object(), receivers));
+ setPool(new ThreadPool(new
Object(),getMaxThreads(),getMinThreads(),this));
} catch (Exception e) {
log.error("ThreadPool can initilzed. Listener not started", e);
return;
@@ -72,6 +68,10 @@
} catch (Exception x) {
log.fatal("Unable to start cluster receiver", x);
}
+ }
+
+ public WorkerThread getWorkerThread() {
+ return getReplicationThread();
}
protected BioReplicationThread getReplicationThread() {
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/bio/BioReplicationThread.java
Mon Mar 20 09:26:34 2006
@@ -67,17 +67,16 @@
// clear interrupt status
Thread.interrupted();
}
- if ( this.socket != null ) {
- try {
- drainSocket();
- } catch ( Exception x ) {
- log.error("Unable to service bio socket");
- }finally {
- try {socket.close();}catch ( Exception ignore){}
- try {reader.close();}catch ( Exception ignore){}
- reader = null;
- socket = null;
- }
+ if ( socket == null ) continue;
+ try {
+ drainSocket();
+ } catch ( Exception x ) {
+ log.error("Unable to service bio socket");
+ }finally {
+ try {socket.close();}catch ( Exception ignore){}
+ try {reader.close();}catch ( Exception ignore){}
+ reader = null;
+ socket = null;
}
// done, ready for more, return to pool
if ( getPool() != null ) getPool().returnWorker (this);
Modified:
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
URL:
http://svn.apache.org/viewcvs/tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java?rev=387256&r1=387255&r2=387256&view=diff
==============================================================================
---
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
(original)
+++
tomcat/container/tc5.5.x/modules/groupcom/src/share/org/apache/catalina/tribes/tcp/nio/NioReceiver.java
Mon Mar 20 09:26:34 2006
@@ -87,13 +87,7 @@
*/
public void start() {
try {
- NioReplicationThread[] receivers = new
NioReplicationThread[getTcpThreadCount()];
- for ( int i=0; i<receivers.length; i++ ) {
- receivers[i] = new NioReplicationThread(this);
- receivers[i].setRxBufSize(getRxBufSize());
- receivers[i].setOptions(getWorkerThreadOptions());
- }
- setPool(new ThreadPool(interestOpsMutex, receivers));
+ setPool(new ThreadPool(interestOpsMutex,
getMaxThreads(),getMinThreads(),this));
} catch (Exception e) {
log.error("ThreadPool can initilzed. Listener not started", e);
return;
@@ -107,6 +101,13 @@
} catch (Exception x) {
log.fatal("Unable to start cluster receiver", x);
}
+ }
+
+ public WorkerThread getWorkerThread() {
+ NioReplicationThread thread = new NioReplicationThread(this);
+ thread.setRxBufSize(getRxBufSize());
+ thread.setOptions(getWorkerThreadOptions());
+ return thread;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]