Author: fhanik
Date: Tue Dec 9 12:56:59 2008
New Revision: 724886
URL: http://svn.apache.org/viewvc?rev=724886&view=rev
Log:
Refactored the thread pooling used with an executor. This gets rid of
duplicate code in the NIO connector as well as in the
org.apache.catalina.core.StandardThreadExecutor class.
I provided a ThreadPoolExecutor that is a small extension of
java.util.concurrent.ThreadPoolExecutor.
The connector method setExecutor still takes a java.util.concurrent.Executor as
an argument to provide the most flexibility.
Added:
tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java (with
props)
tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java
(with props)
tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
(with props)
Modified:
tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java
tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
Modified: tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java?rev=724886&r1=724885&r2=724886&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java
(original)
+++ tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java Tue
Dec 9 12:56:59 2008
@@ -17,18 +17,16 @@
package org.apache.catalina.core;
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.catalina.Executor;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleListener;
import org.apache.catalina.util.LifecycleSupport;
-import java.util.concurrent.RejectedExecutionException;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
+import org.apache.tomcat.util.threads.ThreadPoolExecutor;
public class StandardThreadExecutor implements Executor {
@@ -90,7 +88,7 @@
public void start() throws LifecycleException {
lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null);
TaskQueue taskqueue = new TaskQueue(maxQueueSize);
- TaskThreadFactory tf = new TaskThreadFactory(namePrefix);
+ TaskThreadFactory tf = new
TaskThreadFactory(namePrefix,daemon,getThreadPriority());
lifecycle.fireLifecycleEvent(START_EVENT, null);
executor = new ThreadPoolExecutor(getMinSpareThreads(),
getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
@@ -107,17 +105,10 @@
public void execute(Runnable command, long timeout, TimeUnit unit) {
if ( executor != null ) {
- try {
- executor.execute(command);
- } catch (RejectedExecutionException rx) {
- //there could have been contention around the queue
- try {
- if ( !( (TaskQueue)
executor.getQueue()).force(command,timeout,unit) ) throw new
RejectedExecutionException("Work queue full.");
- }catch (InterruptedException x) {
- throw new RejectedExecutionException("Interrupted.",x);
- }
- }
- } else throw new IllegalStateException("StandardThreadPool not
started.");
+ executor.execute(command,timeout,unit);
+ } else {
+ throw new IllegalStateException("StandardThreadExecutor not
started.");
+ }
}
@@ -258,71 +249,4 @@
public int getQueueSize() {
return (executor != null) ? executor.getQueue().size() : -1;
}
-
- // ---------------------------------------------- TaskQueue Inner Class
- class TaskQueue extends LinkedBlockingQueue<Runnable> {
- ThreadPoolExecutor parent = null;
-
- public TaskQueue() {
- super();
- }
-
- public TaskQueue(int capacity) {
- super(capacity);
- }
-
- public TaskQueue(Collection<? extends Runnable> c) {
- super(c);
- }
-
- public void setParent(ThreadPoolExecutor tp) {
- parent = tp;
- }
-
- public boolean force(Runnable o) {
- if ( parent.isShutdown() ) throw new
RejectedExecutionException("Executor not running, can't force a command into
the queue");
- return super.offer(o); //forces the item onto the queue, to be
used if the task is rejected
- }
-
- public boolean force(Runnable o, long timeout, TimeUnit unit) throws
InterruptedException {
- if ( parent.isShutdown() ) throw new
RejectedExecutionException("Executor not running, can't force a command into
the queue");
- return super.offer(o,timeout,unit); //forces the item onto the
queue, to be used if the task is rejected
- }
-
- public boolean offer(Runnable o) {
- //we can't do any checks
- if (parent==null) return super.offer(o);
- //we are maxed out on threads, simply queue the object
- if (parent.getPoolSize() == parent.getMaximumPoolSize()) return
super.offer(o);
- //we have idle threads, just add it to the queue
- //this is an approximation, so it could use some tuning
- if (parent.getActiveCount()<(parent.getPoolSize())) return
super.offer(o);
- //if we have less threads than maximum force creation of a new
thread
- if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
- //if we reached here, we need to add it to the queue
- return super.offer(o);
- }
- }
-
- // ---------------------------------------------- ThreadFactory Inner Class
- class TaskThreadFactory implements ThreadFactory {
- final ThreadGroup group;
- final AtomicInteger threadNumber = new AtomicInteger(1);
- final String namePrefix;
-
- TaskThreadFactory(String namePrefix) {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
- this.namePrefix = namePrefix;
- }
-
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, namePrefix +
threadNumber.getAndIncrement());
- t.setDaemon(daemon);
- t.setPriority(getThreadPriority());
- return t;
- }
- }
-
-
}
Modified: tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=724886&r1=724885&r2=724886&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Tue Dec 9
12:56:59 2008
@@ -60,6 +60,8 @@
import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
import org.apache.tomcat.util.net.jsse.NioX509KeyManager;
import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.threads.TaskQueue;
+import org.apache.tomcat.util.threads.TaskThreadFactory;
/**
* NIO tailored thread pool, providing the following services:
@@ -339,8 +341,15 @@
* External Executor based thread pool.
*/
protected Executor executor = null;
- public void setExecutor(Executor executor) { this.executor = executor; }
+ public void setExecutor(Executor executor) {
+ this.executor = executor;
+ this.internalExecutor = (executor==null);
+ }
public Executor getExecutor() { return executor; }
+ /**
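+ * Whether the executor in use was created internally by this endpoint
+ * (true) rather than supplied through setExecutor (false). Only an
+ * internal executor is shut down by the endpoint.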
+ */
+ protected boolean internalExecutor = true;
protected boolean useExecutor = true;
/**
@@ -356,12 +365,8 @@
protected int maxThreads = 200;
public void setMaxThreads(int maxThreads) {
this.maxThreads = maxThreads;
- if (running) {
- if (executor!=null) {
- if (executor instanceof ThreadPoolExecutor) {
-
((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads);
- }
- }
+ if (running && executor!=null && executor instanceof
ThreadPoolExecutor) {
+ ((ThreadPoolExecutor)executor).setMaximumPoolSize(maxThreads);
}
}
public int getMaxThreads() { return maxThreads; }
@@ -872,10 +877,11 @@
// Create worker collection
if ( executor == null ) {
+ internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
- TaskThreadFactory tf = new TaskThreadFactory(getName() +
"-exec-");
+ TaskThreadFactory tf = new TaskThreadFactory(getName() +
"-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(),
getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
- taskqueue.setParent( (ThreadPoolExecutor) executor, this);
+ taskqueue.setParent( (ThreadPoolExecutor) executor);
}
// Start poller threads
@@ -938,13 +944,13 @@
keyCache.clear();
nioChannels.clear();
processorCache.clear();
- if ( executor!=null ) {
+ if ( executor!=null && internalExecutor ) {
if ( executor instanceof ThreadPoolExecutor ) {
//this is our internal one, so we need to shut it down
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;
tpe.shutdown();
TaskQueue queue = (TaskQueue) tpe.getQueue();
- queue.setParent(null,null);
+ queue.setParent(null);
}
executor = null;
}
@@ -1955,68 +1961,8 @@
}
}
-
- // ---------------------------------------------- TaskQueue Inner Class
- public static class TaskQueue extends LinkedBlockingQueue<Runnable> {
- ThreadPoolExecutor parent = null;
- NioEndpoint endpoint = null;
-
- public TaskQueue() {
- super();
- }
- public TaskQueue(int initialCapacity) {
- super(initialCapacity);
- }
-
- public TaskQueue(Collection<? extends Runnable> c) {
- super(c);
- }
-
-
- public void setParent(ThreadPoolExecutor tp, NioEndpoint ep) {
- parent = tp;
- this.endpoint = ep;
- }
-
- public boolean offer(Runnable o) {
- //we can't do any checks
- if (parent==null) return super.offer(o);
- //we are maxed out on threads, simply queue the object
- if (parent.getPoolSize() == parent.getMaximumPoolSize()) return
super.offer(o);
- //we have idle threads, just add it to the queue
- //this is an approximation, so it could use some tuning
- if (endpoint.activeSocketProcessors.get()<(parent.getPoolSize()))
return super.offer(o);
- //if we have less threads than maximum force creation of a new
thread
- if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
- //if we reached here, we need to add it to the queue
- return super.offer(o);
- }
- }
-
- // ---------------------------------------------- ThreadFactory Inner Class
- class TaskThreadFactory implements ThreadFactory {
- final ThreadGroup group;
- final AtomicInteger threadNumber = new AtomicInteger(1);
- final String namePrefix;
-
- TaskThreadFactory(String namePrefix) {
- SecurityManager s = System.getSecurityManager();
- group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
- this.namePrefix = namePrefix;
- }
-
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, namePrefix +
threadNumber.getAndIncrement());
- t.setDaemon(daemon);
- t.setPriority(getThreadPriority());
- return t;
- }
- }
-
// ----------------------------------------------- SendfileData Inner Class
-
-
/**
* SendfileData class.
*/
@@ -2029,5 +1975,4 @@
// KeepAlive flag
public boolean keepAlive;
}
-
}
Added: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java?rev=724886&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java (added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java Tue Dec 9
12:56:59 2008
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+/**
+ * A task queue specifically designed to run with a thread pool executor.
+ * The task queue is optimised to properly utilize threads within
+ * a thread pool executor. If you use a normal queue, the executor will spawn
+ * threads when there are idle threads and you won't be able to force items
+ * onto the queue itself.
+ * @author fhanik
+ *
+ */
+public class TaskQueue extends LinkedBlockingQueue<Runnable> {
+ ThreadPoolExecutor parent = null;
+
+ public TaskQueue() {
+ super();
+ }
+
+ public TaskQueue(int capacity) {
+ super(capacity);
+ }
+
+ public TaskQueue(Collection<? extends Runnable> c) {
+ super(c);
+ }
+
+ public void setParent(ThreadPoolExecutor tp) {
+ parent = tp;
+ }
+
+ public boolean force(Runnable o) {
+ if ( parent.isShutdown() ) throw new
RejectedExecutionException("Executor not running, can't force a command into
the queue");
+ return super.offer(o); //forces the item onto the queue, to be used if
the task is rejected
+ }
+
+ public boolean force(Runnable o, long timeout, TimeUnit unit) throws
InterruptedException {
+ if ( parent.isShutdown() ) throw new
RejectedExecutionException("Executor not running, can't force a command into
the queue");
+ return super.offer(o,timeout,unit); //forces the item onto the queue,
to be used if the task is rejected
+ }
+
+ public boolean offer(Runnable o) {
+ //we can't do any checks
+ if (parent==null) return super.offer(o);
+ //we are maxed out on threads, simply queue the object
+ if (parent.getPoolSize() == parent.getMaximumPoolSize()) return
super.offer(o);
+ //we have idle threads, just add it to the queue
+ if (parent.getActiveCount()<(parent.getPoolSize())) return
super.offer(o);
+ //if we have less threads than maximum force creation of a new thread
+ if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
+ //if we reached here, we need to add it to the queue
+ return super.offer(o);
+ }
+}
Propchange: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskQueue.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java?rev=724886&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java
(added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java Tue
Dec 9 12:56:59 2008
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * Simple task thread factory to use to create threads for an executor
implementation.
+ * @author fhanik
+ *
+ */
+public class TaskThreadFactory implements ThreadFactory {
+ final ThreadGroup group;
+ final AtomicInteger threadNumber = new AtomicInteger(1);
+ final String namePrefix;
+ final boolean daemon;
+ final int threadPriority;
+ public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
+ SecurityManager s = System.getSecurityManager();
+ group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
+ this.namePrefix = namePrefix;
+ this.daemon = daemon;
+ this.threadPriority = priority;
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(group, r, namePrefix +
threadNumber.getAndIncrement());
+ t.setDaemon(daemon);
+ t.setPriority(threadPriority);
+ return t;
+ }
+
+}
Propchange:
tomcat/trunk/java/org/apache/tomcat/util/threads/TaskThreadFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
URL:
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java?rev=724886&view=auto
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
(added)
+++ tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
Tue Dec 9 12:56:59 2008
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.util.threads;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+/**
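+ * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much
+ * more efficient getActiveCount method, to be used to properly handle the
+ * work queue.
+ * NOTE(review): in this revision execute(Runnable, long, TimeUnit) has an
+ * empty body, so a command passed to it is neither run nor queued.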
+ * @author fhanik
+ *
+ */
+public class ThreadPoolExecutor extends
java.util.concurrent.ThreadPoolExecutor {
+
+ final AtomicInteger activeCount = new AtomicInteger(0);
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
handler);
+ }
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory
threadFactory,
+ RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
+ }
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory
threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
+ }
+
+ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long
keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ activeCount.decrementAndGet();
+ }
+
+ @Override
+ protected void beforeExecute(Thread t, Runnable r) {
+ activeCount.incrementAndGet();
+ }
+
+ @Override
+ public int getActiveCount() {
+ return activeCount.get();
+ }
+
+ public void execute(Runnable command, long timeout, TimeUnit unit) {
+
+ }
+
+
+
+
+
+}
Propchange:
tomcat/trunk/java/org/apache/tomcat/util/threads/ThreadPoolExecutor.java
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]