Author: markt Date: Wed Sep 26 16:55:56 2012 New Revision: 1390599 URL: http://svn.apache.org/viewvc?rev=1390599&view=rev Log: A slightly different alternative to ConcurrentLinkedQueue. This one is also largely GC free and has similar performance to the Stack version.
Added: tomcat/trunk/java/org/apache/tomcat/util/collections/SynchronizedQueue.java (with props) tomcat/trunk/test/org/apache/tomcat/util/collections/TestSynchronizedQueue.java (with props) tomcat/trunk/test/org/apache/tomcat/util/collections/TesterPerformanceSynchronizedQueue.java (with props) Added: tomcat/trunk/java/org/apache/tomcat/util/collections/SynchronizedQueue.java URL: http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/collections/SynchronizedQueue.java?rev=1390599&view=auto ============================================================================== --- tomcat/trunk/java/org/apache/tomcat/util/collections/SynchronizedQueue.java (added) +++ tomcat/trunk/java/org/apache/tomcat/util/collections/SynchronizedQueue.java Wed Sep 26 16:55:56 2012 @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.collections; + +/** + * This is intended as a (mostly) GC-free alternative to + * {@link java.util.concurrent.ConcurrentLinkedQueue} when the requirement is to + * create an unbounded queue with no requirement to shrink the queue. The aim is + * to provide the bare minimum of required functionality as quickly as possible + * with minimum garbage. 
/**
 * Unbounded FIFO queue backed by a circular array that grows on demand and is
 * never shrunk. Every public method is {@code synchronized} on the queue
 * instance, so a single instance may be shared between threads.
 * <p>
 * The queue is empty when the insert and remove indexes are equal. To keep
 * that test unambiguous the backing array is doubled as soon as it becomes
 * full, so at least one slot is always free once {@link #offer(Object)}
 * returns.
 * <p>
 * {@code null} elements are accepted by {@link #offer(Object)}, but
 * {@link #poll()} also returns {@code null} for an empty queue, so callers
 * cannot tell the two cases apart.
 *
 * @param <T> the type of element held in the queue
 */
public class SynchronizedQueue<T> {

    /** Capacity used by the no-argument constructor. */
    public static final int DEFAULT_SIZE = 128;

    /** Circular buffer holding the queued elements. */
    private Object[] queue;
    /** Current length of {@link #queue}. */
    private int size;
    /** Index of the slot the next offered element is written to. */
    private int insert = 0;
    /** Index of the slot the next polled element is read from. */
    private int remove = 0;

    /**
     * Creates a queue with an initial capacity of {@link #DEFAULT_SIZE}.
     */
    public SynchronizedQueue() {
        this(DEFAULT_SIZE);
    }

    /**
     * Creates a queue with the given initial capacity.
     *
     * @param initialSize the initial length of the backing array; a value of
     *                    zero is treated as one
     * @throws NegativeArraySizeException if {@code initialSize} is negative
     */
    public SynchronizedQueue(int initialSize) {
        // A zero-length array can never accept an element: the first offer()
        // would fail with ArrayIndexOutOfBoundsException before expand() gets
        // a chance to run. Start with a single slot instead.
        int capacity = (initialSize == 0) ? 1 : initialSize;
        queue = new Object[capacity];
        size = capacity;
    }

    /**
     * Appends an element to the tail of the queue, growing the backing array
     * if this insertion fills it.
     *
     * @param t the element to add
     * @return always {@code true}
     */
    public synchronized boolean offer(T t) {
        queue[insert++] = t;

        // Wrap
        if (insert == size) {
            insert = 0;
        }

        // The insert index catching up with the remove index means the array
        // is full (not empty), so grow it before returning.
        if (insert == remove) {
            expand();
        }
        return true;
    }

    /**
     * Removes and returns the element at the head of the queue.
     *
     * @return the oldest element, or {@code null} if the queue is empty
     */
    public synchronized T poll() {
        if (insert == remove) {
            // empty
            return null;
        }

        // Safe: offer(T) is the only method that stores into the array.
        @SuppressWarnings("unchecked")
        T result = (T) queue[remove];
        // Drop the reference so the queue does not keep the element alive.
        queue[remove] = null;
        remove++;

        // Wrap
        if (remove == size) {
            remove = 0;
        }

        return result;
    }

    /**
     * Doubles the backing array. Only called when the array is full, i.e. when
     * the insert and remove indexes are equal, so the oldest element sits at
     * {@code insert}. The elements are copied in FIFO order to the start of
     * the new array.
     */
    private void expand() {
        int newSize = size * 2;
        Object[] newQueue = new Object[newSize];

        // Oldest elements: from the shared index up to the end of the array.
        System.arraycopy(queue, insert, newQueue, 0, size - insert);
        // Newest elements: the part that had wrapped round to the start.
        System.arraycopy(queue, 0, newQueue, size - insert, insert);

        insert = size;
        remove = 0;
        queue = newQueue;
        size = newSize;
    }

    /**
     * Returns the number of elements currently held in the queue.
     *
     * @return the current element count
     */
    public synchronized int size() {
        int result = insert - remove;
        if (result < 0) {
            // The insert index has wrapped round but the remove index has not.
            result += size;
        }
        return result;
    }

    /**
     * Removes all elements. The backing array keeps its current length and is
     * reused rather than reallocated, in keeping with the low-garbage aim of
     * this class.
     */
    public synchronized void clear() {
        java.util.Arrays.fill(queue, null);
        insert = 0;
        remove = 0;
    }
}
tomcat/trunk/test/org/apache/tomcat/util/collections/TestSynchronizedQueue.java Wed Sep 26 16:55:56 2012 @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tomcat.util.collections; + +import org.junit.Assert; +import org.junit.Test; + +public class TestSynchronizedQueue { + + public void testPollEmpty() { + SynchronizedQueue<Object> queue = new SynchronizedQueue<>(); + Assert.assertNull(queue.poll()); + } + + @Test + public void testOfferPollOrder() { + SynchronizedQueue<Object> queue = new SynchronizedQueue<>(); + + Object o1 = new Object(); + Object o2 = new Object(); + Object o3 = new Object(); + Object o4 = new Object(); + + queue.offer(o1); + queue.offer(o2); + queue.offer(o3); + queue.offer(o4); + + Assert.assertSame(queue.poll(), o1); + Assert.assertSame(queue.poll(), o2); + Assert.assertSame(queue.poll(), o3); + Assert.assertSame(queue.poll(), o4); + + Assert.assertNull(queue.poll()); + } + + @Test + public void testExpandOfferPollOrder() { + SynchronizedQueue<Object> queue = new SynchronizedQueue<>(); + + Object o1 = new Object(); + Object o2 = new Object(); + Object o3 = new Object(); + Object o4 = new Object(); + + for (int i = 0; i < 300; i++) { + queue.offer(o1); + 
queue.offer(o2); + queue.offer(o3); + queue.offer(o4); + } + + for (int i = 0; i < 300; i++) { + Assert.assertSame(queue.poll(), o1); + Assert.assertSame(queue.poll(), o2); + Assert.assertSame(queue.poll(), o3); + Assert.assertSame(queue.poll(), o4); + } + + Assert.assertNull(queue.poll()); + } + + @Test + public void testExpandOfferPollOrder2() { + SynchronizedQueue<Object> queue = new SynchronizedQueue<>(); + + Object o1 = new Object(); + Object o2 = new Object(); + Object o3 = new Object(); + Object o4 = new Object(); + + for (int i = 0; i < 100; i++) { + queue.offer(o1); + queue.offer(o2); + queue.offer(o3); + queue.offer(o4); + } + + for (int i = 0; i < 50; i++) { + Assert.assertSame(queue.poll(), o1); + Assert.assertSame(queue.poll(), o2); + Assert.assertSame(queue.poll(), o3); + Assert.assertSame(queue.poll(), o4); + } + + for (int i = 0; i < 200; i++) { + queue.offer(o1); + queue.offer(o2); + queue.offer(o3); + queue.offer(o4); + } + + for (int i = 0; i < 250; i++) { + Assert.assertSame(queue.poll(), o1); + Assert.assertSame(queue.poll(), o2); + Assert.assertSame(queue.poll(), o3); + Assert.assertSame(queue.poll(), o4); + } + + + Assert.assertNull(queue.poll()); + } +} Propchange: tomcat/trunk/test/org/apache/tomcat/util/collections/TestSynchronizedQueue.java ------------------------------------------------------------------------------ svn:eol-style = native Added: tomcat/trunk/test/org/apache/tomcat/util/collections/TesterPerformanceSynchronizedQueue.java URL: http://svn.apache.org/viewvc/tomcat/trunk/test/org/apache/tomcat/util/collections/TesterPerformanceSynchronizedQueue.java?rev=1390599&view=auto ============================================================================== --- tomcat/trunk/test/org/apache/tomcat/util/collections/TesterPerformanceSynchronizedQueue.java (added) +++ tomcat/trunk/test/org/apache/tomcat/util/collections/TesterPerformanceSynchronizedQueue.java Wed Sep 26 16:55:56 2012 @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.util.collections;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.junit.Test;

/**
 * Rough timing comparison between {@link SynchronizedQueue} and
 * {@link ConcurrentLinkedQueue}. Each test starts {@code THREAD_COUNT} threads
 * that each perform {@code ITERATIONS} poll-then-offer cycles against one
 * shared queue, then prints the elapsed time in milliseconds.
 */
public class TesterPerformanceSynchronizedQueue {

    private static final int THREAD_COUNT = 4;
    private static final int ITERATIONS = 1000000;

    private static final SynchronizedQueue<Object> S_QUEUE =
            new SynchronizedQueue<>();

    private static final Queue<Object> QUEUE = new ConcurrentLinkedQueue<>();

    /**
     * Times the poll/offer workload against the shared
     * {@link SynchronizedQueue}.
     *
     * @throws InterruptedException if interrupted while waiting for the
     *                              worker threads to finish
     */
    @Test
    public void testSynchronizedQueue() throws InterruptedException {
        Thread[] threads = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] = new StackThread();
        }

        long elapsedMillis = runAndTime(threads);

        System.out.println("SynchronizedQueue: " + elapsedMillis + "ms");
    }

    /**
     * Worker for {@link #testSynchronizedQueue()}: repeatedly polls the shared
     * {@link SynchronizedQueue}, creating a new object when the poll returns
     * {@code null}, and offers the object back.
     */
    public static class StackThread extends Thread {

        @Override
        public void run() {
            for (int i = 0; i < ITERATIONS; i++) {
                Object obj = S_QUEUE.poll();
                if (obj == null) {
                    obj = new Object();
                }
                S_QUEUE.offer(obj);
            }
        }
    }

    /**
     * Times the same poll/offer workload against the shared
     * {@link ConcurrentLinkedQueue}.
     *
     * @throws InterruptedException if interrupted while waiting for the
     *                              worker threads to finish
     */
    @Test
    public void testConcurrentQueue() throws InterruptedException {
        Thread[] threads = new Thread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] = new QueueThread();
        }

        long elapsedMillis = runAndTime(threads);

        System.out.println("ConcurrentLinkedQueue: " + elapsedMillis + "ms");
    }

    /**
     * Worker for {@link #testConcurrentQueue()}: repeatedly polls the shared
     * {@link ConcurrentLinkedQueue}, creating a new object when the poll
     * returns {@code null}, and offers the object back.
     */
    public static class QueueThread extends Thread {

        @Override
        public void run() {
            for (int i = 0; i < ITERATIONS; i++) {
                Object obj = QUEUE.poll();
                if (obj == null) {
                    obj = new Object();
                }
                QUEUE.offer(obj);
            }
        }
    }

    /**
     * Starts every thread, waits for all of them to finish and returns the
     * elapsed time in whole milliseconds.
     */
    private static long runAndTime(Thread[] threads)
            throws InterruptedException {
        // nanoTime() is monotonic; currentTimeMillis() follows the wall clock
        // and can jump if the system time is adjusted during the run.
        long start = System.nanoTime();

        for (Thread thread : threads) {
            thread.start();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        return (System.nanoTime() - start) / 1000000L;
    }
}