Author: fhanik
Date: Fri Dec  5 14:02:15 2008
New Revision: 723889

URL: http://svn.apache.org/viewvc?rev=723889&view=rev
Log:
Add the ability to configure a job queue size, and a timeout for how long we 
want to try to add something to the queue.

Modified:
    tomcat/trunk/java/org/apache/catalina/Executor.java
    tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java

Modified: tomcat/trunk/java/org/apache/catalina/Executor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/Executor.java?rev=723889&r1=723888&r2=723889&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/Executor.java (original)
+++ tomcat/trunk/java/org/apache/catalina/Executor.java Fri Dec  5 14:02:15 2008
@@ -16,8 +16,26 @@
  */
 package org.apache.catalina;
 
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
 
 
 public interface Executor extends java.util.concurrent.Executor, Lifecycle {
     public String getName();
+    
+    /**
+     * Executes the given command at some time in the future.  The command
+     * may execute in a new thread, in a pooled thread, or in the calling
+     * thread, at the discretion of the <tt>Executor</tt> implementation.
+     * If no threads are available, it will be added to the work queue.
+     * If the work queue is full, the call waits up to the specified
+     * timeout for space in the queue before throwing a RejectedExecutionException.
+     *
+     * @param command the runnable task
+     * @throws RejectedExecutionException if this task cannot be
+     * accepted for execution - the queue is full
+     * @throws NullPointerException if command or unit is null
+     */
+    void execute(Runnable command, long timeout, TimeUnit unit);
 }
\ No newline at end of file

Modified: tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java
URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java?rev=723889&r1=723888&r2=723889&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java (original)
+++ tomcat/trunk/java/org/apache/catalina/core/StandardThreadExecutor.java Fri Dec  5 14:02:15 2008
@@ -33,22 +33,51 @@
 public class StandardThreadExecutor implements Executor {
     
     // ---------------------------------------------- Properties
+    /**
+     * Default thread priority
+     */
     protected int threadPriority = Thread.NORM_PRIORITY;
 
+    /**
+     * Run threads in daemon or non-daemon state
+     */
     protected boolean daemon = true;
     
+    /**
+     * Default name prefix for the thread name
+     */
     protected String namePrefix = "tomcat-exec-";
     
+    /**
+     * max number of threads
+     */
     protected int maxThreads = 200;
     
+    /**
+     * min number of threads
+     */
     protected int minSpareThreads = 25;
     
+    /**
+     * idle time in milliseconds
+     */
     protected int maxIdleTime = 60000;
     
+    /**
+     * The executor we use for this component
+     */
     protected ThreadPoolExecutor executor = null;
     
+    /**
+     * the name of this thread pool
+     */
     protected String name;
     
+    /**
+     * The maximum number of elements that can queue up before we reject them
+     */
+    protected int maxQueueSize = Integer.MAX_VALUE;
+    
     private LifecycleSupport lifecycle = new LifecycleSupport(this);
     // ---------------------------------------------- Constructors
     public StandardThreadExecutor() {
@@ -60,7 +89,7 @@
     // ---------------------------------------------- Public Methods
     public void start() throws LifecycleException {
         lifecycle.fireLifecycleEvent(BEFORE_START_EVENT, null);
-        TaskQueue taskqueue = new TaskQueue();
+        TaskQueue taskqueue = new TaskQueue(maxQueueSize);
         TaskThreadFactory tf = new TaskThreadFactory(namePrefix);
         lifecycle.fireLifecycleEvent(START_EVENT, null);
         executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
@@ -76,13 +105,29 @@
         lifecycle.fireLifecycleEvent(AFTER_STOP_EVENT, null);
     }
     
+    public void execute(Runnable command, long timeout, TimeUnit unit) {
+        if ( executor != null ) {
+            try {
+                executor.execute(command);
+            } catch (RejectedExecutionException rx) {
+                //there could have been contention around the queue
+                try {
+                    if ( !( (TaskQueue) executor.getQueue()).force(command,timeout,unit) ) throw new RejectedExecutionException("Work queue full.");
+                }catch (InterruptedException x) {
+                    throw new RejectedExecutionException("Interrupted.",x);
+                }
+            }
+        } else throw new IllegalStateException("StandardThreadPool not started.");
+    }
+    
+    
     public void execute(Runnable command) {
         if ( executor != null ) {
             try {
                 executor.execute(command);
             } catch (RejectedExecutionException rx) {
                 //there could have been contention around the queue
-                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException();
+                if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
             }
         } else throw new IllegalStateException("StandardThreadPool not started.");
     }
@@ -153,6 +198,14 @@
         this.name = name;
     }
     
+    public void setMaxQueueSize(int size) {
+        this.maxQueueSize = size;
+    }
+    
+    public int getMaxQueueSize() {
+        return maxQueueSize;
+    }
+    
     /**
      * Add a LifecycleEvent listener to this component.
      *
@@ -214,8 +267,8 @@
             super();
         }
 
-        public TaskQueue(int initialCapacity) {
-            super(initialCapacity);
+        public TaskQueue(int capacity) {
+            super(capacity);
         }
 
         public TaskQueue(Collection<? extends Runnable> c) {
@@ -231,6 +284,11 @@
             return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
         }
 
+        public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
+            if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
+            return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
+        }
+
         public boolean offer(Runnable o) {
             //we can't do any checks
             if (parent==null) return super.offer(o);



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to