Author: fhanik
Date: Tue Jul 14 17:49:03 2009
New Revision: 793991

URL: http://svn.apache.org/viewvc?rev=793991&view=rev
Log:
Add experimental new queue, runs faster but is not yet complete and needs some 
work around concurrency, iterators and remove operations

Added:
    
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
   (with props)

Added: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java?rev=793991&view=auto
==============================================================================
--- 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
 (added)
+++ 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
 Tue Jul 14 17:49:03 2009
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tomcat.jdbc.pool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * <b>EXPERIMENTAL AND NOT YET COMPLETE!</b>
+ *
+ *
+ * An implementation of a blocking queue with fairness waiting and lock 
dispersal to avoid contention.
+ * invocations to method poll(...) will get handed out in the order they were 
received.
+ * Locking is fine grained, a shared lock is only used during the first level 
of contention, waiting is done in a 
+ * lock per thread basis so that order is guaranteed once the thread goes into 
a suspended monitor state.
+ * <br/>
+ * Not all of the methods of the {...@link java.util.concurrent.BlockingQueue} 
are implemented.
+ * @author Filip Hanik
+ *
+ */
+
+public class MultiLockFairBlockingQueue<E> implements BlockingQueue<E> {
+    
+    final int LOCK_COUNT = Runtime.getRuntime().availableProcessors();
+    
+    final AtomicInteger putQueue = new AtomicInteger(0);
+    final AtomicInteger pollQueue = new AtomicInteger(0);
+    
+    public int getNextPut() {
+        int idx = Math.abs(putQueue.incrementAndGet()) % LOCK_COUNT;
+        return idx;
+    }
+    
+    public int getNextPoll() {
+        int idx = Math.abs(pollQueue.incrementAndGet()) % LOCK_COUNT;
+        return idx;
+    }
+    /**
+     * Phase one entry lock in order to give out 
+     * per-thread-locks for the waiting phase we have 
+     * a phase one lock during the contention period.
+     */
+    final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT];
+
+    /**
+     * All the objects in the pool are stored in a simple linked list
+     */
+    final LinkedList<E>[] items;
+
+    /**
+     * All threads waiting for an object are stored in a linked list
+     */
+    final LinkedList<ExchangeCountDownLatch<E>>[] waiters;
+    
+    /**
+     * Creates a new fair blocking queue.
+     */
+    public MultiLockFairBlockingQueue() {
+        items = new LinkedList[LOCK_COUNT];
+        waiters = new LinkedList[LOCK_COUNT];
+        for (int i=0; i<LOCK_COUNT; i++) {
+            items[i] = new LinkedList<E>();
+            waiters[i] = new LinkedList<ExchangeCountDownLatch<E>>();
+            locks[i] = new ReentrantLock(false);
+        }
+    }
+
+    //------------------------------------------------------------------
+    // USED BY CONPOOL IMPLEMENTATION
+    //------------------------------------------------------------------
+    /**
+     * Will always return true, queue is unbounded.
+     * {...@inheritdoc}
+     */
+    public boolean offer(E e) {
+        int idx = getNextPut();
+        //during the offer, we will grab the main lock
+        final ReentrantLock lock = this.locks[idx];
+        lock.lock();
+        ExchangeCountDownLatch<E> c = null;
+        try {
+            //check to see if threads are waiting for an object
+            if (waiters[idx].size() > 0) {
+                //if threads are waiting grab the latch for that thread
+                c = waiters[idx].poll();
+                //give the object to the thread instead of adding it to the 
pool
+                c.setItem(e);
+            } else {
+                //we always add first, so that the most recently used object 
will be given out
+                items[idx].addFirst(e);
+            }
+        } finally {
+            lock.unlock();
+        }
+        //if we exchanged an object with another thread, wake it up.
+        if (c!=null) c.countDown();
+        //we have an unbounded queue, so always return true
+        return true;
+    }
+
+    /**
+     * Will never timeout, as it invokes the {...@link #offer(Object)} method.
+     * Once a lock has been acquired, the  
+     * {...@inheritdoc}
+     */
+    public boolean offer(E e, long timeout, TimeUnit unit) throws 
InterruptedException {
+        return offer(e);
+    }
+
+    /**
+     * Fair retrieval of an object in the queue.
+     * Objects are returned in the order the threads requested them.
+     * {...@inheritdoc}
+     */
+    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+        int idx = getNextPoll();
+        E result = null;
+        final ReentrantLock lock = this.locks[idx];
+        boolean error = true;
+        //acquire the global lock until we know what to do
+        lock.lock();
+        try {
+            //check to see if we have objects
+            result = items[idx].poll();
+            if (result==null && timeout>0) {
+                //the queue is empty we will wait for an object
+                ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<E>(1);
+                //add to the bottom of the wait list
+                waiters[idx].addLast(c);
+                //unlock the global lock
+                lock.unlock();
+                //wait for the specified timeout
+                if (!c.await(timeout, unit)) {
+                    //if we timed out, remove ourselves from the waitlist
+                    lock.lock();
+                    waiters[idx].remove(c);
+                    lock.unlock();
+                }
+                //return the item we received, can be null if we timed out
+                result = c.getItem();
+            } else {
+                //we have an object, release
+                lock.unlock();
+            }
+            error = false;
+        } finally {
+            if (error && lock.isHeldByCurrentThread()) {
+                lock.unlock();
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Request an item from the queue asynchronously
+     * @return - a future pending the result from the queue poll request
+     */
+    public Future<E> pollAsync() {
+        int idx = getNextPoll();
+        Future<E> result = null;
+        final ReentrantLock lock = this.locks[idx];
+        boolean error = true;
+        //grab the global lock
+        lock.lock();
+        try {
+            //check to see if we have objects in the queue
+            E item = items[idx].poll();
+            if (item==null) {
+                //queue is empty, add ourselves as waiters
+                ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<E>(1);
+                waiters[idx].addLast(c);
+                lock.unlock();
+                //return a future that will wait for the object
+                result = new ItemFuture(c);
+            } else {
+                lock.unlock();
+                //return a future with the item
+                result = new ItemFuture(item);
+            }
+            error = false;
+        } finally {
+            if (error && lock.isHeldByCurrentThread()) {
+                lock.unlock();
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * {...@inheritdoc}
+     */
+    public boolean remove(Object e) {
+        for (int idx=0; idx<LOCK_COUNT; idx++) {
+            final ReentrantLock lock = this.locks[idx];
+            lock.lock();
+            try {
+                boolean result = items[idx].remove(e);
+                if (result) return result;
+            } finally {
+                lock.unlock();
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * {...@inheritdoc}
+     */
+    public int size() {
+        int size = 0;
+        for (int idx=0; idx<LOCK_COUNT; idx++) {
+            size += items[idx].size();
+        }
+        return size;
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public Iterator<E> iterator() {
+        return new FairIterator();
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public E poll() {
+        int idx = getNextPoll();
+        final ReentrantLock lock = this.locks[idx];
+        lock.lock();
+        try {
+            return items[idx].poll();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public boolean contains(Object e) {
+        for (int idx=0; idx<LOCK_COUNT; idx++) {
+            boolean result = items[idx].contains(e);
+            if (result) return result;
+        }
+        return false;
+    }
+
+
+    //------------------------------------------------------------------
+    // NOT USED BY CONPOOL IMPLEMENTATION
+    //------------------------------------------------------------------
+    /**
+     * {...@inheritdoc}
+     */
+    public boolean add(E e) {
+        return offer(e);
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public int drainTo(Collection<? super E> c, int maxElements) {
+        throw new UnsupportedOperationException("int drainTo(Collection<? 
super E> c, int maxElements)");
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public int drainTo(Collection<? super E> c) {
+        return drainTo(c,Integer.MAX_VALUE);
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public void put(E e) throws InterruptedException {
+        offer(e);
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public int remainingCapacity() {
+        return Integer.MAX_VALUE - size();
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public E take() throws InterruptedException {
+        return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public boolean addAll(Collection<? extends E> c) {
+        Iterator i = c.iterator();
+        while (i.hasNext()) {
+            E e = (E)i.next();
+            offer(e);
+        }
+        return true;
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public void clear() {
+        throw new UnsupportedOperationException("void clear()");
+
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public boolean containsAll(Collection<?> c) {
+        throw new UnsupportedOperationException("boolean 
containsAll(Collection<?> c)");
+    }
+
+    /**
+     * {...@inheritdoc}
+     */
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException("boolean 
removeAll(Collection<?> c)");
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException("boolean 
retainAll(Collection<?> c)");
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public Object[] toArray() {
+        throw new UnsupportedOperationException("Object[] toArray()");
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public E element() {
+        throw new UnsupportedOperationException("E element()");
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public E peek() {
+        throw new UnsupportedOperationException("E peek()");
+    }
+
+    /**
+     * {...@inheritdoc}
+     * @throws UnsupportedOperation - this operation is not supported
+     */
+    public E remove() {
+        throw new UnsupportedOperationException("E remove()");
+    }
+
+
+
+    //------------------------------------------------------------------
+    // Non cancellable Future used to check and see if a connection has been 
made available
+    //------------------------------------------------------------------
+    protected class ItemFuture<T> implements Future<T> {
+        protected volatile T item = null;
+        protected volatile ExchangeCountDownLatch<T> latch = null;
+        protected volatile boolean canceled = false;
+        
+        public ItemFuture(T item) {
+            this.item = item;
+        }
+        
+        public ItemFuture(ExchangeCountDownLatch<T> latch) {
+            this.latch = latch;
+        }
+        
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return false; //don't allow cancel for now
+        }
+
+        public T get() throws InterruptedException, ExecutionException {
+            if (item!=null) {
+                return item;
+            } else if (latch!=null) {
+                latch.await();
+                return latch.getItem();
+            } else {
+                throw new ExecutionException("ItemFuture incorrectly 
instantiated. Bug in the code?", new Exception());
+            }
+        }
+
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, 
ExecutionException, TimeoutException {
+            if (item!=null) {
+                return item;
+            } else if (latch!=null) {
+                boolean timedout = !latch.await(timeout, unit);
+                if (timedout) throw new TimeoutException();
+                else return latch.getItem();
+            } else {
+                throw new ExecutionException("ItemFuture incorrectly 
instantiated. Bug in the code?", new Exception());
+            }
+        }
+
+        public boolean isCancelled() {
+            return false;
+        }
+
+        public boolean isDone() {
+            return (item!=null || latch.getItem()!=null);
+        }
+        
+    }
+
+    //------------------------------------------------------------------
+    // Count down latch that can be used to exchange information
+    //------------------------------------------------------------------
+    protected class ExchangeCountDownLatch<T> extends CountDownLatch {
+        protected volatile T item;
+        public ExchangeCountDownLatch(int i) {
+            super(i);
+        }
+        public T getItem() {
+            return item;
+        }
+        public void setItem(T item) {
+            this.item = item;
+        }
+    }
+
+    //------------------------------------------------------------------
+    // Iterator safe from concurrent modification exceptions
+    //------------------------------------------------------------------
+    protected class FairIterator implements Iterator<E> {
+        E[] elements = null;
+        int index;
+        E element = null;
+
+        public FairIterator() {
+            ArrayList<E> list = new 
ArrayList<E>(MultiLockFairBlockingQueue.this.size());
+            for (int idx=0; idx<LOCK_COUNT; idx++) {
+                final ReentrantLock lock = 
MultiLockFairBlockingQueue.this.locks[idx];
+                lock.lock();
+                try {
+                    elements = (E[]) new 
Object[MultiLockFairBlockingQueue.this.items[idx].size()];
+                    
MultiLockFairBlockingQueue.this.items[idx].toArray(elements);
+                    
+                } finally {
+                    lock.unlock();
+                }
+            }
+            index = 0;
+            elements = (E[]) new Object[list.size()];
+            list.toArray(elements);
+        }
+        public boolean hasNext() {
+            return index<elements.length;
+        }
+
+        public E next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            element = elements[index++];
+            return element;
+        }
+
+        public void remove() {
+            for (int idx=0; idx<LOCK_COUNT; idx++) {
+                final ReentrantLock lock = 
MultiLockFairBlockingQueue.this.locks[idx];
+                lock.lock();
+                try {
+                    boolean result = 
MultiLockFairBlockingQueue.this.items[idx].remove(elements[index]);
+                    if (result) break;
+                } finally {
+                    lock.unlock();
+                }
+            }
+            
+        }
+
+    }
+}

Propchange: 
tomcat/trunk/modules/jdbc-pool/java/org/apache/tomcat/jdbc/pool/MultiLockFairBlockingQueue.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to