Author: supun Date: Mon Jan 17 08:04:18 2011 New Revision: 1059789 URL: http://svn.apache.org/viewvc?rev=1059789&view=rev Log: adding a ThreadPoolExecutor with a waterMark to control the threading behavior
Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/ axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/ axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java?rev=1059789&r1=1059788&r2=1059789&view=diff ============================================================================== --- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java (original) +++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/NativeWorkerPool.java Mon Jan 17 08:04:18 2011 @@ -19,11 +19,13 @@ package 
org.apache.axis2.transport.base.threads; +import org.apache.axis2.transport.base.threads.watermark.DefaultWaterMarkQueue; +import org.apache.axis2.transport.base.threads.watermark.WaterMarkExecutor; +import org.apache.axis2.transport.base.threads.watermark.WaterMarkQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; /** * Worker pool implementation based on java.util.concurrent in JDK 1.5 or later. @@ -33,7 +35,7 @@ public class NativeWorkerPool implements static final Log log = LogFactory.getLog(NativeWorkerPool.class); private final ThreadPoolExecutor executor; - private final LinkedBlockingQueue<Runnable> blockingQueue; + private final BlockingQueue<Runnable> blockingQueue; public NativeWorkerPool(int core, int max, int keepAlive, int queueLength, String threadGroupName, String threadGroupId) { @@ -45,10 +47,123 @@ public class NativeWorkerPool implements (queueLength == -1 ? new LinkedBlockingQueue<Runnable>() : new LinkedBlockingQueue<Runnable>(queueLength)); executor = new ThreadPoolExecutor( - core, max, keepAlive, - TimeUnit.SECONDS, - blockingQueue, - new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId)); + core, max, keepAlive, + TimeUnit.SECONDS, + blockingQueue, + new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId)); + } + + public NativeWorkerPool(int core, int max, int keepAlive, + int queueLength, String threadGroupName, + String threadGroupId, BlockingQueue<Runnable> queue) { + + if (log.isDebugEnabled()) { + log.debug("Using native util.concurrent package.."); + } + + if (queue == null) { + blockingQueue = + (queueLength == -1 ? 
new LinkedBlockingQueue<Runnable>() + : new LinkedBlockingQueue<Runnable>(queueLength)); + } else { + blockingQueue = queue; + } + + executor = new ThreadPoolExecutor( + core, max, keepAlive, + TimeUnit.SECONDS, + blockingQueue, + new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId)); + } + + public NativeWorkerPool(int core, int max, int keepAlive, + int queueLength, String threadGroupName, + String threadGroupId, BlockingQueue<Runnable> queue, + RejectedExecutionHandler rejectedExecutionHandler) { + + if (log.isDebugEnabled()) { + log.debug("Using native util.concurrent package.."); + } + + if (queue == null) { + blockingQueue = + (queueLength == -1 ? new LinkedBlockingQueue<Runnable>() + : new LinkedBlockingQueue<Runnable>(queueLength)); + } else { + blockingQueue = queue; + } + + executor = new ThreadPoolExecutor( + core, max, keepAlive, + TimeUnit.SECONDS, + blockingQueue, + new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId), + rejectedExecutionHandler); + } + + public NativeWorkerPool(int core, int max, int keepAlive, + int queueLength, int waterMark, String threadGroupName, + String threadGroupId) { + + if (log.isDebugEnabled()) { + log.debug("Using native util.concurrent package.."); + } + + + blockingQueue = + (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark) + : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength)); + + executor = new WaterMarkExecutor( + core, max, keepAlive, + TimeUnit.SECONDS, + (WaterMarkQueue<Runnable>) blockingQueue, + new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId)); + } + + public NativeWorkerPool(int core, int max, int keepAlive, + int queueLength, int waterMark, String threadGroupName, + String threadGroupId, WaterMarkQueue<Runnable> queue) { + + if (log.isDebugEnabled()) { + log.debug("Using native util.concurrent package.."); + } + + if (queue == null) { + blockingQueue = + (queueLength == -1 ? 
new DefaultWaterMarkQueue<Runnable>(waterMark) + : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength)); + } else { + blockingQueue = queue; + } + + executor = new WaterMarkExecutor( + core, max, keepAlive, + TimeUnit.SECONDS, + (WaterMarkQueue<Runnable>) blockingQueue, + new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId)); + } + + public NativeWorkerPool(int core, int max, int keepAlive, + int queueLength, int waterMark, String threadGroupName, + String threadGroupId, + RejectedExecutionHandler rejectedExecutionHandler) { + + if (log.isDebugEnabled()) { + log.debug("Using native util.concurrent package.."); + } + + + blockingQueue = + (queueLength == -1 ? new DefaultWaterMarkQueue<Runnable>(waterMark) + : new DefaultWaterMarkQueue<Runnable>(waterMark, queueLength)); + + executor = new WaterMarkExecutor( + core, max, keepAlive, + TimeUnit.SECONDS, + (WaterMarkQueue<Runnable>) blockingQueue, + new NativeThreadFactory(new ThreadGroup(threadGroupName), threadGroupId), + rejectedExecutionHandler); } public void execute(final Runnable task) { Modified: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java?rev=1059789&r1=1059788&r2=1059789&view=diff ============================================================================== --- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java (original) +++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/WorkerPoolFactory.java Mon Jan 17 08:04:18 2011 @@ -19,6 +19,8 @@ package org.apache.axis2.transport.base.threads; +import java.util.concurrent.BlockingQueue; + /** * Worker pool factory. 
* For the moment this always creates {@link NativeWorkerPool} instances since @@ -27,8 +29,25 @@ package org.apache.axis2.transport.base. public class WorkerPoolFactory { public static WorkerPool getWorkerPool(int core, int max, int keepAlive, - int queueLength, String threadGroupName, String threadGroupId) { + int queueLength, String threadGroupName, + String threadGroupId) { return new NativeWorkerPool( core, max, keepAlive, queueLength, threadGroupName, threadGroupId); } + + public static WorkerPool getWorkerPool(int core, int max, int keepAlive, + int queueLength, int waterMark, String threadGroupName, + String threadGroupId) { + return new NativeWorkerPool(core, max, keepAlive, + queueLength, waterMark, threadGroupName, + threadGroupId); + } + + public static WorkerPool getWorkerPool(int core, int max, int keepAlive, + int queueLength, String threadGroupName, + String threadGroupId, BlockingQueue<Runnable> queue) { + return new NativeWorkerPool(core, max, keepAlive, + queueLength, threadGroupName, + threadGroupId, queue); + } } Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java?rev=1059789&view=auto ============================================================================== --- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java (added) +++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/DefaultWaterMarkQueue.java Mon Jan 17 08:04:18 2011 @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.base.threads.watermark; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Queue; +import java.util.concurrent.*; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A default implementation of the WaterMarkQueue interface. The implementation uses an + * {@link ArrayBlockingQueue} up to the water mark. Beyond the water mark it uses an unbounded + * {@link LinkedBlockingDeque} when only a waterMark is given, or a second ArrayBlockingQueue of + * capacity <code>size - waterMark</code> when a queue size is also specified. + * + * @param <T> the type of elements held in this queue + */ +public class DefaultWaterMarkQueue<T> implements WaterMarkQueue<T> { + + private volatile ArrayBlockingQueue<T> waterMarkQueue; + + private volatile Queue<T> afterWaterMarkQueue; + + private Lock lock = new ReentrantLock(); + + /** + * Create a {@link WaterMarkQueue} with a waterMark. The queue will first fill up + * to waterMark. These items will be inserted into an {@link ArrayBlockingQueue}. + * After this a {@link LinkedBlockingDeque} will be used without a bound. 
+ * + * @param waterMark the waterMark of the queue + */ + public DefaultWaterMarkQueue(int waterMark) { + afterWaterMarkQueue = new LinkedBlockingDeque<T>(); + + waterMarkQueue = new ArrayBlockingQueue<T>(waterMark); + } + + /** + * Create a {@link WaterMarkQueue} with a waterMark. The queue will first fill up + * to waterMark. These items will be inserted in to an {@link ArrayBlockingQueue}. + * After this an {@link LinkedBlockingQueue} will be used with capacity + * <code>size - waterMark.</code> + * + * @param waterMark the waterMark of the queue + * @param size the size of the queue + */ + public DefaultWaterMarkQueue(int waterMark, int size) { + if (waterMark <= size) { + afterWaterMarkQueue = new ArrayBlockingQueue<T>(size - waterMark); + } else { + throw new IllegalArgumentException("Size should be equal or greater than water mark"); + } + + waterMarkQueue = new ArrayBlockingQueue<T>(waterMark); + } + + public boolean add(T t) { + return waterMarkQueue.add(t); + + } + + public boolean offer(T t) { + return waterMarkQueue.offer(t); + } + + public T remove() { + T t = waterMarkQueue.remove(); + tryMoveTasks(); + return t; + } + + public T poll() { + T t = waterMarkQueue.poll(); + tryMoveTasks(); + return t; + } + + public T element() { + return waterMarkQueue.element(); + } + + public T peek() { + return waterMarkQueue.peek(); + } + + public void put(T t) throws InterruptedException { + waterMarkQueue.put(t); + } + + public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException { + return waterMarkQueue.offer(t, l, timeUnit); + } + + public T take() throws InterruptedException { + T t = waterMarkQueue.take(); + tryMoveTasks(); + return t; + } + + public T poll(long l, TimeUnit timeUnit) throws InterruptedException { + T t = waterMarkQueue.poll(l, timeUnit); + tryMoveTasks(); + return t; + } + + public int remainingCapacity() { + return waterMarkQueue.remainingCapacity(); + } + + public boolean remove(Object o) { + boolean b = 
waterMarkQueue.remove(o); + tryMoveTasks(); + return b; + } + + public boolean containsAll(Collection<?> objects) { + return waterMarkQueue.containsAll(objects); + } + + public boolean addAll(Collection<? extends T> ts) { + return waterMarkQueue.addAll(ts); + } + + public boolean removeAll(Collection<?> objects) { + boolean b = waterMarkQueue.removeAll(objects); + tryMoveTasks(); + + return b; + } + + public boolean retainAll(Collection<?> objects) { + return waterMarkQueue.retainAll(objects); + } + + public void clear() { + waterMarkQueue.clear(); + afterWaterMarkQueue.clear(); + } + + public int size() { + return waterMarkQueue.size() + afterWaterMarkQueue.size(); + } + + public boolean isEmpty() { + tryMoveTasks(); + return waterMarkQueue.isEmpty(); + } + + private void tryMoveTasks() { + if (afterWaterMarkQueue.size() > 0) { + lock.lock(); + try { + while (afterWaterMarkQueue.size() > 0) { + T w = afterWaterMarkQueue.poll(); + boolean offer = waterMarkQueue.offer(w); + if (!offer) { + afterWaterMarkQueue.offer(w); + break; + } + } + } finally { + lock.unlock(); + } + } + } + + public boolean contains(Object o) { + return waterMarkQueue.contains(o) || afterWaterMarkQueue.contains(o); + } + + public Iterator<T> iterator() { + return new IteratorImpl(); + } + + public Object[] toArray() { + return waterMarkQueue.toArray(); + } + + public <T> T[] toArray(T[] ts) { + T[] waterMarkArray = waterMarkQueue.toArray(ts); + T[] afterWaterMarkArray = afterWaterMarkQueue.toArray(ts); + + final int alen = waterMarkArray.length; + final int blen = afterWaterMarkArray.length; + if (alen == 0) { + return afterWaterMarkArray; + } + + if (blen == 0) { + return waterMarkArray; + } + + final T[] result = (T[]) java.lang.reflect.Array. + newInstance(waterMarkArray.getClass().getComponentType(), alen + blen); + System.arraycopy(waterMarkArray, 0, result, 0, alen); + System.arraycopy(afterWaterMarkArray, 0, result, alen, blen); + return result; + } + + public int drainTo(Collection<? 
super T> objects) { + int n = waterMarkQueue.drainTo(objects); + tryMoveTasks(); + + return n; + } + + public int drainTo(Collection<? super T> objects, int i) { + int n = waterMarkQueue.drainTo(objects, i); + tryMoveTasks(); + return n; + } + + public boolean offerAfter(T t) { + lock.lock(); + try { + return afterWaterMarkQueue.offer(t); + } finally { + lock.unlock(); + } + } + + /** + * Iterator for DefaultWaterMarkQueue + */ + private class IteratorImpl implements Iterator<T> { + Iterator<T> waterMarkIterator = null; + + Iterator<T> afterWaterMarkIterator = null; + + boolean waterMarkQueueDone = false; + + private IteratorImpl() { + waterMarkIterator = waterMarkQueue.iterator(); + afterWaterMarkIterator = afterWaterMarkQueue.iterator(); + + waterMarkQueueDone = false; + } + + public boolean hasNext() { + return waterMarkIterator.hasNext() || afterWaterMarkIterator.hasNext(); + } + + public T next() { + lock.lock(); + try { + if (waterMarkIterator.hasNext()) { + return waterMarkIterator.next(); + } else { + waterMarkQueueDone = true; + return afterWaterMarkIterator.next(); + } + } finally { + lock.unlock(); + } + } + + public void remove() { + if (!waterMarkQueueDone) { + waterMarkIterator.remove(); + } else { + afterWaterMarkIterator.remove(); + } + } + } +} Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java?rev=1059789&view=auto ============================================================================== --- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java (added) +++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutor.java Mon Jan 17 08:04:18 2011 @@ 
-0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.base.threads.watermark; + +import java.util.concurrent.*; + +/** + * An {@link ExecutorService} that executes each submitted task using + * one of possibly several pooled threads, but the execution happens differently + * from the {@link ThreadPoolExecutor}. In this executor, after all the core pool threads + * are used, queuing happens until the water mark. If more tasks are submitted after + * the queue is filled up to the water mark, the number of threads increases to max. + * If the number of tasks continues to increase, the queue begins to fill up. If the queue + * is a bounded queue and the queue is completely filled, a {@link RejectedExecutionHandler} + * is executed if one is specified. Otherwise the task is rejected. 
+ */ +public class WaterMarkExecutor extends ThreadPoolExecutor { + public WaterMarkExecutor(int core, int max, long keepAlive, + TimeUnit timeUnit, WaterMarkQueue<Runnable> queue) { + super(core, max, keepAlive, timeUnit, queue, new WaterMarkRejectionHandler(null)); + } + + public WaterMarkExecutor(int core, int max, long keepAlive, + TimeUnit timeUnit, WaterMarkQueue<Runnable> queue, + ThreadFactory threadFactory) { + super(core, max, keepAlive, + timeUnit, queue, threadFactory, new WaterMarkRejectionHandler(null)); + } + + public WaterMarkExecutor(int core, int max, + long keepAlive, TimeUnit timeUnit, + WaterMarkQueue<Runnable> queue, + RejectedExecutionHandler rejectedExecutionHandler) { + + super(core, max, keepAlive, timeUnit, + queue, new WaterMarkRejectionHandler(rejectedExecutionHandler)); + } + + public WaterMarkExecutor(int core, int max, long keepAlive, + TimeUnit timeUnit, WaterMarkQueue<Runnable> queue, + ThreadFactory threadFactory, + RejectedExecutionHandler rejectedExecutionHandler) { + super(core, max, keepAlive, timeUnit, + queue, threadFactory, new WaterMarkRejectionHandler(rejectedExecutionHandler)); + } +} Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java?rev=1059789&view=auto ============================================================================== --- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java (added) +++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkQueue.java Mon Jan 17 08:04:18 2011 @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.base.threads.watermark; + +import java.util.concurrent.BlockingQueue; + +/** + * This queue acts as a queue with a mark. The methods exposed by the <code>BlockingQueue</code> + * interface will add elements up to the mark. We call this mark the waterMark. After the + * water mark, all the insertion operations will fail as if the queue is bounded by + * this waterMark. After this, to add values to the queue the offerAfter method should be called. + * + * @param <T> the type of elements held in this queue + */ +public interface WaterMarkQueue<T> extends BlockingQueue<T> { + /** + * Offer the element after the water mark. 
+ * + * @param object object to be inserted + * @return true if the insert is successful + */ + public boolean offerAfter(T object); +} + Added: axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java?rev=1059789&view=auto ============================================================================== --- axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java (added) +++ axis/axis2/java/transports/trunk/modules/base/src/main/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkRejectionHandler.java Mon Jan 17 08:04:18 2011 @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.axis2.transport.base.threads.watermark; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * This class implements the {@link RejectedExecutionHandler} and provide a mechanism for + * having the water mark in the {@link WaterMarkExecutor}. This is an internal class used by + * the {@link WaterMarkExecutor}. + */ +class WaterMarkRejectionHandler implements RejectedExecutionHandler { + RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); + + public WaterMarkRejectionHandler(RejectedExecutionHandler rejectedExecutionHandler) { + if (rejectedExecutionHandler != null) { + this.rejectedExecutionHandler = rejectedExecutionHandler; + } + } + + public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { + BlockingQueue q = threadPoolExecutor.getQueue(); + if (q instanceof WaterMarkQueue) { + WaterMarkQueue wq = (WaterMarkQueue) q; + + if (!wq.offerAfter(runnable)) { + if (rejectedExecutionHandler != null) { + rejectedExecutionHandler.rejectedExecution(runnable, threadPoolExecutor); + } + } + } + } +} Added: axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java URL: http://svn.apache.org/viewvc/axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java?rev=1059789&view=auto ============================================================================== --- axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java (added) +++ axis/axis2/java/transports/trunk/modules/base/src/test/java/org/apache/axis2/transport/base/threads/watermark/WaterMarkExecutorTest.java Mon Jan 17 08:04:18 2011 @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.base.threads.watermark; + +import junit.framework.TestCase; + +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class WaterMarkExecutorTest extends TestCase { + + private WaterMarkExecutor executor = null; + + private WaterMarkExecutor executor2 = null; + + private final int TASKS = 1000; + + private volatile int runTasks = 0; + + private volatile int[] tasksSubmitted = new int[TASKS]; + + private Lock lock = new ReentrantLock(); + + @Override + protected void setUp() throws Exception { + executor = new WaterMarkExecutor(10, 100, 10, + TimeUnit.SECONDS, new DefaultWaterMarkQueue<Runnable>(100, 500), + new ThreadPoolExecutor.CallerRunsPolicy()); + + executor2 = new WaterMarkExecutor(10, 100, 10, + TimeUnit.SECONDS, new DefaultWaterMarkQueue<Runnable>(100), + new ThreadPoolExecutor.CallerRunsPolicy()); + } + + public void testExecutor() { + for (int i = 0; i < TASKS; i++) { + tasksSubmitted[i] = i + 1; + } + + for (int i = 0; i < TASKS; i++) { + executor.execute(new Test(i + 1, lock)); + } + + // this is an best effort number 
so we wait another 1 second for + // the executor to finish the tasks + while (executor.getActiveCount() > 0) {} + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + + } + + int tasks = 0; + for (int aTasksSubmitted : tasksSubmitted) { + if (aTasksSubmitted != 0) { + tasks++; + } + } + + assertEquals(TASKS, runTasks); + assertEquals(tasks, 0); + + executor.shutdown(); + + } + + public void testExecutor2() { + for (int i = 0; i < TASKS; i++) { + tasksSubmitted[i] = i + 1; + } + + for (int i = 0; i < TASKS; i++) { + executor2.execute(new Test(i + 1, lock)); + } + + // this is an best effort number so we wait another 1 second for + // the executor to finish the tasks + while (executor2.getActiveCount() > 0) {} + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + + } + + int tasks = 0; + for (int aTasksSubmitted : tasksSubmitted) { + if (aTasksSubmitted != 0) { + tasks++; + } + } + + assertEquals(TASKS, runTasks); + assertEquals(tasks, 0); + + executor2.shutdown(); + + } + + private class Test implements Runnable { + long taskId; + Lock tLock; + + private Test(long taskId, Lock lock) { + this.taskId = taskId; + tLock = lock; + } + + public void run() { + tLock.lock(); + try { + runTasks++; + for (int i = 0; i < TASKS; i++) { + if (taskId == tasksSubmitted[i]) { + tasksSubmitted[i] = 0; + } + } + } finally { + tLock.unlock(); + } + + } + } +}