Author: tv Date: Sun Dec 18 18:19:37 2016 New Revision: 1774925 URL: http://svn.apache.org/viewvc?rev=1774925&view=rev Log: Clean up API Use LinkedBlockingQueue instead of home-grown implementation
Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java?rev=1774925&r1=1774924&r2=1774925&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java Sun Dec 18 18:19:37 2016 @@ -1,5 +1,8 @@ package org.apache.commons.jcs.engine; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -25,8 +28,6 @@ import org.apache.commons.jcs.engine.beh import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.IOException; - /** * An abstract base class to the different implementations */ @@ -45,34 +46,32 @@ public abstract class AbstractCacheEvent */ private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS; - // TODO privatise the fields - /** - * When the events are pulled off the queue, the tell the listener to handle the specific event + * When the events are pulled off the queue, then tell the listener to handle the specific event * type. The work is done by the listener. */ - protected ICacheListener<K, V> listener; + private ICacheListener<K, V> listener; /** Id of the listener registered with this queue */ - protected long listenerId; + private long listenerId; /** The cache region name, if applicable. */ - protected String cacheName; + private String cacheName; /** Maximum number of failures before we buy the farm. */ - protected int maxFailure; + private int maxFailure; /** in milliseconds */ - protected int waitBeforeRetry; + private int waitBeforeRetry; - /** this is true if there is no worker thread. */ - protected boolean destroyed = true; + /** this is true if there is any worker thread. */ + private final AtomicBoolean alive = new AtomicBoolean(false); /** * This means that the queue is functional. If we reached the max number of failures, the queue * is marked as non functional and will never work again. */ - private boolean working = true; + private final AtomicBoolean working = new AtomicBoolean(true); /** * Returns the time to wait for events before killing the background thread. @@ -111,9 +110,9 @@ public abstract class AbstractCacheEvent * @return The alive value */ @Override - public synchronized boolean isAlive() + public boolean isAlive() { - return !destroyed; + return alive.get(); } /** @@ -121,9 +120,9 @@ public abstract class AbstractCacheEvent * <p> * @param aState */ - public synchronized void setAlive( boolean aState ) + public void setAlive( boolean aState ) { - destroyed = !aState; + alive.set(aState); } /** @@ -136,6 +135,43 @@ public abstract class AbstractCacheEvent } /** + * @return the cacheName + */ + protected String getCacheName() + { + return cacheName; + } + + /** + * Initializes the queue. + * <p> + * @param listener + * @param listenerId + * @param cacheName + * @param maxFailure + * @param waitBeforeRetry + */ + protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure, + int waitBeforeRetry) + { + if ( listener == null ) + { + throw new IllegalArgumentException( "listener must not be null" ); + } + + this.listener = listener; + this.listenerId = listenerId; + this.cacheName = cacheName; + this.maxFailure = maxFailure <= 0 ? 3 : maxFailure; + this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry; + + if ( log.isDebugEnabled() ) + { + log.debug( "Constructed: " + this ); + } + } + + /** * This adds a put event to the queue. When it is processed, the element will be put to the * listener. * <p> @@ -150,12 +186,9 @@ public abstract class AbstractCacheEvent { put( new PutEvent( ce ) ); } - else + else if ( log.isWarnEnabled() ) { - if ( log.isWarnEnabled() ) - { - log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." ); - } + log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." ); } } @@ -174,12 +207,9 @@ public abstract class AbstractCacheEvent { put( new RemoveEvent( key ) ); } - else + else if ( log.isWarnEnabled() ) { - if ( log.isWarnEnabled() ) - { - log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." ); - } + log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." ); } } @@ -197,12 +227,9 @@ public abstract class AbstractCacheEvent { put( new RemoveAllEvent() ); } - else + else if ( log.isWarnEnabled() ) { - if ( log.isWarnEnabled() ) - { - log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." ); - } + log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." ); } } @@ -217,12 +244,9 @@ public abstract class AbstractCacheEvent { put( new DisposeEvent() ); } - else + else if ( log.isWarnEnabled() ) { - if ( log.isWarnEnabled() ) - { - log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." ); - } + log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." ); } } @@ -235,24 +259,12 @@ public abstract class AbstractCacheEvent // /////////////////////////// Inner classes ///////////////////////////// - - /** The queue is composed of nodes. */ - protected static class Node - { - /** Next node in the singly linked list. */ - Node next = null; - - /** The payload. */ - AbstractCacheEventQueue<?, ?>.AbstractCacheEvent event = null; - } - /** * Retries before declaring failure. * <p> * @author asmuts */ - protected abstract class AbstractCacheEvent - implements Runnable + protected abstract class AbstractCacheEvent implements Runnable { /** Number of failures encountered processing this event. */ int failures = 0; @@ -260,8 +272,8 @@ public abstract class AbstractCacheEvent /** * Main processing method for the AbstractCacheEvent object */ - @SuppressWarnings("synthetic-access") @Override + @SuppressWarnings("synthetic-access") public void run() { try @@ -442,7 +454,6 @@ public abstract class AbstractCacheEvent { return "RemoveAllEvent"; } - } /** @@ -483,7 +494,7 @@ public abstract class AbstractCacheEvent @Override public boolean isWorking() { - return working; + return working.get(); } /** @@ -494,6 +505,6 @@ public abstract class AbstractCacheEvent */ public void setWorking( boolean b ) { - working = b; + working.set(b); } } Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java?rev=1774925&r1=1774924&r2=1774925&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java Sun Dec 18 18:19:37 2016 @@ -1,5 +1,9 @@ package org.apache.commons.jcs.engine; +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -27,8 +31,6 @@ import org.apache.commons.jcs.engine.sta import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.ArrayList; - /** * An event queue is used to propagate ordered cache events to one and only one target listener. * <p> @@ -48,17 +50,8 @@ public class CacheEventQueue<K, V> /** the thread that works the queue. */ private Thread processorThread; - /** sync */ - private final Object queueLock = new Object(); - - /** the head of the queue */ - private Node head = new Node(); - - /** the end of the queue */ - private Node tail = head; - - /** Number of items in the queue */ - private int size = 0; + /** Queue implementation */ + private LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>(); /** * Constructs with the specified listener and the cache name. @@ -84,38 +77,7 @@ public class CacheEventQueue<K, V> public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure, int waitBeforeRetry ) { - initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null ); - } - - /** - * Initializes the queue. - * <p> - * @param listener - * @param listenerId - * @param cacheName - * @param maxFailure - * @param waitBeforeRetry - * @param threadPoolName - */ - @Override - public void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure, - int waitBeforeRetry, String threadPoolName ) - { - if ( listener == null ) - { - throw new IllegalArgumentException( "listener must not be null" ); - } - - this.listener = listener; - this.listenerId = listenerId; - this.cacheName = cacheName; - this.maxFailure = maxFailure <= 0 ? 3 : maxFailure; - this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry; - - if ( log.isDebugEnabled() ) - { - log.debug( "Constructed: " + this ); - } + initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry ); } /** @@ -133,13 +95,10 @@ public class CacheEventQueue<K, V> * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it * can still be working. */ - public void stopProcessing() + protected void stopProcessing() { - synchronized (queueLock) - { - destroyed = true; - processorThread = null; - } + setAlive(false); + processorThread = null; } /** @@ -150,37 +109,31 @@ public class CacheEventQueue<K, V> @Override public void destroy() { - synchronized (queueLock) + if ( isAlive() ) { - if ( !destroyed ) - { - destroyed = true; + setAlive(false); - if ( log.isInfoEnabled() ) - { - log.info( "Destroying queue, stats = " + getStatistics() ); - } - - // Synchronize on queue so the thread will not wait forever, - // and then interrupt the QueueProcessor + if ( log.isInfoEnabled() ) + { + log.info( "Destroying queue, stats = " + getStatistics() ); + } - if ( processorThread != null ) - { - processorThread.interrupt(); - processorThread = null; - } + if ( processorThread != null ) + { + processorThread.interrupt(); + processorThread = null; + } - if ( log.isInfoEnabled() ) - { - log.info( "Cache event queue destroyed: " + this ); - } + if ( log.isInfoEnabled() ) + { + log.info( "Cache event queue destroyed: " + this ); } - else + } + else + { + if ( log.isInfoEnabled() ) { - if ( log.isInfoEnabled() ) - { - log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats = " + getStatistics() ); - } + log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats = " + getStatistics() ); } } } @@ -193,34 +146,23 @@ public class CacheEventQueue<K, V> @Override protected void put( AbstractCacheEvent event ) { - Node newNode = new Node(); if ( log.isDebugEnabled() ) { - log.debug( "Event entering Queue for " + cacheName + ": " + event ); + log.debug( "Event entering Queue for " + getCacheName() + ": " + event ); } - newNode.event = event; + queue.offer(event); - synchronized ( queueLock ) + if ( isWorking() ) { - size++; - tail.next = newNode; - tail = newNode; - if ( isWorking() ) + if ( !isAlive() ) { - if ( !isAlive() ) - { - destroyed = false; - processorThread = new QProcessor( this ); - processorThread.start(); - if ( log.isInfoEnabled() ) - { - log.info( "Cache event queue created: " + this ); - } - } - else + setAlive(true); + processorThread = new QProcessor(); + processorThread.start(); + if ( log.isInfoEnabled() ) { - queueLock.notify(); + log.info( "Cache event queue created: " + this ); } } } @@ -234,23 +176,18 @@ public class CacheEventQueue<K, V> * @author asmuts * @created January 15, 2002 */ - private class QProcessor + protected class QProcessor extends Thread { - /** The queue to work */ - CacheEventQueue<K, V> queue; - /** * Constructor for the QProcessor object * <p> * @param aQueue the event queue to take items from. */ - QProcessor( CacheEventQueue<K, V> aQueue ) + QProcessor() { - super( "CacheEventQueue.QProcessor-" + aQueue.cacheName ); - + super( "CacheEventQueue.QProcessor-" + getCacheName() ); setDaemon( true ); - queue = aQueue; } /** @@ -259,15 +196,22 @@ public class CacheEventQueue<K, V> * Waits for a specified time (waitToDieMillis) for something to come in and if no new * events come in during that period the run method can exit and the thread is dereferenced. */ - @SuppressWarnings("synthetic-access") @Override public void run() { - AbstractCacheEvent event = null; - while ( queue.isAlive() ) + while ( isAlive() ) { - event = queue.take(); + AbstractCacheEvent event = null; + + try + { + event = queue.poll(getWaitToDieMillis(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + // is ok + } if ( log.isDebugEnabled() ) { @@ -276,79 +220,18 @@ public class CacheEventQueue<K, V> if ( event == null ) { - synchronized ( queueLock ) - { - try - { - queueLock.wait( queue.getWaitToDieMillis() ); - } - catch ( InterruptedException e ) - { - log.warn( "Interrupted while waiting for another event to come in before we die." ); - return; - } - event = queue.take(); - if ( log.isDebugEnabled() ) - { - log.debug( "Event from queue after sleep = " + event ); - } - } - if ( event == null ) - { - queue.stopProcessing(); - } + stopProcessing(); } - if ( queue.isWorking() && queue.isAlive() && event != null ) + if ( event != null && isWorking() && isAlive() ) { event.run(); } } if ( log.isDebugEnabled() ) { - log.debug( "QProcessor exiting for " + queue ); - } - } - } - - /** - * Returns the next cache event from the queue or null if there are no events in the queue. - * <p> - * We have an empty node at the head and the tail. When we take an item from the queue we move - * the next node to the head and then clear the value from that node. This value is returned. - * <p> - * When the queue is empty the head node is the same as the tail node. - * <p> - * @return An event to process. - */ - protected AbstractCacheEvent take() - { - synchronized ( queueLock ) - { - // wait until there is something to read - if ( head == tail ) - { - return null; - } - - Node node = head.next; - - @SuppressWarnings("unchecked") // No generics for public fields - AbstractCacheEvent value = (AbstractCacheEvent) node.event; - - if ( log.isDebugEnabled() ) - { - log.debug( "head.event = " + head.event ); - log.debug( "node.event = " + node.event ); + log.debug( "QProcessor exiting for " + getCacheName() ); } - - // Node becomes the new head (head is always empty) - - node.event = null; - head = node; - - size--; - return value; } } @@ -366,30 +249,10 @@ public class CacheEventQueue<K, V> ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>(); - elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(super.isWorking()) ) ); + elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(this.isWorking()) ) ); elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) ); elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) ); - - int sz = 0; - synchronized ( queueLock ) - { - // wait until there is something to read - if ( head == tail ) - { - sz = 0; - } - else - { - Node n = head; - while ( n != null ) - { - n = n.next; - sz++; - } - } - - elems.add(new StatElement<Integer>( "Size", Integer.valueOf(sz) ) ); - } + elems.add(new StatElement<Integer>( "Size", Integer.valueOf(this.size()) ) ); stats.setStatElements( elems ); @@ -402,7 +265,7 @@ public class CacheEventQueue<K, V> @Override public boolean isEmpty() { - return tail == head; + return queue.isEmpty(); } /** @@ -413,6 +276,6 @@ public class CacheEventQueue<K, V> @Override public int size() { - return size; + return queue.size(); } } Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java?rev=1774925&r1=1774924&r2=1774925&view=diff ============================================================================== --- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java (original) +++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java Sun Dec 18 18:19:37 2016 @@ -1,5 +1,9 @@ package org.apache.commons.jcs.engine; +import java.util.ArrayList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -28,10 +32,6 @@ import org.apache.commons.jcs.utils.thre import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.util.ArrayList; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; - /** * An event queue is used to propagate ordered cache events to one and only one target listener. * <p> @@ -80,29 +80,14 @@ public class PooledCacheEventQueue<K, V> * @param waitBeforeRetry * @param threadPoolName */ - @Override - public void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure, + protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure, int waitBeforeRetry, String threadPoolName ) { - if ( listener == null ) - { - throw new IllegalArgumentException( "listener must not be null" ); - } - - this.listener = listener; - this.listenerId = listenerId; - this.cacheName = cacheName; - this.maxFailure = maxFailure <= 0 ? 3 : maxFailure; - this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry; + super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry); // this will share the same pool with other event queues by default. pool = ThreadPoolManager.getInstance().getPool( (threadPoolName == null) ? "cache_event_queue" : threadPoolName ); - - if ( log.isDebugEnabled() ) - { - log.debug( "Initialized: " + this ); - } } /** @@ -115,22 +100,14 @@ public class PooledCacheEventQueue<K, V> } /** - * Event Q is empty. - */ - public synchronized void stopProcessing() - { - destroyed = true; - } - - /** * Destroy the queue. Interrupt all threads. */ @Override public synchronized void destroy() { - if ( !destroyed ) + if ( isAlive() ) { - destroyed = true; + setAlive(false); pool.shutdownNow(); if ( log.isInfoEnabled() ) { @@ -151,14 +128,6 @@ public class PooledCacheEventQueue<K, V> } /** - * @return Statistics info - */ - public String getStats() - { - return getStatistics().toString(); - } - - /** * @return IStats */ @Override