Author: fhanik
Date: Tue Jul 14 14:46:34 2009
New Revision: 793913

URL: http://svn.apache.org/viewvc?rev=793913&view=rev
Log:
Patch by arieland...@hotmail.com
Fix for https://issues.apache.org/bugzilla/show_bug.cgi?id=47524
Added: tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java (with props) Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Modified: tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java?rev=793913&r1=793912&r2=793913&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/membership/McastServiceImpl.java Tue Jul 14 14:46:34 2009 @@ -27,8 +27,6 @@ import java.net.SocketTimeoutException; import java.util.Arrays; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.catalina.tribes.Channel; @@ -37,6 +35,7 @@ import org.apache.catalina.tribes.MessageListener; import org.apache.catalina.tribes.io.ChannelData; import org.apache.catalina.tribes.io.XByteBuffer; +import org.apache.catalina.tribes.util.ExecutorFactory; /** * A <b>membership</b> implementation using simple multicast. 
@@ -145,7 +144,7 @@ /** * Dont interrupt the sender/receiver thread, but pass off to an executor */ - protected ExecutorService executor = new ThreadPoolExecutor(0, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + protected ExecutorService executor = ExecutorFactory.newThreadPool(0, 2, 2, TimeUnit.SECONDS); /** * disable/enable local loopback message Modified: tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java?rev=793913&r1=793912&r2=793913&view=diff ============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java (original) +++ tomcat/trunk/java/org/apache/catalina/tribes/transport/ReceiverBase.java Tue Jul 14 14:46:34 2009 @@ -21,12 +21,8 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; -import java.util.Collection; import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -34,6 +30,7 @@ import org.apache.catalina.tribes.ChannelReceiver; import org.apache.catalina.tribes.MessageListener; import org.apache.catalina.tribes.io.ListenCallback; +import org.apache.catalina.tribes.util.ExecutorFactory; import org.apache.juli.logging.Log; /** @@ -94,10 +91,8 @@ public void start() throws IOException { if ( executor == null ) { //executor = new ThreadPoolExecutor(minThreads,maxThreads,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>()); - TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory("Tribes-Task-Receiver-"); - executor = new ThreadPoolExecutor(minThreads, maxThreads, 
maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf); - taskqueue.setParent((ThreadPoolExecutor)executor); + executor = ExecutorFactory.newThreadPool(minThreads, maxThreads, maxIdleTime, TimeUnit.MILLISECONDS, tf); } } @@ -549,46 +544,6 @@ this.udpTxBufSize = udpTxBufSize; } - // ---------------------------------------------- TaskQueue Inner Class - class TaskQueue extends LinkedBlockingQueue<Runnable> { - ThreadPoolExecutor parent = null; - - public TaskQueue() { - super(); - } - - public TaskQueue(int initialCapacity) { - super(initialCapacity); - } - - public TaskQueue(Collection<? extends Runnable> c) { - super(c); - } - - public void setParent(ThreadPoolExecutor tp) { - parent = tp; - } - - public boolean force(Runnable o) { - if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); - return super.offer(o); //forces the item onto the queue, to be used if the task is rejected - } - - public boolean offer(Runnable o) { - //we can't do any checks - if (parent==null) return super.offer(o); - //we are maxed out on threads, simply queue the object - if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); - //we have idle threads, just add it to the queue - //this is an approximation, so it could use some tuning - if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); - //if we have less threads than maximum force creation of a new thread - if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; - //if we reached here, we need to add it to the queue - return super.offer(o); - } - } - // ---------------------------------------------- ThreadFactory Inner Class class TaskThreadFactory implements ThreadFactory { final ThreadGroup group; Added: tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java?rev=793913&view=auto 
============================================================================== --- tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java (added) +++ tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java Tue Jul 14 14:46:34 2009 @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.catalina.tribes.util; + +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ExecutorFactory { + + public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit) { + TaskQueue taskqueue = new TaskQueue(); + ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue); + taskqueue.setParent(service); + return service; + } + + public static ExecutorService newThreadPool(int minThreads, int maxThreads, long maxIdleTime, TimeUnit unit, ThreadFactory threadFactory) { + TaskQueue taskqueue = new TaskQueue(); + ThreadPoolExecutor service = new ThreadPoolExecutor(minThreads, maxThreads, maxIdleTime, unit,taskqueue, threadFactory); + taskqueue.setParent(service); + return service; + } + + // ---------------------------------------------- TaskQueue Inner Class + private static class TaskQueue extends LinkedBlockingQueue<Runnable> { + ThreadPoolExecutor parent = null; + + public TaskQueue() { + super(); + } + + public TaskQueue(int initialCapacity) { + super(initialCapacity); + } + + public TaskQueue(Collection<? 
extends Runnable> c) { + super(c); + } + + public void setParent(ThreadPoolExecutor tp) { + parent = tp; + } + + public boolean force(Runnable o) { + if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue"); + return super.offer(o); //forces the item onto the queue, to be used if the task is rejected + } + + public boolean offer(Runnable o) { + //we can't do any checks + if (parent==null) return super.offer(o); + //we are maxed out on threads, simply queue the object + if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); + //we have idle threads, just add it to the queue + //this is an approximation, so it could use some tuning + if (parent.getActiveCount()<(parent.getPoolSize())) return super.offer(o); + //if we have less threads than maximum force creation of a new thread + if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; + //if we reached here, we need to add it to the queue + return super.offer(o); + } + } +} Propchange: tomcat/trunk/java/org/apache/catalina/tribes/util/ExecutorFactory.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org For additional commands, e-mail: dev-h...@tomcat.apache.org