Author: fhanik Date: Tue Jul 14 17:49:03 2009 New Revision: 793991 URL: http://svn.apache.org/viewvc?rev=793991&view=rev Log: Add experimental new queue, runs faster but is not yet complete and needs some work around concurrency, iterators and remove operations
// Origin: tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
// (svn r793991, added Tue Jul 14 17:49:03 2009; package org.apache.tomcat.jdbc.pool; svn:eol-style = native)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <b>EXPERIMENTAL AND NOT YET COMPLETE!</b>
 * <p>
 * An unbounded blocking queue that spreads its state over {@code LOCK_COUNT}
 * independent stripes, each with its own lock, element list and waiter list,
 * to reduce lock contention. Every put and every poll draws a ticket from its
 * own counter and works on the stripe that ticket maps to. A thread that has
 * to wait parks on a private latch, and each stripe serves its waiters in the
 * order they registered.
 * <p>
 * Known limitation: a poll only looks at the single stripe its ticket selects.
 * It may therefore return {@code null} (or wait) while other stripes still
 * hold elements. {@link #size()}, {@link #contains(Object)},
 * {@link #remove(Object)} and {@link #iterator()} do visit every stripe.
 * <p>
 * {@code null} elements are rejected, because {@code null} is the internal
 * marker for "nothing available".
 * <br/>
 * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented.
 *
 * @param <E> the type of elements held in this queue
 * @author Filip Hanik
 */
public class MultiLockFairBlockingQueue<E> implements BlockingQueue<E> {

    /** Number of stripes; one per processor reported by the runtime at construction time. */
    final int LOCK_COUNT = Runtime.getRuntime().availableProcessors();

    /** Ticket counter that selects the stripe for the next put. */
    final AtomicInteger putQueue = new AtomicInteger(0);

    /** Ticket counter that selects the stripe for the next poll. */
    final AtomicInteger pollQueue = new AtomicInteger(0);

    /**
     * Draws the next put ticket.
     *
     * @return the stripe index for the next put, always in {@code [0, LOCK_COUNT)}
     */
    public int getNextPut() {
        return stripeFor(putQueue.incrementAndGet());
    }

    /**
     * Draws the next poll ticket.
     *
     * @return the stripe index for the next poll, always in {@code [0, LOCK_COUNT)}
     */
    public int getNextPoll() {
        return stripeFor(pollQueue.incrementAndGet());
    }

    /**
     * Maps a ticket to a stripe index. The sign bit is masked off rather than
     * using {@code Math.abs}, because {@code Math.abs(Integer.MIN_VALUE)} is
     * still negative and would produce a negative index once a counter wraps.
     */
    private int stripeFor(int ticket) {
        return (ticket & Integer.MAX_VALUE) % LOCK_COUNT;
    }

    /**
     * One lock per stripe. It guards that stripe's element list and waiter
     * list and is only held for short bookkeeping sections; the actual waiting
     * happens on a per-thread latch outside of the lock.
     */
    final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT];

    /**
     * The queued elements, one simple linked list per stripe.
     */
    final LinkedList<E>[] items;

    /**
     * The threads waiting for an element, one linked list of latches per stripe.
     */
    final LinkedList<ExchangeCountDownLatch<E>>[] waiters;

    /**
     * Creates a new fair blocking queue.
     */
    @SuppressWarnings("unchecked") // arrays of a parameterized type cannot be created directly
    public MultiLockFairBlockingQueue() {
        items = new LinkedList[LOCK_COUNT];
        waiters = new LinkedList[LOCK_COUNT];
        for (int i = 0; i < LOCK_COUNT; i++) {
            items[i] = new LinkedList<E>();
            waiters[i] = new LinkedList<ExchangeCountDownLatch<E>>();
            locks[i] = new ReentrantLock(false);
        }
    }

    //------------------------------------------------------------------
    // USED BY CONPOOL IMPLEMENTATION
    //------------------------------------------------------------------
    /**
     * Will always return true, queue is unbounded.
     * If a thread is waiting on the selected stripe the element is handed
     * straight to the longest-waiting one, otherwise it is stored.
     * {@inheritDoc}
     *
     * @throws NullPointerException if {@code e} is null
     */
    public boolean offer(E e) {
        // Checked before a ticket is drawn so a rejected call does not shift the stripe rotation.
        if (e == null) {
            throw new NullPointerException("null elements are not supported");
        }
        final int idx = getNextPut();
        final ReentrantLock lock = this.locks[idx];
        ExchangeCountDownLatch<E> c;
        lock.lock();
        try {
            // poll() yields null when nobody is waiting on this stripe
            c = waiters[idx].poll();
            if (c != null) {
                // give the object to the waiting thread instead of adding it to the pool
                c.setItem(e);
            } else {
                // we always add first, so that the most recently used object will be given out
                items[idx].addFirst(e);
            }
        } finally {
            lock.unlock();
        }
        // if we exchanged an object with another thread, wake it up (outside of the lock)
        if (c != null) {
            c.countDown();
        }
        // we have an unbounded queue, so always return true
        return true;
    }

    /**
     * Will never block or time out, as it simply invokes the
     * {@link #offer(Object)} method; the timeout arguments are ignored.
     * {@inheritDoc}
     *
     * @throws NullPointerException if {@code e} is null
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return offer(e);
    }

    /**
     * Fair retrieval of an object in the queue.
     * Within a stripe, objects are handed out in the order the threads requested them.
     * {@inheritDoc}
     *
     * @return an element, or {@code null} if none became available on the
     *         selected stripe within the timeout (or immediately when
     *         {@code timeout <= 0})
     * @throws InterruptedException if interrupted while waiting and no element
     *         had been handed over; if an element was already handed over it
     *         is returned instead and the interrupt flag is restored
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        final int idx = getNextPoll();
        final ReentrantLock lock = this.locks[idx];
        ExchangeCountDownLatch<E> c;
        lock.lock();
        try {
            E available = items[idx].poll();
            if (available != null || timeout <= 0) {
                return available;
            }
            // the stripe is empty: register at the bottom of the wait list
            c = new ExchangeCountDownLatch<E>(1);
            waiters[idx].addLast(c);
        } finally {
            lock.unlock();
        }

        // wait outside of the stripe lock so producers are never blocked by us
        boolean handedOver = false;
        InterruptedException interruption = null;
        try {
            handedOver = c.await(timeout, unit);
        } catch (InterruptedException ix) {
            interruption = ix;
        }
        if (!handedOver) {
            // Timed out or interrupted: deregister, otherwise a later offer()
            // would hand its element to a latch nobody reads any more.
            lock.lock();
            try {
                waiters[idx].remove(c);
            } finally {
                lock.unlock();
            }
        }
        // A producer may have raced with the timeout/interrupt and still set the item;
        // it is null only if nothing was handed over.
        E result = c.getItem();
        if (interruption != null) {
            if (result == null) {
                throw interruption;
            }
            // keep the element, but let the caller observe the interruption
            Thread.currentThread().interrupt();
        }
        return result;
    }

    /**
     * Request an item from the queue asynchronously
     * @return - a future pending the result from the queue poll request
     */
    public Future<E> pollAsync() {
        final int idx = getNextPoll();
        final ReentrantLock lock = this.locks[idx];
        lock.lock();
        try {
            E item = items[idx].poll();
            if (item != null) {
                // an element is available right away: return a completed future
                return new ItemFuture<E>(item);
            }
            // stripe is empty: register as a waiter and return a future bound to the latch
            ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<E>(1);
            waiters[idx].addLast(c);
            return new ItemFuture<E>(c);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the first occurrence of {@code e} found while scanning the stripes in index order.
     * {@inheritDoc}
     */
    public boolean remove(Object e) {
        for (int idx = 0; idx < LOCK_COUNT; idx++) {
            final ReentrantLock lock = this.locks[idx];
            lock.lock();
            try {
                if (items[idx].remove(e)) {
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
        return false;
    }

    /**
     * Sums the stripe sizes. Each stripe is read under its own lock, one after
     * the other, so the total is not an atomic snapshot of the whole queue.
     * {@inheritDoc}
     */
    public int size() {
        int size = 0;
        for (int idx = 0; idx < LOCK_COUNT; idx++) {
            final ReentrantLock lock = this.locks[idx];
            lock.lock();
            try {
                size += items[idx].size();
            } finally {
                lock.unlock();
            }
        }
        return size;
    }

    /**
     * Returns an iterator over a copy of the elements taken at the time of this call.
     * {@inheritDoc}
     */
    public Iterator<E> iterator() {
        return new FairIterator();
    }

    /**
     * Non-blocking poll of the stripe selected by the next poll ticket.
     * {@inheritDoc}
     */
    public E poll() {
        final int idx = getNextPoll();
        final ReentrantLock lock = this.locks[idx];
        lock.lock();
        try {
            return items[idx].poll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Scans every stripe, each under its own lock.
     * {@inheritDoc}
     */
    public boolean contains(Object e) {
        for (int idx = 0; idx < LOCK_COUNT; idx++) {
            final ReentrantLock lock = this.locks[idx];
            lock.lock();
            try {
                if (items[idx].contains(e)) {
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
        return false;
    }


    //------------------------------------------------------------------
    // NOT USED BY CONPOOL IMPLEMENTATION
    //------------------------------------------------------------------
    /**
     * {@inheritDoc}
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Never blocks, the queue is unbounded.
     * {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        offer(e);
    }

    /**
     * {@inheritDoc}
     */
    public int remainingCapacity() {
        return Integer.MAX_VALUE - size();
    }

    /**
     * Delegates to {@link #poll(long, TimeUnit)} with the largest possible timeout.
     * {@inheritDoc}
     */
    public E take() throws InterruptedException {
        return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    /**
     * Offers every element of {@code c} in iteration order.
     * {@inheritDoc}
     *
     * @return {@code true} if at least one element was offered
     */
    public boolean addAll(Collection<? extends E> c) {
        boolean changed = false;
        for (E e : c) {
            offer(e);
            changed = true;
        }
        return changed;
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public void clear() {
        throw new UnsupportedOperationException("void clear()");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public boolean containsAll(Collection<?> c) {
        throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
    }

    /**
     * {@inheritDoc}
     */
    public boolean isEmpty() {
        return size() == 0;
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public boolean removeAll(Collection<?> c) {
        throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public Object[] toArray() {
        throw new UnsupportedOperationException("Object[] toArray()");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public <T> T[] toArray(T[] a) {
        throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public E element() {
        throw new UnsupportedOperationException("E element()");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public E peek() {
        throw new UnsupportedOperationException("E peek()");
    }

    /**
     * {@inheritDoc}
     * @throws UnsupportedOperationException - this operation is not supported
     */
    public E remove() {
        throw new UnsupportedOperationException("E remove()");
    }



    //------------------------------------------------------------------
    // Non cancellable Future used to check and see if a connection has been made available
    //------------------------------------------------------------------
    /**
     * Future returned by {@link MultiLockFairBlockingQueue#pollAsync()}. It is
     * either created already holding an item, or bound to the latch through
     * which a later offer will deliver one. Cancellation is not supported.
     */
    protected class ItemFuture<T> implements Future<T> {
        protected volatile T item = null;
        protected volatile ExchangeCountDownLatch<T> latch = null;
        protected volatile boolean canceled = false;

        public ItemFuture(T item) {
            this.item = item;
        }

        public ItemFuture(ExchangeCountDownLatch<T> latch) {
            this.latch = latch;
        }

        public boolean cancel(boolean mayInterruptIfRunning) {
            return false; //don't allow cancel for now
        }

        public T get() throws InterruptedException, ExecutionException {
            if (item != null) {
                return item;
            } else if (latch != null) {
                latch.await();
                return latch.getItem();
            } else {
                throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
            }
        }

        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (item != null) {
                return item;
            } else if (latch != null) {
                boolean timedout = !latch.await(timeout, unit);
                if (timedout) {
                    throw new TimeoutException();
                }
                return latch.getItem();
            } else {
                throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
            }
        }

        public boolean isCancelled() {
            return false;
        }

        public boolean isDone() {
            // guard the latch: a future built from a null item has neither item nor latch
            return item != null || (latch != null && latch.getItem() != null);
        }

    }

    //------------------------------------------------------------------
    // Count down latch that can be used to exchange information
    //------------------------------------------------------------------
    /**
     * A count down latch that additionally carries an item from the thread
     * that counts down to the thread that awaits.
     */
    protected class ExchangeCountDownLatch<T> extends CountDownLatch {
        protected volatile T item;

        public ExchangeCountDownLatch(int i) {
            super(i);
        }

        public T getItem() {
            return item;
        }

        public void setItem(T item) {
            this.item = item;
        }
    }

    //------------------------------------------------------------------
    // Iterator safe from concurrent modification exceptions
    //------------------------------------------------------------------
    /**
     * Iterates over a copy of the queue contents taken at construction time,
     * so it never throws a concurrent modification exception. The copy is
     * collected stripe by stripe, each under its own lock.
     */
    protected class FairIterator implements Iterator<E> {
        E[] elements = null;
        int index;
        /** The element last returned by {@link #next()}; null when there is nothing to remove. */
        E element = null;

        @SuppressWarnings("unchecked") // an Object[] is used as backing store for E[]
        public FairIterator() {
            ArrayList<E> list = new ArrayList<E>(MultiLockFairBlockingQueue.this.size());
            for (int idx = 0; idx < LOCK_COUNT; idx++) {
                final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
                lock.lock();
                try {
                    list.addAll(MultiLockFairBlockingQueue.this.items[idx]);
                } finally {
                    lock.unlock();
                }
            }
            index = 0;
            elements = (E[]) new Object[list.size()];
            list.toArray(elements);
        }

        public boolean hasNext() {
            return index < elements.length;
        }

        public E next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            element = elements[index++];
            return element;
        }

        /**
         * Removes the element last returned by {@link #next()} from the
         * underlying queue, if it is still present there.
         *
         * @throws IllegalStateException if {@code next()} has not been called
         *         yet, or {@code remove()} was already called for the current element
         */
        public void remove() {
            if (element == null) {
                throw new IllegalStateException();
            }
            final E target = element;
            element = null;
            boolean removed = false;
            for (int idx = 0; idx < LOCK_COUNT && !removed; idx++) {
                final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
                lock.lock();
                try {
                    removed = MultiLockFairBlockingQueue.this.items[idx].remove(target);
                } finally {
                    lock.unlock();
                }
            }
        }

    }
}