Author: fhanik Date: Wed Oct 22 13:00:58 2008 New Revision: 707181 URL: http://svn.apache.org/viewvc?rev=707181&view=rev Log: fix thread boundaries by adding a queue to the pool
Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=707181&r1=707180&r2=707181&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Wed Oct 22 13:00:58 2008 @@ -21,10 +21,14 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.Collection; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.ChannelReceiver; @@ -65,7 +69,7 @@ private long tcpSelectorTimeout = 5000; //how many times to search for an available socket private int autoBind = 100; - private int maxThreads = Integer.MAX_VALUE; + private int maxThreads = 15; private int minThreads = 6; private int maxTasks = 100; private int minTasks = 10; @@ -78,7 +82,9 @@ private int soTrafficClass = 0x04 | 0x08 | 0x010; private int timeout = 3000; //3 seconds private boolean useBufferPool = true; - + private boolean daemon = true; + private long maxIdleTime = 60000; + private ExecutorService executor; @@ -87,7 +93,11 @@ public void start() throws IOException { if ( executor == null ) { - executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); + //executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); + TaskQueue taskqueue = new TaskQueue(); + TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver-"); + executor = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); + taskqueue.setParent((ThreadPoolExecutor)executor); } } @@ -539,4 +549,80 @@ this.udpTxBufSize = udpTxBufSize; } + // ---------------------------------------------- TaskQueue Inner Class + class TaskQueue extends LinkedBlockingQueue<Runnable> { + ThreadPoolExecutor parent = null; + + public TaskQueue() { + super(); + } + + public TaskQueue(int initialCapacity) { + super(initialCapacity); + } + + public TaskQueue(Collection<? extends Runnable> c) { + super(c); + } + + public void setParent(ThreadPoolExecutor tp) { + parent = tp; + } + + public boolean force(Runnable o) { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean offer(Runnable o) { + //we can't do any checks + if (parent==null) return super.offer(o); + //we are maxed out on threads, simply queue the object + if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); + //we have idle threads, just add it to the queue + //this is an approximation, so it could use some tuning + if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); + //if we have less threads than maximum force creation of a new thread + if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; + //if we reached here, we need to add it to the queue + return super.offer(o); + } + } + + // ---------------------------------------------- ThreadFactory Inner Class + class TaskThreadFactory implements ThreadFactory { + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + + TaskThreadFactory(String namePrefix) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + this.namePrefix = namePrefix; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement()); + t.setDaemon(daemon); + t.setPriority(Thread.NORM_PRIORITY); + return t; + } + } + + public boolean isDaemon() { + return daemon; + } + + public long getMaxIdleTime() { + return maxIdleTime; + } + + public void setDaemon(boolean daemon) { + this.daemon = daemon; + } + + public void setMaxIdleTime(long maxIdleTime) { + this.maxIdleTime = maxIdleTime; + } + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]